98 lines
2.6 KiB
Go
98 lines
2.6 KiB
Go
package service
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
|
|
kafkago "github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
// MessageHandler 处理 Kafka 消息
|
|
type MessageHandler struct {
|
|
wsService *WebSocketService
|
|
reader *kafkago.Reader
|
|
}
|
|
|
|
// WebSocketMessage 定义 WebSocket 消息结构
|
|
type WebSocketMessage struct {
|
|
Type string `json:"type"`
|
|
Content string `json:"content"`
|
|
Payload json.RawMessage `json:"payload,omitempty"`
|
|
}
|
|
|
|
// NewMessageHandler 创建 MessageHandler
|
|
func NewMessageHandler(wsService *WebSocketService, reader *kafkago.Reader) *MessageHandler {
|
|
return &MessageHandler{
|
|
wsService: wsService,
|
|
reader: reader,
|
|
}
|
|
}
|
|
|
|
// parseJSONMessage 解析并格式化 JSON 消息
|
|
func (h *MessageHandler) parseJSONMessage(message []byte) (string, error) {
|
|
var jsonData interface{}
|
|
if err := json.Unmarshal(message, &jsonData); err != nil {
|
|
return "", fmt.Errorf("failed to parse JSON: %v", err)
|
|
}
|
|
|
|
prettyJSON, err := json.MarshalIndent(jsonData, "", " ")
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to format JSON: %v", err)
|
|
}
|
|
|
|
return string(prettyJSON), nil
|
|
}
|
|
|
|
// parseWebSocketMessage 解析为 WebSocketMessage 结构
|
|
func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessage, error) {
|
|
var wsMsg WebSocketMessage
|
|
if err := json.Unmarshal(message, &wsMsg); err != nil {
|
|
return nil, fmt.Errorf("failed to parse WebSocket message: %v", err)
|
|
}
|
|
return &wsMsg, nil
|
|
}
|
|
|
|
// HandleMessage 处理 Kafka 消息
|
|
func (h *MessageHandler) HandleMessage(message *kafkago.Message) error {
|
|
// 解析并打印格式化的 JSON
|
|
prettyJSON, err := h.parseJSONMessage(message.Value)
|
|
if err != nil {
|
|
log.Printf("Error parsing message: %v", err)
|
|
return err
|
|
}
|
|
fmt.Printf("kafka parseJSONMessage:\n%s\n", prettyJSON)
|
|
|
|
// 解析为 WebSocketMessage
|
|
wsMsg, err := h.parseWebSocketMessage(message.Value)
|
|
if err != nil {
|
|
log.Printf("Error parsing WebSocket message: %v", err)
|
|
return err
|
|
}
|
|
|
|
// 根据消息类型处理不同的业务逻辑
|
|
var processErr error
|
|
switch wsMsg.Type {
|
|
case "message":
|
|
// 处理普通消息
|
|
processErr = h.wsService.Broadcast(message.Value)
|
|
case "broadcast":
|
|
// 广播消息给所有连接的客户端
|
|
processErr = h.wsService.Broadcast(wsMsg.Payload)
|
|
case "private":
|
|
// 处理私聊消息
|
|
processErr = h.wsService.SendPrivateMessage(wsMsg.Payload)
|
|
default:
|
|
log.Printf("Unknown message type: %s", wsMsg.Type)
|
|
}
|
|
|
|
if processErr != nil {
|
|
log.Printf("Error processing message: %v", processErr)
|
|
} else {
|
|
log.Printf("Message processed successfully - Topic: %s, Partition: %d, Offset: %d",
|
|
message.Topic, message.Partition, message.Offset)
|
|
}
|
|
|
|
return processErr
|
|
}
|