From 0b8757a8246908c43d80c21e323a1ef5f75f3d3e Mon Sep 17 00:00:00 2001 From: Kai Date: Fri, 23 May 2025 12:02:44 +0800 Subject: [PATCH] feat: add Chinese comments for Kafka and WebSocket server functionalities to improve code readability --- app/websocket/internal/data/data.go | 1 + app/websocket/internal/data/websocket.go | 2 +- app/websocket/internal/server/kafka_consume.go | 4 ++++ app/websocket/internal/server/websocket.go | 14 ++++++++++---- .../internal/service/kafka_message_handler.go | 2 ++ app/websocket/internal/service/websocket.go | 4 ++-- 6 files changed, 20 insertions(+), 7 deletions(-) diff --git a/app/websocket/internal/data/data.go b/app/websocket/internal/data/data.go index 8923ee3..4bd8cb1 100644 --- a/app/websocket/internal/data/data.go +++ b/app/websocket/internal/data/data.go @@ -20,6 +20,7 @@ type Data struct { func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) { log := log.NewHelper(logger) + // 创建 Kafka 生产者 kafkaWriter := kafka.NewKafkaWriter(c.Kafka.Brokers, c.Kafka.Topic) cleanup := func() { diff --git a/app/websocket/internal/data/websocket.go b/app/websocket/internal/data/websocket.go index e755fb6..682e879 100644 --- a/app/websocket/internal/data/websocket.go +++ b/app/websocket/internal/data/websocket.go @@ -28,7 +28,7 @@ type Message struct { Content json.RawMessage `json:"content"` } -// SendMessage sends a message to Kafka. +// SendMessage 发送消息到 Kafka func (r *WebSocketRepo) SendMessage(ctx context.Context, message []byte) error { err := r.data.kafka.SendToKafkaMessage(ctx, "websocket", message) if err != nil { diff --git a/app/websocket/internal/server/kafka_consume.go b/app/websocket/internal/server/kafka_consume.go index c6295f5..358fce1 100644 --- a/app/websocket/internal/server/kafka_consume.go +++ b/app/websocket/internal/server/kafka_consume.go @@ -17,6 +17,7 @@ type KafkaConsumerServer struct { wg sync.WaitGroup } +// NewKafkaConsumerServer 创建 Kafka 消费者服务器 func NewKafkaConsumerServer(c *conf.Data, wsService *service.WebSocketService) (*KafkaConsumerServer, func()) { consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition), c.Kafka.GroupId) messageHandler := service.NewMessageHandler(wsService, consumer.Reader) @@ -33,17 +34,20 @@ func NewKafkaConsumerServer(c *conf.Data, wsService *service.WebSocketService) ( } } +// Start 启动 Kafka 消费者 func (s *KafkaConsumerServer) Start(ctx context.Context) error { s.wg.Add(1) go s.consumeMessages(ctx) return nil } +// Stop 停止 Kafka 消费者 func (s *KafkaConsumerServer) Stop(ctx context.Context) error { s.wg.Wait() return s.consumer.Close() } +// consumeMessages 消费 Kafka 消息 func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) { defer s.wg.Done() diff --git a/app/websocket/internal/server/websocket.go b/app/websocket/internal/server/websocket.go index 032f85b..92febf9 100644 --- a/app/websocket/internal/server/websocket.go +++ b/app/websocket/internal/server/websocket.go @@ -25,6 +25,7 @@ type WebSocketServer struct { done chan struct{} // 添加 done 通道用于优雅关闭 } +// NewWebSocketServer 创建 WebSocket 服务器 func NewWebSocketServer(c *conf.Server, svc *service.WebSocketService) *WebSocketServer { return &WebSocketServer{ conf: c, @@ -42,6 +43,7 @@ func NewWebSocketServer(c *conf.Server, svc *service.WebSocketService) *WebSocke } } +// Start 启动 WebSocket 服务器 func (s *WebSocketServer) Start(ctx context.Context) error { mux := http.NewServeMux() mux.HandleFunc(s.conf.Websocket.Path, s.handleWebSocket) @@ -64,6 +66,7 @@ func (s *WebSocketServer) Start(ctx context.Context) error { return nil } +// Stop 停止 WebSocket 服务器 func (s *WebSocketServer) Stop(ctx context.Context) error { // 关闭所有 WebSocket 连接 s.mu.Lock() @@ -87,6 +90,7 @@ func (s *WebSocketServer) Stop(ctx context.Context) error { return nil } +// handleWebSocket 处理 WebSocket 连接请求 func (s *WebSocketServer) handleWebSocket(w http.ResponseWriter, r *http.Request) { log.Printf("Received WebSocket connection request from %s", r.RemoteAddr) @@ -101,6 +105,7 @@ func (s *WebSocketServer) handleWebSocket(w http.ResponseWriter, r *http.Request go s.readPump(conn) } +// readPump 读取 WebSocket 消息 func (s *WebSocketServer) readPump(conn *websocket.Conn) { defer func() { s.unregister <- conn @@ -138,17 +143,18 @@ func (s *WebSocketServer) readPump(conn *websocket.Conn) { } } +// run 运行 WebSocket 服务器 func (s *WebSocketServer) run() { for { select { - case <-s.done: + case <-s.done: // 如果 done 通道被关闭,则退出循环 return - case client := <-s.register: + case client := <-s.register: // 如果 register 通道有新的连接,则将连接添加到 clients 中 s.mu.Lock() s.clients[client] = true s.mu.Unlock() log.Printf("New client connected. Total clients: %d", len(s.clients)) - case client := <-s.unregister: + case client := <-s.unregister: // 如果 unregister 通道有连接关闭,则将连接从 clients 中删除 s.mu.Lock() if _, ok := s.clients[client]; ok { delete(s.clients, client) @@ -156,7 +162,7 @@ func (s *WebSocketServer) run() { } s.mu.Unlock() log.Printf("Client disconnected. Total clients: %d", len(s.clients)) - case message := <-s.broadcast: + case message := <-s.broadcast: // 如果 broadcast 通道有消息,则广播消息给所有客户端 s.mu.Lock() for client := range s.clients { err := client.WriteMessage(websocket.TextMessage, message) diff --git a/app/websocket/internal/service/kafka_message_handler.go b/app/websocket/internal/service/kafka_message_handler.go index 9192fd7..1b6c3f5 100644 --- a/app/websocket/internal/service/kafka_message_handler.go +++ b/app/websocket/internal/service/kafka_message_handler.go @@ -21,6 +21,7 @@ type WebSocketMessage struct { Payload json.RawMessage `json:"payload,omitempty"` } +// NewMessageHandler 创建 MessageHandler func NewMessageHandler(wsService *WebSocketService, reader *kafkago.Reader) *MessageHandler { return &MessageHandler{ wsService: wsService, @@ -52,6 +53,7 @@ func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessag return &wsMsg, nil } +// HandleMessage 处理 Kafka 消息 func (h *MessageHandler) HandleMessage(message *kafkago.Message) error { // 解析并打印格式化的 JSON prettyJSON, err := h.parseJSONMessage(message.Value) diff --git a/app/websocket/internal/service/websocket.go b/app/websocket/internal/service/websocket.go index e86a8cf..ed79717 100644 --- a/app/websocket/internal/service/websocket.go +++ b/app/websocket/internal/service/websocket.go @@ -21,7 +21,7 @@ type Client struct { Topics map[string]bool } -// NewWebSocketService new a WebSocket service. +// NewWebSocketService 创建 WebSocket 服务 func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService { return &WebSocketService{ uc: uc, @@ -29,7 +29,7 @@ func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService { } } -// HandleMessage handles incoming WebSocket messages. +// HandleMessage 处理 WebSocket 消息 func (s *WebSocketService) HandleMessage(ctx context.Context, message []byte) error { return s.uc.HandleMessage(ctx, message) }