Compare commits
2 Commits
f47baeac5f
...
e9f0f8fd40
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9f0f8fd40 | ||
|
|
ae2389c0b9 |
@ -34,7 +34,7 @@ func init() {
|
|||||||
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
|
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App {
|
func newApp(logger log.Logger, ks *server.KafkaConsumerServer, ws *server.WebSocketServer, gs *grpc.Server, hs *http.Server) *kratos.App {
|
||||||
return kratos.New(
|
return kratos.New(
|
||||||
kratos.ID(id),
|
kratos.ID(id),
|
||||||
kratos.Name(Name),
|
kratos.Name(Name),
|
||||||
@ -42,6 +42,7 @@ func newApp(logger log.Logger, ws *server.WebSocketServer, gs *grpc.Server, hs *
|
|||||||
kratos.Metadata(map[string]string{}),
|
kratos.Metadata(map[string]string{}),
|
||||||
kratos.Logger(logger),
|
kratos.Logger(logger),
|
||||||
kratos.Server(
|
kratos.Server(
|
||||||
|
ks,
|
||||||
ws,
|
ws,
|
||||||
gs,
|
gs,
|
||||||
hs,
|
hs,
|
||||||
|
|||||||
@ -18,13 +18,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// wireApp init kratos application.
|
// wireApp init kratos application.
|
||||||
func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
|
func wireApp(*conf.Server, *conf.Data, log.Logger) (*kratos.App, func(), error) {
|
||||||
wire.Build(
|
panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, newApp))
|
||||||
data.ProviderSet,
|
|
||||||
biz.ProviderSet,
|
|
||||||
service.ProviderSet,
|
|
||||||
server.ProviderSet,
|
|
||||||
newApp,
|
|
||||||
)
|
|
||||||
return nil, nil, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,19 +23,21 @@ import (
|
|||||||
// Injectors from wire.go:
|
// Injectors from wire.go:
|
||||||
|
|
||||||
// wireApp init kratos application.
|
// wireApp init kratos application.
|
||||||
func wireApp(c *conf.Server, d *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
|
func wireApp(confServer *conf.Server, confData *conf.Data, logger log.Logger) (*kratos.App, func(), error) {
|
||||||
dataData, cleanup, err := data.NewData(d, logger)
|
dataData, cleanup, err := data.NewData(confData, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
webSocketRepo := data.NewWebSocketRepo(dataData, logger)
|
webSocketRepo := data.NewWebSocketRepo(dataData, logger)
|
||||||
webSocketUsecase := biz.NewWebSocketUsecase(webSocketRepo)
|
webSocketUsecase := biz.NewWebSocketUsecase(webSocketRepo)
|
||||||
webSocketService := service.NewWebSocketService(webSocketUsecase)
|
webSocketService := service.NewWebSocketService(webSocketUsecase)
|
||||||
webSocketServer := server.NewWebSocketServer(c, webSocketService)
|
kafkaConsumerServer, cleanup2 := server.NewKafkaConsumerServer(confData, webSocketService)
|
||||||
grpcServer := server.NewGRPCServer(c, logger)
|
webSocketServer := server.NewWebSocketServer(confServer, webSocketService)
|
||||||
httpServer := server.NewHTTPServer(c, logger)
|
grpcServer := server.NewGRPCServer(confServer, logger)
|
||||||
app := newApp(logger, webSocketServer, grpcServer, httpServer)
|
httpServer := server.NewHTTPServer(confServer, logger)
|
||||||
|
app := newApp(logger, kafkaConsumerServer, webSocketServer, grpcServer, httpServer)
|
||||||
return app, func() {
|
return app, func() {
|
||||||
|
cleanup2()
|
||||||
cleanup()
|
cleanup()
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,3 +20,5 @@ data:
|
|||||||
brokers:
|
brokers:
|
||||||
- 47.108.232.131:9092
|
- 47.108.232.131:9092
|
||||||
topic: websocket-topic
|
topic: websocket-topic
|
||||||
|
partition: 0
|
||||||
|
group_id: websocket-topic
|
||||||
|
|||||||
@ -354,6 +354,8 @@ type Data_Kafka struct {
|
|||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
Brokers []string `protobuf:"bytes,1,rep,name=brokers,proto3" json:"brokers,omitempty"`
|
Brokers []string `protobuf:"bytes,1,rep,name=brokers,proto3" json:"brokers,omitempty"`
|
||||||
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
|
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
|
||||||
|
Partition int64 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
|
||||||
|
GroupId string `protobuf:"bytes,4,opt,name=group_id,json=groupId,proto3" json:"group_id,omitempty"`
|
||||||
unknownFields protoimpl.UnknownFields
|
unknownFields protoimpl.UnknownFields
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
}
|
}
|
||||||
@ -402,6 +404,20 @@ func (x *Data_Kafka) GetTopic() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *Data_Kafka) GetPartition() int64 {
|
||||||
|
if x != nil {
|
||||||
|
return x.Partition
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Data_Kafka) GetGroupId() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.GroupId
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
var File_conf_conf_proto protoreflect.FileDescriptor
|
var File_conf_conf_proto protoreflect.FileDescriptor
|
||||||
|
|
||||||
const file_conf_conf_proto_rawDesc = "" +
|
const file_conf_conf_proto_rawDesc = "" +
|
||||||
@ -425,12 +441,14 @@ const file_conf_conf_proto_rawDesc = "" +
|
|||||||
"\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x1a3\n" +
|
"\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\x1a3\n" +
|
||||||
"\tWebSocket\x12\x12\n" +
|
"\tWebSocket\x12\x12\n" +
|
||||||
"\x04addr\x18\x01 \x01(\tR\x04addr\x12\x12\n" +
|
"\x04addr\x18\x01 \x01(\tR\x04addr\x12\x12\n" +
|
||||||
"\x04path\x18\x02 \x01(\tR\x04path\"m\n" +
|
"\x04path\x18\x02 \x01(\tR\x04path\"\xa6\x01\n" +
|
||||||
"\x04Data\x12,\n" +
|
"\x04Data\x12,\n" +
|
||||||
"\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1a7\n" +
|
"\x05kafka\x18\x01 \x01(\v2\x16.kratos.api.Data.KafkaR\x05kafka\x1ap\n" +
|
||||||
"\x05Kafka\x12\x18\n" +
|
"\x05Kafka\x12\x18\n" +
|
||||||
"\abrokers\x18\x01 \x03(\tR\abrokers\x12\x14\n" +
|
"\abrokers\x18\x01 \x03(\tR\abrokers\x12\x14\n" +
|
||||||
"\x05topic\x18\x02 \x01(\tR\x05topicB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3"
|
"\x05topic\x18\x02 \x01(\tR\x05topic\x12\x1c\n" +
|
||||||
|
"\tpartition\x18\x03 \x01(\x03R\tpartition\x12\x19\n" +
|
||||||
|
"\bgroup_id\x18\x04 \x01(\tR\agroupIdB/Z-ky-go-kratos/app/websocket/internal/conf;confb\x06proto3"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
file_conf_conf_proto_rawDescOnce sync.Once
|
file_conf_conf_proto_rawDescOnce sync.Once
|
||||||
|
|||||||
@ -34,6 +34,8 @@ message Data {
|
|||||||
message Kafka {
|
message Kafka {
|
||||||
repeated string brokers = 1;
|
repeated string brokers = 1;
|
||||||
string topic = 2;
|
string topic = 2;
|
||||||
|
int64 partition = 3;
|
||||||
|
string group_id = 4;
|
||||||
}
|
}
|
||||||
Kafka kafka = 1;
|
Kafka kafka = 1;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -30,7 +30,6 @@ type Message struct {
|
|||||||
|
|
||||||
// SendMessage sends a message to Kafka.
|
// SendMessage sends a message to Kafka.
|
||||||
func (r *WebSocketRepo) SendMessage(ctx context.Context, message []byte) error {
|
func (r *WebSocketRepo) SendMessage(ctx context.Context, message []byte) error {
|
||||||
r.log.Warnf(">>>>>>>> %v", string(message))
|
|
||||||
err := r.data.kafka.SendToKafkaMessage(ctx, "websocket", message)
|
err := r.data.kafka.SendToKafkaMessage(ctx, "websocket", message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Errorf("failed to send message to kafka: %v", err)
|
r.log.Errorf("failed to send message to kafka: %v", err)
|
||||||
|
|||||||
68
app/websocket/internal/server/kafka_consume.go
Normal file
68
app/websocket/internal/server/kafka_consume.go
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"ky-go-kratos/app/websocket/internal/conf"
|
||||||
|
"ky-go-kratos/app/websocket/internal/service"
|
||||||
|
"ky-go-kratos/pkg/kafka"
|
||||||
|
|
||||||
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KafkaConsumerServer struct {
|
||||||
|
consumer *kafka.KafkaConsumer
|
||||||
|
messageHandler *service.MessageHandler
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKafkaConsumerServer(c *conf.Data, wsService *service.WebSocketService) (*KafkaConsumerServer, func()) {
|
||||||
|
consumer := kafka.NewKafkaReader(c.Kafka.Brokers, c.Kafka.Topic, int(c.Kafka.Partition), c.Kafka.GroupId)
|
||||||
|
messageHandler := service.NewMessageHandler(wsService, consumer.Reader)
|
||||||
|
|
||||||
|
server := &KafkaConsumerServer{
|
||||||
|
consumer: consumer,
|
||||||
|
messageHandler: messageHandler,
|
||||||
|
}
|
||||||
|
|
||||||
|
return server, func() {
|
||||||
|
if err := server.Stop(context.Background()); err != nil {
|
||||||
|
log.Errorf("failed to stop kafka consumer: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *KafkaConsumerServer) Start(ctx context.Context) error {
|
||||||
|
s.wg.Add(1)
|
||||||
|
go s.consumeMessages(ctx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *KafkaConsumerServer) Stop(ctx context.Context) error {
|
||||||
|
s.wg.Wait()
|
||||||
|
return s.consumer.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *KafkaConsumerServer) consumeMessages(ctx context.Context) {
|
||||||
|
defer s.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
message, err := s.consumer.Reader.ReadMessage(ctx)
|
||||||
|
if err != nil {
|
||||||
|
if s.consumer.IsTransientNetworkError(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.messageHandler.HandleMessage(&message); err != nil {
|
||||||
|
log.Errorf("failed to handle message: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -5,4 +5,4 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// ProviderSet is server providers.
|
// ProviderSet is server providers.
|
||||||
var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer)
|
var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewWebSocketServer, NewKafkaConsumerServer)
|
||||||
|
|||||||
@ -20,8 +20,9 @@ type WebSocketServer struct {
|
|||||||
broadcast chan []byte
|
broadcast chan []byte
|
||||||
register chan *websocket.Conn
|
register chan *websocket.Conn
|
||||||
unregister chan *websocket.Conn
|
unregister chan *websocket.Conn
|
||||||
mu sync.Mutex
|
mu sync.RWMutex
|
||||||
server *http.Server
|
server *http.Server
|
||||||
|
done chan struct{} // 添加 done 通道用于优雅关闭
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebSocketServer(c *conf.Server, svc *service.WebSocketService) *WebSocketServer {
|
func NewWebSocketServer(c *conf.Server, svc *service.WebSocketService) *WebSocketServer {
|
||||||
@ -37,6 +38,7 @@ func NewWebSocketServer(c *conf.Server, svc *service.WebSocketService) *WebSocke
|
|||||||
broadcast: make(chan []byte),
|
broadcast: make(chan []byte),
|
||||||
register: make(chan *websocket.Conn),
|
register: make(chan *websocket.Conn),
|
||||||
unregister: make(chan *websocket.Conn),
|
unregister: make(chan *websocket.Conn),
|
||||||
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,6 +65,22 @@ func (s *WebSocketServer) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebSocketServer) Stop(ctx context.Context) error {
|
func (s *WebSocketServer) Stop(ctx context.Context) error {
|
||||||
|
// 关闭所有 WebSocket 连接
|
||||||
|
s.mu.Lock()
|
||||||
|
for client := range s.clients {
|
||||||
|
client.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, "Server is shutting down"))
|
||||||
|
client.Close()
|
||||||
|
delete(s.clients, client)
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
// 关闭通道
|
||||||
|
close(s.done)
|
||||||
|
close(s.broadcast)
|
||||||
|
close(s.register)
|
||||||
|
close(s.unregister)
|
||||||
|
|
||||||
|
// 关闭 HTTP 服务器
|
||||||
if s.server != nil {
|
if s.server != nil {
|
||||||
return s.server.Shutdown(ctx)
|
return s.server.Shutdown(ctx)
|
||||||
}
|
}
|
||||||
@ -90,12 +108,16 @@ func (s *WebSocketServer) readPump(conn *websocket.Conn) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.done:
|
||||||
|
return
|
||||||
|
default:
|
||||||
_, message, err := conn.ReadMessage()
|
_, message, err := conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||||
log.Printf("error: %v", err)
|
log.Printf("error: %v", err)
|
||||||
}
|
}
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Received message: %v", string(message))
|
log.Printf("Received message: %v", string(message))
|
||||||
@ -107,13 +129,20 @@ func (s *WebSocketServer) readPump(conn *websocket.Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 广播消息给所有客户端
|
// 广播消息给所有客户端
|
||||||
s.broadcast <- message
|
select {
|
||||||
|
case s.broadcast <- message:
|
||||||
|
case <-s.done:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WebSocketServer) run() {
|
func (s *WebSocketServer) run() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-s.done:
|
||||||
|
return
|
||||||
case client := <-s.register:
|
case client := <-s.register:
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.clients[client] = true
|
s.clients[client] = true
|
||||||
|
|||||||
95
app/websocket/internal/service/kafka_message_handler.go
Normal file
95
app/websocket/internal/service/kafka_message_handler.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
kafkago "github.com/segmentio/kafka-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MessageHandler 处理 Kafka 消息
|
||||||
|
type MessageHandler struct {
|
||||||
|
wsService *WebSocketService
|
||||||
|
reader *kafkago.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
// WebSocketMessage 定义 WebSocket 消息结构
|
||||||
|
type WebSocketMessage struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
Payload json.RawMessage `json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessageHandler(wsService *WebSocketService, reader *kafkago.Reader) *MessageHandler {
|
||||||
|
return &MessageHandler{
|
||||||
|
wsService: wsService,
|
||||||
|
reader: reader,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseJSONMessage 解析并格式化 JSON 消息
|
||||||
|
func (h *MessageHandler) parseJSONMessage(message []byte) (string, error) {
|
||||||
|
var jsonData interface{}
|
||||||
|
if err := json.Unmarshal(message, &jsonData); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to parse JSON: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prettyJSON, err := json.MarshalIndent(jsonData, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to format JSON: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(prettyJSON), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseWebSocketMessage 解析为 WebSocketMessage 结构
|
||||||
|
func (h *MessageHandler) parseWebSocketMessage(message []byte) (*WebSocketMessage, error) {
|
||||||
|
var wsMsg WebSocketMessage
|
||||||
|
if err := json.Unmarshal(message, &wsMsg); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse WebSocket message: %v", err)
|
||||||
|
}
|
||||||
|
return &wsMsg, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MessageHandler) HandleMessage(message *kafkago.Message) error {
|
||||||
|
// 解析并打印格式化的 JSON
|
||||||
|
prettyJSON, err := h.parseJSONMessage(message.Value)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error parsing message: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("kafka parseJSONMessage:\n%s\n", prettyJSON)
|
||||||
|
|
||||||
|
// 解析为 WebSocketMessage
|
||||||
|
wsMsg, err := h.parseWebSocketMessage(message.Value)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error parsing WebSocket message: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 根据消息类型处理不同的业务逻辑
|
||||||
|
var processErr error
|
||||||
|
switch wsMsg.Type {
|
||||||
|
case "message":
|
||||||
|
// 处理普通消息
|
||||||
|
processErr = h.wsService.Broadcast(message.Value)
|
||||||
|
case "broadcast":
|
||||||
|
// 广播消息给所有连接的客户端
|
||||||
|
processErr = h.wsService.Broadcast(wsMsg.Payload)
|
||||||
|
case "private":
|
||||||
|
// 处理私聊消息
|
||||||
|
processErr = h.wsService.SendPrivateMessage(wsMsg.Payload)
|
||||||
|
default:
|
||||||
|
log.Printf("Unknown message type: %s", wsMsg.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
if processErr != nil {
|
||||||
|
log.Printf("Error processing message: %v", processErr)
|
||||||
|
} else {
|
||||||
|
log.Printf("Message processed successfully - Topic: %s, Partition: %d, Offset: %d",
|
||||||
|
message.Topic, message.Partition, message.Offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
return processErr
|
||||||
|
}
|
||||||
@ -1,6 +1,8 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import "github.com/google/wire"
|
import (
|
||||||
|
"github.com/google/wire"
|
||||||
|
)
|
||||||
|
|
||||||
// ProviderSet is service providers.
|
// ProviderSet is service providers.
|
||||||
var ProviderSet = wire.NewSet(NewWebSocketService)
|
var ProviderSet = wire.NewSet(NewWebSocketService, NewMessageHandler)
|
||||||
|
|||||||
@ -2,20 +2,76 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"ky-go-kratos/app/websocket/internal/biz"
|
"ky-go-kratos/app/websocket/internal/biz"
|
||||||
|
"log"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WebSocketService is a WebSocket service.
|
// WebSocketService is a WebSocket service.
|
||||||
type WebSocketService struct {
|
type WebSocketService struct {
|
||||||
uc *biz.WebSocketUsecase
|
uc *biz.WebSocketUsecase
|
||||||
|
clients map[string]*Client
|
||||||
|
clientsMux sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
ID string
|
||||||
|
Send chan []byte
|
||||||
|
Topics map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWebSocketService new a WebSocket service.
|
// NewWebSocketService new a WebSocket service.
|
||||||
func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService {
|
func NewWebSocketService(uc *biz.WebSocketUsecase) *WebSocketService {
|
||||||
return &WebSocketService{uc: uc}
|
return &WebSocketService{
|
||||||
|
uc: uc,
|
||||||
|
clients: make(map[string]*Client),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleMessage handles incoming WebSocket messages.
|
// HandleMessage handles incoming WebSocket messages.
|
||||||
func (s *WebSocketService) HandleMessage(ctx context.Context, message []byte) error {
|
func (s *WebSocketService) HandleMessage(ctx context.Context, message []byte) error {
|
||||||
return s.uc.HandleMessage(ctx, message)
|
return s.uc.HandleMessage(ctx, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Broadcast 广播消息给所有客户端
|
||||||
|
func (s *WebSocketService) Broadcast(message json.RawMessage) error {
|
||||||
|
s.clientsMux.RLock()
|
||||||
|
defer s.clientsMux.RUnlock()
|
||||||
|
|
||||||
|
for _, client := range s.clients {
|
||||||
|
select {
|
||||||
|
case client.Send <- message:
|
||||||
|
default:
|
||||||
|
log.Printf("Client %s buffer is full, message dropped", client.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendPrivateMessage 发送私聊消息
|
||||||
|
func (s *WebSocketService) SendPrivateMessage(message json.RawMessage) error {
|
||||||
|
var msg struct {
|
||||||
|
To string `json:"to"`
|
||||||
|
Content json.RawMessage `json:"content"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(message, &msg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.clientsMux.RLock()
|
||||||
|
client, exists := s.clients[msg.To]
|
||||||
|
s.clientsMux.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case client.Send <- msg.Content:
|
||||||
|
default:
|
||||||
|
log.Printf("Client %s buffer is full, message dropped", client.ID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -1 +1,40 @@
|
|||||||
package kafka
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/segmentio/kafka-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type KafkaConsumer struct {
|
||||||
|
Reader *kafka.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKafkaReader(brokers []string, topic string, partition int, groupID string) *KafkaConsumer {
|
||||||
|
return &KafkaConsumer{
|
||||||
|
Reader: kafka.NewReader(kafka.ReaderConfig{
|
||||||
|
Brokers: brokers,
|
||||||
|
Topic: topic,
|
||||||
|
Partition: partition,
|
||||||
|
MinBytes: 10e3,
|
||||||
|
MaxBytes: 10e6,
|
||||||
|
MaxWait: 500 * time.Millisecond,
|
||||||
|
CommitInterval: 5 * time.Second,
|
||||||
|
// GroupID: groupID,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *KafkaConsumer) IsTransientNetworkError(err error) bool {
|
||||||
|
return errors.Is(err, io.ErrUnexpectedEOF) ||
|
||||||
|
errors.Is(err, syscall.ECONNREFUSED) ||
|
||||||
|
errors.Is(err, syscall.ECONNRESET) ||
|
||||||
|
errors.Is(err, syscall.EPIPE)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *KafkaConsumer) Close() error {
|
||||||
|
return c.Reader.Close()
|
||||||
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user