// Package data implements the data-access layer of the websocket service. It
// delivers direct messages through the websocket hub and keeps an in-memory
// record of each user's online status.
package data

import (
	"context"
	"errors"
	"sync"
	"time"

	"ky-go-kratos/app/websocket/internal/biz"
	"ky-go-kratos/app/websocket/internal/conf"
	"ky-go-kratos/app/websocket/internal/websocket"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/wire"
)

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewSocketRepo)

// Data holds the resources shared by the repositories in this package.
type Data struct {
	// TODO wrapped database client
	log *log.Helper

	// onlineUsers maps a user ID (string) to its *UserStatus. Values are
	// replaced wholesale on every update and never modified after being
	// stored, so a loaded pointer can be read without further locking.
	onlineUsers sync.Map // map[string]*UserStatus
}

// UserStatus is the online status of a single user.
type UserStatus struct {
	UserID    string    // user ID
	RoomID    string    // room ID
	LastSeen  time.Time // time of the last status update
	Connected bool      // whether the user is currently marked online
}

// NewData builds the Data container. The returned cleanup function only logs
// that resources are being closed; there is nothing else to release yet.
func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
	helper := log.NewHelper(logger)
	cleanup := func() {
		helper.Info("closing the data resources")
	}
	return &Data{log: helper}, cleanup, nil
}

// socketRepo is the biz.SocketRepo implementation backed by Data.
type socketRepo struct {
	data *Data
}

// NewSocketRepo returns a biz.SocketRepo backed by data.
func NewSocketRepo(data *Data) biz.SocketRepo {
	return &socketRepo{data: data}
}

// SendMessage implements biz.SocketRepo.
//
// Only the first element of from and of message is used for now; that single
// message is sent to every receiver listed in to. An error is returned when
// the hub has not been initialized, or when from or message is empty (which
// previously caused an index-out-of-range panic). An empty to is not an
// error: nothing is sent and nil is returned.
func (r *socketRepo) SendMessage(ctx context.Context, to []string, from []string, message []string) error {
	hub := websocket.GetHub()
	if hub == nil {
		return errors.New("hub not initialized")
	}
	if len(from) == 0 {
		return errors.New("no sender specified")
	}
	if len(message) == 0 {
		return errors.New("no message specified")
	}

	sender := from[0]
	wsMsg := &websocket.WSMessage{
		Type:    "direct",
		Content: message[0],
		From:    sender,
		Time:    time.Now(),
	}

	// The same message value is handed to the hub once per receiver.
	for _, receiver := range to {
		hub.SendDirectMessage(sender, receiver, wsMsg)
	}

	r.data.log.WithContext(ctx).Infof("SendMessage: to=%v, from=%v, message=%v", to, from, message)
	return nil
}

// IsOnline implements biz.SocketRepo. It reports the Connected flag of the
// stored status for userID, or false when no status has been recorded.
func (r *socketRepo) IsOnline(ctx context.Context, userID string) (bool, error) {
	status, ok := r.data.onlineUsers.Load(userID)
	if !ok {
		return false, nil
	}
	return status.(*UserStatus).Connected, nil
}

// UpdateUserStatus records the online status of userID, replacing any
// previously stored status and setting LastSeen to the current time.
func (r *socketRepo) UpdateUserStatus(ctx context.Context, userID string, roomID string, connected bool) error {
	r.data.onlineUsers.Store(userID, &UserStatus{
		UserID:    userID,
		RoomID:    roomID,
		LastSeen:  time.Now(),
		Connected: connected,
	})

	r.data.log.WithContext(ctx).Infof("UpdateUserStatus: userID=%s, roomID=%s, connected=%v", userID, roomID, connected)
	return nil
}

// GetOnlineUsers returns the IDs of all users currently marked as connected
// in roomID. The result is nil when no user matches, and its order is
// whatever order the underlying sync.Map iteration yields.
func (r *socketRepo) GetOnlineUsers(ctx context.Context, roomID string) ([]string, error) {
	var users []string
	r.data.onlineUsers.Range(func(_, value interface{}) bool {
		status := value.(*UserStatus)
		if status.Connected && status.RoomID == roomID {
			users = append(users, status.UserID)
		}
		return true
	})
	return users, nil
}

// CleanupInactiveUsers marks as disconnected every connected user whose
// LastSeen is more than timeout in the past. Entries are kept in the map;
// only their Connected flag changes.
func (r *socketRepo) CleanupInactiveUsers(ctx context.Context, timeout time.Duration) error {
	now := time.Now()
	r.data.onlineUsers.Range(func(key, value interface{}) bool {
		status := value.(*UserStatus)
		if !status.Connected || now.Sub(status.LastSeen) <= timeout {
			return true
		}

		// Store a modified copy rather than flipping the flag on the
		// published value: IsOnline and GetOnlineUsers may be reading
		// that same pointer concurrently, and writing through it would
		// be a data race.
		updated := *status
		updated.Connected = false
		r.data.onlineUsers.Store(key, &updated)

		r.data.log.WithContext(ctx).Infof("CleanupInactiveUsers: userID=%s, lastSeen=%v", updated.UserID, updated.LastSeen)
		return true
	})
	return nil
}