package brokers import ( "context" "encoding/json" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "time" ) type ProducerDeps struct { KafkaClient *kgo.Client Logger *zap.Logger } type Producer struct { kafkaClient *kgo.Client logger *zap.Logger } func NewProducer(deps ProducerDeps) *Producer { return &Producer{ logger: deps.Logger, kafkaClient: deps.KafkaClient, } } type Message struct { AccountID string Email string ServiceKey string SendAt time.Time } func (p *Producer) ToMailNotify(ctx context.Context, message Message) error { bytes, err := json.Marshal(message) if err != nil { p.logger.Error("error marshal message to kafka", zap.Error(err)) return err } responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes}) for _, response := range responses { if response.Err != nil { p.logger.Error("failed to send message for mail notify", zap.Error(response.Err)) return response.Err } } return nil } type MessageGigaChat struct { ID int64 `json:"id"` QuizID int64 `json:"quiz_id"` Sex bool `json:"sex"` Age string `json:"age"` } // todo очень плохое решение что у нас два клиента к кафке и два продюсера у которых два метода самовырубащих эти клиенты, как только перестанет быть костылем надо будет переписать на нормальную логику func (p *Producer) ToGigaChatNotify(ctx context.Context, message MessageGigaChat) error { bytes, err := json.Marshal(message) if err != nil { p.logger.Error("error marshal message to kafka", zap.Error(err)) return err } responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes}) for _, response := range responses { if response.Err != nil { p.logger.Error("failed to send message for giga chat notify", zap.Error(response.Err)) return response.Err } } return nil }