go-kratos/app/websocket/internal/service/kafka_message_handler.go

98 lines
2.6 KiB
Go

package service
import (
"encoding/json"
"fmt"
"log"
kafkago "github.com/segmentio/kafka-go"
)
// MessageHandler handles messages consumed from Kafka and hands them to the
// WebSocket service.
type MessageHandler struct {
	// wsService is the WebSocket service that HandleMessage calls to deliver
	// messages.
	wsService *WebSocketService

	// reader is the Kafka reader supplied to NewMessageHandler. It is stored
	// here but not read by the methods defined next to this type.
	reader *kafkago.Reader
}
// WebSocketMessage is the JSON structure that a Kafka message value is decoded
// into before it is dispatched.
type WebSocketMessage struct {
	// Type is read from the "type" key; HandleMessage switches on it.
	Type string `json:"type"`

	// Content is read from the "content" key.
	Content string `json:"content"`

	// Payload keeps the raw, undecoded JSON found under the "payload" key.
	// It is omitted when the struct is marshaled with an empty payload.
	Payload json.RawMessage `json:"payload,omitempty"`
}
// NewMessageHandler builds a MessageHandler that holds the given WebSocket
// service and Kafka reader. Both arguments are stored as-is; neither is
// validated or copied.
func NewMessageHandler(wsService *WebSocketService, reader *kafkago.Reader) *MessageHandler {
	handler := new(MessageHandler)
	handler.wsService = wsService
	handler.reader = reader
	return handler
}
// parseJSONMessage decodes message as arbitrary JSON and re-encodes it with
// one space of indentation per nesting level.
//
// It returns the formatted text. On failure it returns an empty string and an
// error that wraps the underlying encoding/json error, so callers can inspect
// the cause with errors.Is / errors.As.
func (h *MessageHandler) parseJSONMessage(message []byte) (string, error) {
	var jsonData interface{}
	if err := json.Unmarshal(message, &jsonData); err != nil {
		// %w (not %v) keeps the json error in the chain.
		return "", fmt.Errorf("failed to parse JSON: %w", err)
	}

	prettyJSON, err := json.MarshalIndent(jsonData, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to format JSON: %w", err)
	}
	return string(prettyJSON), nil
}
// parseWebSocketMessage decodes message into a WebSocketMessage.
//
// It returns a pointer to the decoded value. On failure it returns nil and an
// error that wraps the underlying encoding/json error, so callers can inspect
// the cause with errors.Is / errors.As.
func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessage, error) {
	var wsMsg WebSocketMessage
	if err := json.Unmarshal(message, &wsMsg); err != nil {
		// %w (not %v) keeps the json error in the chain.
		return nil, fmt.Errorf("failed to parse WebSocket message: %w", err)
	}
	return &wsMsg, nil
}
// HandleMessage processes a single Kafka message.
//
// The message value is first pretty-printed to standard output, then decoded
// into a WebSocketMessage and dispatched on its Type:
//
//   - "message":   the whole raw Kafka value is passed to wsService.Broadcast
//   - "broadcast": only the payload is passed to wsService.Broadcast
//   - "private":   only the payload is passed to wsService.SendPrivateMessage
//
// Any other type is logged and skipped; HandleMessage then returns nil without
// reporting the message as processed.
//
// It returns an error when message is nil, when the value is not valid JSON,
// or when the WebSocket service call fails.
func (h *MessageHandler) HandleMessage(message *kafkago.Message) error {
	// Guard against a nil pointer instead of panicking on message.Value.
	if message == nil {
		return fmt.Errorf("nil kafka message")
	}

	// Parse and print the formatted JSON.
	prettyJSON, err := h.parseJSONMessage(message.Value)
	if err != nil {
		log.Printf("Error parsing message: %v", err)
		return err
	}
	fmt.Printf("kafka parseJSONMessage:\n%s\n", prettyJSON)

	// Decode into a WebSocketMessage.
	wsMsg, err := h.parseWebSocketMessage(message.Value)
	if err != nil {
		log.Printf("Error parsing WebSocket message: %v", err)
		return err
	}

	// Dispatch on the message type.
	var processErr error
	switch wsMsg.Type {
	case "message":
		// Ordinary message: forward the raw Kafka value unchanged.
		processErr = h.wsService.Broadcast(message.Value)
	case "broadcast":
		// Broadcast: forward only the payload.
		processErr = h.wsService.Broadcast(wsMsg.Payload)
	case "private":
		// Private message: forward only the payload.
		processErr = h.wsService.SendPrivateMessage(wsMsg.Payload)
	default:
		// Nothing was delivered, so return before the success log below;
		// nil is kept as the result so callers see the same return value.
		log.Printf("Unknown message type: %s", wsMsg.Type)
		return nil
	}

	if processErr != nil {
		log.Printf("Error processing message: %v", processErr)
		return processErr
	}

	log.Printf("Message processed successfully - Topic: %s, Partition: %d, Offset: %d",
		message.Topic, message.Partition, message.Offset)
	return nil
}