go-kratos/app/websocket/internal/data/data.go

36 lines
750 B
Go

package data
import (
"ky-go-kratos/app/websocket/internal/conf"
"ky-go-kratos/pkg/kafka"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
)
// ProviderSet is the wire provider set for the data package. It registers
// NewData and NewWebSocketRepo as providers.
var ProviderSet = wire.NewSet(NewData, NewWebSocketRepo)
// Data holds the data-layer resources constructed by NewData.
type Data struct {
// kafka is the Kafka producer created in NewData; the cleanup function
// returned by NewData closes it.
kafka *kafka.KafkaProducer
}
// NewData builds the Data container together with its cleanup function.
//
// It calls kafka.NewKafkaWriter with c.Kafka.Brokers and c.Kafka.Topic and
// stores the result in the returned Data. c and c.Kafka are dereferenced
// without a nil check, so callers must supply a populated configuration.
//
// The returned cleanup function closes the Kafka producer; a close failure is
// logged through logger rather than returned. The error result is always nil
// in the current implementation.
func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error) {
	// Named helper (not log) so the imported log package is not shadowed.
	helper := log.NewHelper(logger)

	// Create the Kafka producer.
	kafkaWriter := kafka.NewKafkaWriter(c.Kafka.Brokers, c.Kafka.Topic)

	cleanup := func() {
		helper.Info("closing the data resources")
		if err := kafkaWriter.Close(); err != nil {
			// Errorf, not Error: Error does not interpret format verbs, so
			// "%v" would be printed literally.
			helper.Errorf("failed to close kafka server: %v", err)
		}
	}

	return &Data{kafka: kafkaWriter}, cleanup, nil
}