package server import ( "context" "sync" "ky-go-kratos/app/websocket/internal/conf" "ky-go-kratos/app/websocket/internal/service" "ky-go-kratos/pkg/kafka" "github.com/go-kratos/kratos/v2/log" ) type KafkaConsumerServer struct { consumer *kafka.KafkaConsumer messageHandler *service.MessageHandler wg sync.WaitGroup } // NewKafkaConsumerServer 创建 Kafka 消费者服务器 func NewKafkaConsumerServer(c *conf.Data, wsService *service.WebSocketService) (*KafkaConsumerServer, func()) { consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition), c.Kafka.GroupId) messageHandler := service.NewMessageHandler(wsService, consumer.Reader) server := &KafkaConsumerServer{ consumer: consumer, messageHandler: messageHandler, } return server, func() { if err := server.Stop(context.Background()); err != nil { log.Errorf("failed to stop kafka consumer: %v", err) } } } // Start 启动 Kafka 消费者 func (s *KafkaConsumerServer) Start(ctx context.Context) error { s.wg.Add(1) go s.consumeMessages(ctx) return nil } // Stop 停止 Kafka 消费者 func (s *KafkaConsumerServer) Stop(ctx context.Context) error { s.wg.Wait() return s.consumer.Close() } // consumeMessages 消费 Kafka 消息 func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) { defer s.wg.Done() for { select { case <-ctx.Done(): return default: message, err := s.consumer.Reader.ReadMessage(ctx) if err != nil { if s.consumer.IsTransientNetworkError(err) { continue } return } if err := s.messageHandler.HandleMessage(&message); err != nil { log.Errorf("failed to handle message: %v", err) } } } }