feat: integrate Kafka consumer into WebSocket server and enhance message handling capabilities

Kai 2025-05-12 18:10:32 +08:00
parent f47baeac5f
commit ae2389c0b9
12 changed files with 299 additions and 24 deletions

View File

@@ -34,7 +34,7 @@ func init() {
 	flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
 }
 
-func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App {
+func newApp(logger log.Logger, ks *server.KafkaConsumerServer, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App {
 	return kratos.New(
 		kratos.ID(id),
 		kratos.Name(Name),
@@ -42,6 +42,7 @@ func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs *
 		kratos.Metadata(map[string]string{}),
 		kratos.Logger(logger),
 		kratos.Server(
+			ks,
 			ws,
 			gs,
 			hs,

View File

@@ -18,13 +18,6 @@ import (
 )
 
 // wireApp init kratos application.
-func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
-	wire.Build(
-		data.ProviderSet,
-		biz.ProviderSet,
-		service.ProviderSet,
-		server.ProviderSet,
-		newApp,
-	)
-	return nil, nil, nil
+func wireApp(*conf.Server, *conf.Data, log.Logger) (*kratos.App, func(), error) {
+	panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, newApp))
 }

View File

@@ -23,19 +23,22 @@ import (
 // Injectors from wire.go:
 
 // wireApp init kratos application.
-func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
-	dataData, cleanup, err := data.NewData(d, logger)
+func wireApp(confServer *conf.Server, confData *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
+	dataData, cleanup, err := data.NewData(confData, logger)
 	if err != nil {
 		return nil, nil, err
 	}
 	webSocketRepo := data.NewWebSocketRepo(dataData, logger)
 	webSocketUsecase := biz.NewWebSocketUsecase(webSocketRepo)
 	webSocketService := service.NewWebSocketService(webSocketUsecase)
-	webSocketServer := server.NewWebSocketServer(c, webSocketService)
-	grpcServer := server.NewGRPCServer(c, logger)
-	httpServer := server.NewHTTPServer(c, logger)
-	app := newApp(logger, webSocketServer, grpcServer, httpServer)
+	messageHandler := service.NewMessageHandler(webSocketService)
+	kafkaConsumerServer, cleanup2 := server.NewKafkaConsumerServer(confData, messageHandler)
+	webSocketServer := server.NewWebSocketServer(confServer, webSocketService)
+	grpcServer := server.NewGRPCServer(confServer, logger)
+	httpServer := server.NewHTTPServer(confServer, logger)
+	app := newApp(logger, kafkaConsumerServer, webSocketServer, grpcServer, httpServer)
 	return app, func() {
+		cleanup2()
 		cleanup()
 	}, nil
 }

View File

@@ -20,3 +20,4 @@ data:
     brokers:
      - 47.108.232.131:9092
     topic: websocket-topic
+    partition: 0

View File

@@ -354,6 +354,7 @@ type Data_Kafka struct {
 	state         protoimpl.MessageState `protogen:"open.v1"`
 	Brokers       []string               `protobuf:"bytes,1,rep,name=brokers,proto3" json:"brokers,omitempty"`
 	Topic         string                 `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
+	Partition     int64                  `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache     protoimpl.SizeCache
 }
@@ -402,6 +403,13 @@ func (x *Data_Kafka) GetTopic() string {
 	return ""
 }
 
+func (x *Data_Kafka) GetPartition() int64 {
+	if x != nil {
+		return x.Partition
+	}
+	return 0
+}
+
 var File_conf_conf_proto protoreflect.FileDescriptor
 
 const file_conf_conf_proto_rawDesc = "" +
@@ -425,12 +433,13 @@ const file_conf_conf_proto_rawDesc = "" +
 	"\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x1a3\n" +
 	"\tWebSocket\x12\x12\n" +
 	"\x04addr\x18\x01 \x01(\tR\x04addr\x12\x12\n" +
-	"\x04path\x18\x02 \x01(\tR\x04path\"m\n" +
+	"\x04path\x18\x02 \x01(\tR\x04path\"\x8b\x01\n" +
 	"\x04Data\x12,\n" +
-	"\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1a7\n" +
+	"\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1aU\n" +
 	"\x05Kafka\x12\x18\n" +
 	"\abrokers\x18\x01 \x03(\tR\abrokers\x12\x14\n" +
-	"\x05topic\x18\x02 \x01(\tR\x05topicB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3"
+	"\x05topic\x18\x02 \x01(\tR\x05topic\x12\x1c\n" +
+	"\tpartition\x18\x03 \x01(\x03R\tpartitionB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3"
 
 var (
 	file_conf_conf_proto_rawDescOnce sync.Once

View File

@@ -34,6 +34,7 @@ message Data {
   message Kafka {
     repeated string brokers = 1;
     string topic = 2;
+    int64 partition = 3;
   }
   Kafka kafka = 1;
 }

View File

@@ -0,0 +1,79 @@
package server

import (
	"context"
	"sync"

	"ky-go-kratos/app/websocket/internal/conf"
	"ky-go-kratos/app/websocket/internal/service"
	"ky-go-kratos/pkg/kafka"

	"github.com/go-kratos/kratos/v2/log"
)

// KafkaConsumerServer consumes messages from Kafka and passes them to the MessageHandler.
type KafkaConsumerServer struct {
	consumer       *kafka.KafkaConsumer
	messageHandler *service.MessageHandler
	wg             sync.WaitGroup
}

// NewKafkaConsumerServer builds the consumer from the Kafka config, starts it
// immediately, and returns a cleanup function that stops it.
func NewKafkaConsumerServer(c *conf.Data, messageHandler *service.MessageHandler) (*KafkaConsumerServer, func()) {
	consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition))
	server := &KafkaConsumerServer{
		consumer:       consumer,
		messageHandler: messageHandler,
	}
	ctx := context.Background()
	if err := server.Start(ctx); err != nil {
		panic(err)
	}
	return server, func() {
		if err := server.Stop(ctx); err != nil {
			log.Errorf("failed to stop kafka consumer: %v", err)
		}
	}
}

// Start starts the message handler and launches the consume goroutine.
func (s *KafkaConsumerServer) Start(ctx context.Context) error {
	if err := s.messageHandler.Start(); err != nil {
		return err
	}
	s.wg.Add(1)
	go s.consumeMessages(ctx)
	return nil
}

// Stop waits for the consume goroutine, stops the message handler, then closes the reader.
func (s *KafkaConsumerServer) Stop(ctx context.Context) error {
	s.wg.Wait()
	if err := s.messageHandler.Stop(); err != nil {
		return err
	}
	return s.consumer.Close()
}

// consumeMessages reads from Kafka until the context is cancelled or a non-transient error occurs.
func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		default:
			message, err := s.consumer.Reader.ReadMessage(ctx)
			if err != nil {
				if s.consumer.IsTransientNetworkError(err) {
					continue
				}
				return
			}
			log.Infof("message data: %v", message)
			if err := s.messageHandler.HandleMessage(&message); err != nil {
				log.Errorf("failed to handle message: %v", err)
			}
		}
	}
}
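Worth noting: newApp can register KafkaConsumerServer alongside the gRPC, HTTP, and WebSocket servers because its Start/Stop methods line up with Kratos's transport server contract. A rough sketch of that interface, as an assumption based on go-kratos v2 (github.com/go-kratos/kratos/v2/transport), not something shown in this diff:

// Sketch of the interface kratos.Server(...) expects from each registered
// server; KafkaConsumerServer's Start and Stop above match these signatures,
// which is why it can be passed as ks in newApp.
package transport

import "context"

type Server interface {
	Start(context.Context) error
	Stop(context.Context) error
}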

View File

@@ -5,4 +5,4 @@ import (
 )
 
 // ProviderSet is server providers.
-var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer)
+var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer, NewKafkaConsumerServer)

View File

@@ -0,0 +1,92 @@
package service

import (
	"encoding/json"
	"fmt"
	"log"

	kafkago "github.com/segmentio/kafka-go"
)

// MessageHandler handles Kafka messages.
type MessageHandler struct {
	wsService *WebSocketService
}

// WebSocketMessage defines the WebSocket message structure.
type WebSocketMessage struct {
	Type    string          `json:"type"`
	Content string          `json:"content"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

func NewMessageHandler(wsService *WebSocketService) *MessageHandler {
	return &MessageHandler{
		wsService: wsService,
	}
}

func (h *MessageHandler) Start() error {
	return nil
}

func (h *MessageHandler) Stop() error {
	return nil
}

// parseJSONMessage parses the message and returns it as formatted JSON.
func (h *MessageHandler) parseJSONMessage(message []byte) (string, error) {
	var jsonData interface{}
	if err := json.Unmarshal(message, &jsonData); err != nil {
		return "", fmt.Errorf("failed to parse JSON: %v", err)
	}
	prettyJSON, err := json.MarshalIndent(jsonData, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to format JSON: %v", err)
	}
	return string(prettyJSON), nil
}

// parseWebSocketMessage parses the message into a WebSocketMessage.
func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessage, error) {
	var wsMsg WebSocketMessage
	if err := json.Unmarshal(message, &wsMsg); err != nil {
		return nil, fmt.Errorf("failed to parse WebSocket message: %v", err)
	}
	return &wsMsg, nil
}

func (h *MessageHandler) HandleMessage(message *kafkago.Message) error {
	// Parse and print the formatted JSON.
	prettyJSON, err := h.parseJSONMessage(message.Value)
	if err != nil {
		log.Printf("Error parsing message: %v", err)
		return err
	}
	fmt.Printf("Received message:\n%s\n", prettyJSON)

	// Parse into a WebSocketMessage.
	wsMsg, err := h.parseWebSocketMessage(message.Value)
	if err != nil {
		log.Printf("Error parsing WebSocket message: %v", err)
		return err
	}

	// Dispatch on the message type.
	switch wsMsg.Type {
	case "message":
		// Plain message: broadcast the raw Kafka payload as-is.
		return h.wsService.Broadcast(message.Value)
	case "broadcast":
		// Broadcast the payload to all connected clients.
		return h.wsService.Broadcast(wsMsg.Payload)
	case "private":
		// Private message: route the payload to a single client.
		return h.wsService.SendPrivateMessage(wsMsg.Payload)
	default:
		log.Printf("Unknown message type: %s", wsMsg.Type)
		return nil
	}
}
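For illustration only, since no producer ships in this commit: a sketch of the JSON a producer might publish to websocket-topic so that HandleMessage routes it. The broker address and topic mirror the config above; the Writer setup, client ID, and text values are assumptions, while the envelope shape (type/payload, and to/content for private messages) comes from the handler and WebSocketService code.

package main

import (
	"context"
	"encoding/json"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	// Hypothetical producer pointed at the broker/topic from config.yaml.
	w := &kafkago.Writer{
		Addr:  kafkago.TCP("47.108.232.131:9092"),
		Topic: "websocket-topic",
	}
	defer w.Close()

	// "broadcast": payload is fanned out to every connected client.
	broadcast, _ := json.Marshal(map[string]any{
		"type":    "broadcast",
		"payload": map[string]any{"text": "hello, everyone"},
	})

	// "private": payload carries the target client ID and the content.
	private, _ := json.Marshal(map[string]any{
		"type": "private",
		"payload": map[string]any{
			"to":      "client-42", // hypothetical client ID
			"content": map[string]any{"text": "hello, just you"},
		},
	})

	_ = w.WriteMessages(context.Background(),
		kafkago.Message{Value: broadcast},
		kafkago.Message{Value: private},
	)
}

Note that type "message" rebroadcasts the entire raw Kafka value, whereas "broadcast" and "private" forward only the payload field.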

View File

@@ -1,6 +1,8 @@
 package service
 
-import "github.com/google/wire"
+import (
+	"github.com/google/wire"
+)
 
 // ProviderSet is service providers.
-var ProviderSet = wire.NewSet(NewWebSocketService)
+var ProviderSet = wire.NewSet(NewWebSocketService, NewMessageHandler)

View File

@@ -2,20 +2,76 @@ package service
 
 import (
 	"context"
+	"encoding/json"
 	"ky-go-kratos/app/websocket/internal/biz"
+	"log"
+	"sync"
 )
 
 // WebSocketService is a WebSocket service.
 type WebSocketService struct {
 	uc *biz.WebSocketUsecase
+	clients    map[string]*Client
+	clientsMux sync.RWMutex
 }
+
+type Client struct {
+	ID     string
+	Send   chan []byte
+	Topics map[string]bool
+}
 
 // NewWebSocketService new a WebSocket service.
 func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService {
-	return &WebSocketService{uc: uc}
+	return &WebSocketService{
+		uc:      uc,
+		clients: make(map[string]*Client),
+	}
 }
 
 // HandleMessage handles incoming WebSocket messages.
 func (s *WebSocketService) HandleMessage(ctx context.Context, message []byte) error {
 	return s.uc.HandleMessage(ctx, message)
 }
+
+// Broadcast sends a message to every connected client.
+func (s *WebSocketService) Broadcast(message json.RawMessage) error {
+	s.clientsMux.RLock()
+	defer s.clientsMux.RUnlock()
+	for _, client := range s.clients {
+		select {
+		case client.Send <- message:
+		default:
+			log.Printf("Client %s buffer is full, message dropped", client.ID)
+		}
+	}
+	return nil
+}
+
+// SendPrivateMessage delivers a message to a single client.
+func (s *WebSocketService) SendPrivateMessage(message json.RawMessage) error {
+	var msg struct {
+		To      string          `json:"to"`
+		Content json.RawMessage `json:"content"`
+	}
+	if err := json.Unmarshal(message, &msg); err != nil {
+		return err
+	}
+	s.clientsMux.RLock()
+	client, exists := s.clients[msg.To]
+	s.clientsMux.RUnlock()
+	if !exists {
+		return nil
+	}
+	select {
+	case client.Send <- msg.Content:
+	default:
+		log.Printf("Client %s buffer is full, message dropped", client.ID)
+	}
+	return nil
+}
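Nothing in this diff populates s.clients, so Broadcast and SendPrivateMessage have no recipients until the WebSocket transport registers connections somewhere. A hypothetical registration helper, purely illustrative and not part of this commit, might look like:

// Hypothetical, not in this commit: register/unregister a connection so the
// Kafka-driven Broadcast/SendPrivateMessage calls can reach it.
func (s *WebSocketService) RegisterClient(id string) *Client {
	c := &Client{
		ID:     id,
		Send:   make(chan []byte, 256), // buffered: Broadcast drops instead of blocking
		Topics: make(map[string]bool),
	}
	s.clientsMux.Lock()
	s.clients[id] = c
	s.clientsMux.Unlock()
	return c
}

func (s *WebSocketService) UnregisterClient(id string) {
	s.clientsMux.Lock()
	defer s.clientsMux.Unlock()
	if c, ok := s.clients[id]; ok {
		delete(s.clients, id)
		close(c.Send)
	}
}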

View File

@@ -1 +1,39 @@
 package kafka
+
+import (
+	"errors"
+	"io"
+	"syscall"
+	"time"
+
+	"github.com/segmentio/kafka-go"
+)
+
+type KafkaConsumer struct {
+	Reader *kafka.Reader
+}
+
+func NewKafkaReader(brokers []string, topic string, partition int) *KafkaConsumer {
+	return &KafkaConsumer{
+		Reader: kafka.NewReader(kafka.ReaderConfig{
+			Brokers:        brokers,
+			Topic:          topic,
+			Partition:      partition,
+			MinBytes:       10e3,
+			MaxBytes:       10e6,
+			MaxWait:        500 * time.Millisecond,
+			CommitInterval: 5 * time.Second,
+		}),
+	}
+}
+
+func (c *KafkaConsumer) IsTransientNetworkError(err error) bool {
+	return errors.Is(err, io.ErrUnexpectedEOF) ||
+		errors.Is(err, syscall.ECONNREFUSED) ||
+		errors.Is(err, syscall.ECONNRESET) ||
+		errors.Is(err, syscall.EPIPE)
+}
+
+func (c *KafkaConsumer) Close() error {
+	return c.Reader.Close()
+}
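One design note on the reader config, hedged as my reading of segmentio/kafka-go: with an explicit Partition there is no consumer group, so CommitInterval has no effect and offsets are not shared across instances. If several websocket instances ever need to split the topic, a group-based reader is the usual alternative (GroupID and Partition are mutually exclusive in ReaderConfig). A sketch under that assumption, not part of this commit:

package kafka

import (
	"time"

	"github.com/segmentio/kafka-go"
)

// NewKafkaGroupReader is an assumed consumer-group variant: kafka-go balances
// partitions across readers sharing the same GroupID, and CommitInterval then
// controls how often offsets are committed.
func NewKafkaGroupReader(brokers []string, topic, groupID string) *KafkaConsumer {
	return &KafkaConsumer{
		Reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:        brokers,
			GroupID:        groupID, // replaces the fixed Partition field
			Topic:          topic,
			MinBytes:       10e3,
			MaxBytes:       10e6,
			MaxWait:        500 * time.Millisecond,
			CommitInterval: 5 * time.Second,
		}),
	}
}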