go-kratos/internal/websocket/connection.go
2025-04-28 19:58:24 +08:00

98 lines
1.8 KiB
Go

package websocket
import (
"encoding/json"
"log"
"time"
"github.com/gorilla/websocket"
)
// Timing parameters shared by the read and write pumps.
const (
	// writeWait is the deadline applied to each write on the connection.
	writeWait = 10 * time.Second

	// pongWait is the read deadline window; it is renewed whenever a pong
	// is received from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between outgoing pings. It is 90% of
	// pongWait, so it is always shorter than the read deadline window.
	pingPeriod = 9 * pongWait / 10
)
// WSMessage is the JSON envelope exchanged over the websocket connection.
type WSMessage struct {
	// Type selects the routing mode: "broadcast" or "direct".
	Type string `json:"type"`

	// To is the target uid when Type is "direct" (one-to-one chat).
	To string `json:"to"`

	// Content carries the message payload.
	Content string `json:"content"`

	// Ack reports whether this message is an ACK response. It is omitted
	// from the encoded JSON when false.
	Ack bool `json:"ack,omitempty"`
}
// Connection binds one websocket peer to the Room it talks through.
type Connection struct {
	// UID is the identifier attached to this connection (presumably the
	// user id that direct messages are addressed to; confirm with Room).
	UID string

	// Room receives this connection's unregister, broadcast and direct
	// requests.
	Room *Room

	// Conn is the underlying gorilla websocket connection.
	Conn *websocket.Conn

	// Send queues outbound messages; WritePump drains it and writes each
	// message to Conn as JSON.
	Send chan *WSMessage
}
// ReadPump reads JSON messages from the websocket until a read fails.
//
// Every successfully decoded message is acknowledged by queueing an ACK
// (same Type, Ack set) on c.Send, and is then forwarded to the Room:
// "broadcast" messages go to Room.Broadcast, "direct" messages go to
// Room.Direct together with the target uid. Messages that fail to decode are
// logged and skipped. Messages with any other type are acknowledged, logged
// and dropped.
//
// When the loop ends the connection is handed to Room.Unregister and the
// websocket is closed.
func (c *Connection) ReadPump() {
	defer func() {
		c.Room.Unregister <- c
		c.Conn.Close()
	}()

	// Cap inbound messages at 512 bytes.
	c.Conn.SetReadLimit(512)
	if err := c.Conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		log.Println("set read deadline error:", err)
		return
	}
	// Each pong renews the read deadline. Returning the error (instead of
	// discarding it) makes the pending read fail and ends the pump.
	c.Conn.SetPongHandler(func(string) error {
		return c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	for {
		_, msg, err := c.Conn.ReadMessage()
		if err != nil {
			log.Println("read error:", err)
			return
		}

		var wsMsg WSMessage
		if err := json.Unmarshal(msg, &wsMsg); err != nil {
			log.Println("unmarshal error:", err)
			continue
		}

		// Queue the ACK, but never wait longer than writeWait. If the
		// writer has stopped draining c.Send, an unbounded send would block
		// here forever and the deferred unregister would never run.
		// NOTE(review): this still panics if Room closes c.Send while the
		// pump is running; confirm Room only closes it after Unregister.
		ack := &WSMessage{Type: wsMsg.Type, Ack: true}
		timer := time.NewTimer(writeWait)
		select {
		case c.Send <- ack:
			timer.Stop()
		case <-timer.C:
			log.Println("ack send timeout: send queue is not being drained")
			return
		}

		switch wsMsg.Type {
		case "broadcast":
			c.Room.Broadcast <- &BroadcastPayload{Message: &wsMsg}
		case "direct":
			c.Room.Direct <- &DirectPayload{To: wsMsg.To, Message: &wsMsg}
		default:
			log.Printf("unknown message type: %q", wsMsg.Type)
		}
	}
}
// WritePump drains c.Send and writes each queued message to the websocket as
// JSON. It also sends a ping every pingPeriod.
//
// Every write is given a deadline of writeWait. The pump stops when c.Send is
// closed (after a best-effort close frame) or when setting the deadline,
// writing a message or sending a ping fails. Failures are logged. On exit the
// ticker is stopped and the websocket is closed.
func (c *Connection) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.Conn.Close()
	}()

	for {
		select {
		case msg, ok := <-c.Send:
			if err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				log.Println("set write deadline error:", err)
				return
			}
			if !ok {
				// c.Send was closed: send a close frame to the peer. This is
				// best effort, since the connection is closed right after.
				_ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := c.Conn.WriteJSON(msg); err != nil {
				log.Println("write error:", err)
				return
			}

		case <-ticker.C:
			if err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				log.Println("set write deadline error:", err)
				return
			}
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				log.Println("ping error:", err)
				return
			}
		}
	}
}