go-kratos/app/websocket/internal/data/data.go

128 lines
3.4 KiB
Go

package data
import (
"context"
"errors"
"ky-go-kratos/app/websocket/internal/biz"
"ky-go-kratos/app/websocket/internal/conf"
"ky-go-kratos/app/websocket/internal/websocket"
"sync"
"time"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
)
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewSocketRepo)
// Data .
type Data struct {
// TODO wrapped database client
log *log.Helper
// 在线用户状态
onlineUsers sync.Map // map[string]*UserStatus
}
// UserStatus 用户在线状态
type UserStatus struct {
UserID string // 用户ID
RoomID string // 房间ID
LastSeen time.Time // 最后活跃时间
Connected bool // 是否在线
}
// NewData .
func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
cleanup := func() {
log.NewHelper(logger).Info("closing the data resources")
}
return &Data{
log: log.NewHelper(logger),
}, cleanup, nil
}
type socketRepo struct {
data *Data
}
// NewSocketRepo .
func NewSocketRepo(data *Data) biz.SocketRepo {
return &socketRepo{data: data}
}
// SendMessage implements biz.SocketRepo
func (r *socketRepo) SendMessage(ctx context.Context, to []string, from []string, message []string) error {
// 获取hub实例
hub := websocket.GetHub()
if hub == nil {
return errors.New("hub not initialized")
}
// 创建WebSocket消息
wsMsg := &websocket.WSMessage{
Type: "direct",
Content: message[0], // 目前只处理第一条消息
From: from[0], // 目前只处理第一个发送者
Time: time.Now(),
}
// 发送消息给每个接收者
for _, receiver := range to {
hub.SendDirectMessage(from[0], receiver, wsMsg)
}
r.data.log.WithContext(ctx).Infof("SendMessage: to=%v, from=%v, message=%v", to, from, message)
return nil
}
// IsOnline implements biz.SocketRepo
func (r *socketRepo) IsOnline(ctx context.Context, userID string) (bool, error) {
if status, ok := r.data.onlineUsers.Load(userID); ok {
userStatus := status.(*UserStatus)
return userStatus.Connected, nil
}
return false, nil
}
// UpdateUserStatus 更新用户在线状态
func (r *socketRepo) UpdateUserStatus(ctx context.Context, userID string, roomID string, connected bool) error {
status := &UserStatus{
UserID: userID,
RoomID: roomID,
LastSeen: time.Now(),
Connected: connected,
}
r.data.onlineUsers.Store(userID, status)
r.data.log.WithContext(ctx).Infof("UpdateUserStatus: userID=%s, roomID=%s, connected=%v", userID, roomID, connected)
return nil
}
// GetOnlineUsers 获取在线用户列表
func (r *socketRepo) GetOnlineUsers(ctx context.Context, roomID string) ([]string, error) {
var users []string
r.data.onlineUsers.Range(func(key, value interface{}) bool {
status := value.(*UserStatus)
if status.Connected && status.RoomID == roomID {
users = append(users, status.UserID)
}
return true
})
return users, nil
}
// CleanupInactiveUsers 清理不活跃的用户
func (r *socketRepo) CleanupInactiveUsers(ctx context.Context, timeout time.Duration) error {
now := time.Now()
r.data.onlineUsers.Range(func(key, value interface{}) bool {
status := value.(*UserStatus)
if status.Connected && now.Sub(status.LastSeen) > timeout {
status.Connected = false
r.data.onlineUsers.Store(key, status)
r.data.log.WithContext(ctx).Infof("CleanupInactiveUsers: userID=%s, lastSeen=%v", status.UserID, status.LastSeen)
}
return true
})
return nil
}