go-kratos/app/websocket/internal/server/kafka_consume.go

73 lines
1.7 KiB
Go

package server
import (
"context"
"sync"
"ky-go-kratos/app/websocket/internal/conf"
"ky-go-kratos/app/websocket/internal/service"
"ky-go-kratos/pkg/kafka"
"github.com/go-kratos/kratos/v2/log"
)
type KafkaConsumerServer struct {
consumer *kafka.KafkaConsumer
messageHandler *service.MessageHandler
wg sync.WaitGroup
}
// NewKafkaConsumerServer 创建 Kafka 消费者服务器
func NewKafkaConsumerServer(c *conf.Data, wsService *service.WebSocketService) (*KafkaConsumerServer, func()) {
consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition), c.Kafka.GroupId)
messageHandler := service.NewMessageHandler(wsService, consumer.Reader)
server := &KafkaConsumerServer{
consumer: consumer,
messageHandler: messageHandler,
}
return server, func() {
if err := server.Stop(context.Background()); err != nil {
log.Errorf("failed to stop kafka consumer: %v", err)
}
}
}
// Start 启动 Kafka 消费者
func (s *KafkaConsumerServer) Start(ctx context.Context) error {
s.wg.Add(1)
go s.consumeMessages(ctx)
return nil
}
// Stop 停止 Kafka 消费者
func (s *KafkaConsumerServer) Stop(ctx context.Context) error {
s.wg.Wait()
return s.consumer.Close()
}
// consumeMessages 消费 Kafka 消息
func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
default:
message, err := s.consumer.Reader.ReadMessage(ctx)
if err != nil {
if s.consumer.IsTransientNetworkError(err) {
continue
}
return
}
if err := s.messageHandler.HandleMessage(&message); err != nil {
log.Errorf("failed to handle message: %v", err)
}
}
}
}