diff --git a/app/websocket/cmd/websocket/main.go b/app/websocket/cmd/websocket/main.go index 80f0665..2bd5499 100644 --- a/app/websocket/cmd/websocket/main.go +++ b/app/websocket/cmd/websocket/main.go @@ -34,7 +34,7 @@ func init() { flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml") } -func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App { +func newApp(logger log.Logger, ks *server.KafkaConsumerServer, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App { return kratos.New( kratos.ID(id), kratos.Name(Name), @@ -42,6 +42,7 @@ func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs * kratos.Metadata(map[string]string{}), kratos.Logger(logger), kratos.Server( + ks, ws, gs, hs, diff --git a/app/websocket/cmd/websocket/wire.go b/app/websocket/cmd/websocket/wire.go index f09a43b..47a1d03 100644 --- a/app/websocket/cmd/websocket/wire.go +++ b/app/websocket/cmd/websocket/wire.go @@ -18,13 +18,6 @@ import ( ) // wireApp init kratos application. -func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) { - wire.Build( - data.ProviderSet, - biz.ProviderSet, - service.ProviderSet, - server.ProviderSet, - newApp, - ) - return nil, nil, nil +func wireApp(*conf.Server, *conf.Data, log.Logger) (*kratos.App, func(), error) { + panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, newApp)) } diff --git a/app/websocket/cmd/websocket/wire_gen.go b/app/websocket/cmd/websocket/wire_gen.go index 1327490..5cb77f1 100644 --- a/app/websocket/cmd/websocket/wire_gen.go +++ b/app/websocket/cmd/websocket/wire_gen.go @@ -23,19 +23,22 @@ import ( // Injectors from wire.go: // wireApp init kratos application. -func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) { - dataData, cleanup, err := data.NewData(d, logger) +func wireApp(confServer *conf.Server, confData *conf.Data, logger log.Logger) (*kratos.App, func(), error) { + dataData, cleanup, err := data.NewData(confData, logger) if err != nil { return nil, nil, err } webSocketRepo := data.NewWebSocketRepo(dataData, logger) webSocketUsecase := biz.NewWebSocketUsecase(webSocketRepo) webSocketService := service.NewWebSocketService(webSocketUsecase) - webSocketServer := server.NewWebSocketServer(c, webSocketService) - grpcServer := server.NewGRPCServer(c, logger) - httpServer := server.NewHTTPServer(c, logger) - app := newApp(logger, webSocketServer, grpcServer, httpServer) + messageHandler := service.NewMessageHandler(webSocketService) + kafkaConsumerServer, cleanup2 := server.NewKafkaConsumerServer(confData, messageHandler) + webSocketServer := server.NewWebSocketServer(confServer, webSocketService) + grpcServer := server.NewGRPCServer(confServer, logger) + httpServer := server.NewHTTPServer(confServer, logger) + app := newApp(logger, kafkaConsumerServer, webSocketServer, grpcServer, httpServer) return app, func() { + cleanup2() cleanup() }, nil } diff --git a/app/websocket/configs/config.yaml b/app/websocket/configs/config.yaml index 0d3a3a7..4646622 100644 --- a/app/websocket/configs/config.yaml +++ b/app/websocket/configs/config.yaml @@ -20,3 +20,4 @@ data: brokers: - 47.108.232.131:9092 topic: websocket-topic + partition: 0 diff --git a/app/websocket/internal/conf/conf.pb.go b/app/websocket/internal/conf/conf.pb.go index 9362f9f..1792ba8 100644 --- a/app/websocket/internal/conf/conf.pb.go +++ b/app/websocket/internal/conf/conf.pb.go @@ -354,6 +354,7 @@ type Data_Kafka struct { state protoimpl.MessageState `protogen:"open.v1"` Brokers []string `protobuf:"bytes,1,rep,name=brokers,proto3" json:"brokers,omitempty"` Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` + Partition int64 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -402,6 +403,13 @@ func (x *Data_Kafka) GetTopic() string { return "" } +func (x *Data_Kafka) GetPartition() int64 { + if x != nil { + return x.Partition + } + return 0 +} + var File_conf_conf_proto protoreflect.FileDescriptor const file_conf_conf_proto_rawDesc = "" + @@ -425,12 +433,13 @@ const file_conf_conf_proto_rawDesc = "" + "\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x1a3\n" + "\tWebSocket\x12\x12\n" + "\x04addr\x18\x01 \x01(\tR\x04addr\x12\x12\n" + - "\x04path\x18\x02 \x01(\tR\x04path\"m\n" + + "\x04path\x18\x02 \x01(\tR\x04path\"\x8b\x01\n" + "\x04Data\x12,\n" + - "\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1a7\n" + + "\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1aU\n" + "\x05Kafka\x12\x18\n" + "\abrokers\x18\x01 \x03(\tR\abrokers\x12\x14\n" + - "\x05topic\x18\x02 \x01(\tR\x05topicB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3" + "\x05topic\x18\x02 \x01(\tR\x05topic\x12\x1c\n" + + "\tpartition\x18\x03 \x01(\x03R\tpartitionB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3" var ( file_conf_conf_proto_rawDescOnce sync.Once diff --git a/app/websocket/internal/conf/conf.proto b/app/websocket/internal/conf/conf.proto index 9bf0869..82f6bed 100644 --- a/app/websocket/internal/conf/conf.proto +++ b/app/websocket/internal/conf/conf.proto @@ -34,6 +34,7 @@ message Data { message Kafka { repeated string brokers = 1; string topic = 2; + int64 partition = 3; } Kafka kafka = 1; } diff --git a/app/websocket/internal/server/kafka_consume.go b/app/websocket/internal/server/kafka_consume.go new file mode 100644 index 0000000..eb8c6ae --- /dev/null +++ b/app/websocket/internal/server/kafka_consume.go @@ -0,0 +1,79 @@ +package server + +import ( + "context" + "sync" + + "ky-go-kratos/app/websocket/internal/conf" + "ky-go-kratos/app/websocket/internal/service" + "ky-go-kratos/pkg/kafka" + + "github.com/go-kratos/kratos/v2/log" +) + +type KafkaConsumerServer struct { + consumer *kafka.KafkaConsumer + messageHandler *service.MessageHandler + wg sync.WaitGroup +} + +func NewKafkaConsumerServer(c *conf.Data, messageHandler *service.MessageHandler) (*KafkaConsumerServer, func()) { + consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition)) + server := &KafkaConsumerServer{ + consumer: consumer, + messageHandler: messageHandler, + } + + ctx := context.Background() + if err := server.Start(ctx); err != nil { + panic(err) + } + + return server, func() { + if err := server.Stop(ctx); err != nil { + log.Errorf("failed to stop kafka consumer: %v", err) + } + } +} + +func (s *KafkaConsumerServer) Start(ctx context.Context) error { + if err := s.messageHandler.Start(); err != nil { + return err + } + + s.wg.Add(1) + go s.consumeMessages(ctx) + return nil +} + +func (s *KafkaConsumerServer) Stop(ctx context.Context) error { + s.wg.Wait() + if err := s.messageHandler.Stop(); err != nil { + return err + } + return s.consumer.Close() +} + +func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) { + defer s.wg.Done() + + for { + select { + case <-ctx.Done(): + return + default: + message, err := s.consumer.Reader.ReadMessage(ctx) + log.Infof("message data: %v", message) + if err != nil { + if s.consumer.IsTransientNetworkError(err) { + continue + } + return + } + + if err := s.messageHandler.HandleMessage(&message); err != nil { + log.Errorf("failed to handle message: %v", err) + } + } + } +} diff --git a/app/websocket/internal/server/server.go b/app/websocket/internal/server/server.go index b9003fe..df23c5c 100644 --- a/app/websocket/internal/server/server.go +++ b/app/websocket/internal/server/server.go @@ -5,4 +5,4 @@ import ( ) // ProviderSet is server providers. -var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer) +var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer, NewKafkaConsumerServer) diff --git a/app/websocket/internal/service/message_handler.go b/app/websocket/internal/service/message_handler.go new file mode 100644 index 0000000..4083d42 --- /dev/null +++ b/app/websocket/internal/service/message_handler.go @@ -0,0 +1,92 @@ +package service + +import ( + "encoding/json" + "fmt" + "log" + + kafkago "github.com/segmentio/kafka-go" +) + +// MessageHandler 处理 Kafka 消息 +type MessageHandler struct { + wsService *WebSocketService +} + +// WebSocketMessage 定义 WebSocket 消息结构 +type WebSocketMessage struct { + Type string `json:"type"` + Content string `json:"content"` + Payload json.RawMessage `json:"payload,omitempty"` +} + +func NewMessageHandler(wsService *WebSocketService) *MessageHandler { + return &MessageHandler{ + wsService: wsService, + } +} + +func (h *MessageHandler) Start() error { + return nil +} + +func (h *MessageHandler) Stop() error { + return nil +} + +// parseJSONMessage 解析并格式化 JSON 消息 +func (h *MessageHandler) parseJSONMessage(message []byte) (string, error) { + var jsonData interface{} + if err := json.Unmarshal(message, &jsonData); err != nil { + return "", fmt.Errorf("failed to parse JSON: %v", err) + } + + prettyJSON, err := json.MarshalIndent(jsonData, "", " ") + if err != nil { + return "", fmt.Errorf("failed to format JSON: %v", err) + } + + return string(prettyJSON), nil +} + +// parseWebSocketMessage 解析为 WebSocketMessage 结构 +func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessage, error) { + var wsMsg WebSocketMessage + if err := json.Unmarshal(message, &wsMsg); err != nil { + return nil, fmt.Errorf("failed to parse WebSocket message: %v", err) + } + return &wsMsg, nil +} + +func (h *MessageHandler) HandleMessage(message *kafkago.Message) error { + // 解析并打印格式化的 JSON + prettyJSON, err := h.parseJSONMessage(message.Value) + if err != nil { + log.Printf("Error parsing message: %v", err) + return err + } + fmt.Printf("Received message:\n%s\n", prettyJSON) + + // 解析为 WebSocketMessage + wsMsg, err := h.parseWebSocketMessage(message.Value) + if err != nil { + log.Printf("Error parsing WebSocket message: %v", err) + return err + } + + // 根据消息类型处理不同的业务逻辑 + switch wsMsg.Type { + case "message": + // 处理普通消息 + return h.wsService.Broadcast(message.Value) + case "broadcast": + // 广播消息给所有连接的客户端 + return h.wsService.Broadcast(wsMsg.Payload) + case "private": + // 处理私聊消息 + return h.wsService.SendPrivateMessage(wsMsg.Payload) + default: + log.Printf("Unknown message type: %s", wsMsg.Type) + return nil + } +} diff --git a/app/websocket/internal/service/service.go b/app/websocket/internal/service/service.go index 204d6d7..0810141 100644 --- a/app/websocket/internal/service/service.go +++ b/app/websocket/internal/service/service.go @@ -1,6 +1,8 @@ package service -import "github.com/google/wire" +import ( + "github.com/google/wire" +) // ProviderSet is service providers. -var ProviderSet = wire.NewSet(NewWebSocketService) +var ProviderSet = wire.NewSet(NewWebSocketService, NewMessageHandler) diff --git a/app/websocket/internal/service/websocket.go b/app/websocket/internal/service/websocket.go index be7392e..e86a8cf 100644 --- a/app/websocket/internal/service/websocket.go +++ b/app/websocket/internal/service/websocket.go @@ -2,20 +2,76 @@ package service import ( "context" + "encoding/json" "ky-go-kratos/app/websocket/internal/biz" + "log" + "sync" ) // WebSocketService is a WebSocket service. type WebSocketService struct { - uc *biz.WebSocketUsecase + uc *biz.WebSocketUsecase + clients map[string]*Client + clientsMux sync.RWMutex +} + +type Client struct { + ID string + Send chan []byte + Topics map[string]bool } // NewWebSocketService new a WebSocket service. func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService { - return &WebSocketService{uc: uc} + return &WebSocketService{ + uc: uc, + clients: make(map[string]*Client), + } } // HandleMessage handles incoming WebSocket messages. func (s *WebSocketService) HandleMessage(ctx context.Context, message []byte) error { return s.uc.HandleMessage(ctx, message) } + +// Broadcast 广播消息给所有客户端 +func (s *WebSocketService) Broadcast(message json.RawMessage) error { + s.clientsMux.RLock() + defer s.clientsMux.RUnlock() + + for _, client := range s.clients { + select { + case client.Send <- message: + default: + log.Printf("Client %s buffer is full, message dropped", client.ID) + } + } + return nil +} + +// SendPrivateMessage 发送私聊消息 +func (s *WebSocketService) SendPrivateMessage(message json.RawMessage) error { + var msg struct { + To string `json:"to"` + Content json.RawMessage `json:"content"` + } + + if err := json.Unmarshal(message, &msg); err != nil { + return err + } + + s.clientsMux.RLock() + client, exists := s.clients[msg.To] + s.clientsMux.RUnlock() + + if !exists { + return nil + } + + select { + case client.Send <- msg.Content: + default: + log.Printf("Client %s buffer is full, message dropped", client.ID) + } + return nil +} diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go index 82b3441..85ac62b 100644 --- a/pkg/kafka/consumer.go +++ b/pkg/kafka/consumer.go @@ -1 +1,39 @@ package kafka + +import ( + "errors" + "io" + "syscall" + "time" + + "github.com/segmentio/kafka-go" +) + +type KafkaConsumer struct { + Reader *kafka.Reader +} + +func NewKafkaReader(brokers []string, topic string, partition int) *KafkaConsumer { + return &KafkaConsumer{ + Reader: kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + Topic: topic, + Partition: partition, + MinBytes: 10e3, + MaxBytes: 10e6, + MaxWait: 500 * time.Millisecond, + CommitInterval: 5 * time.Second, + }), + } +} + +func (c *KafkaConsumer) IsTransientNetworkError(err error) bool { + return errors.Is(err, io.ErrUnexpectedEOF) || + errors.Is(err, syscall.ECONNREFUSED) || + errors.Is(err, syscall.ECONNRESET) || + errors.Is(err, syscall.EPIPE) +} + +func (c *KafkaConsumer) Close() error { + return c.Reader.Close() +}