package brokers import ( "context" "encoding/json" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "time" ) type ProducerDeps struct { KafkaClient *kgo.Client Logger *zap.Logger } type Producer struct { kafkaClient *kgo.Client logger *zap.Logger } func NewProducer(deps ProducerDeps) *Producer { return &Producer{ logger: deps.Logger, kafkaClient: deps.KafkaClient, } } type Message struct { AccountID string Email string ServiceKey string SendAt time.Time } func (p *Producer) ToMailNotify(ctx context.Context, message Message) error { bytes, err := json.Marshal(message) if err != nil { p.logger.Error("error marshal message to kafka", zap.Error(err)) return err } responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes}) for _, response := range responses { if response.Err != nil { p.logger.Error("failed to send message for mail notify", zap.Error(response.Err)) return response.Err } } return nil }