core/internal/brokers/producer.go

80 lines
2.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package brokers
import (
"context"
"encoding/json"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"time"
)
type ProducerDeps struct {
KafkaClient *kgo.Client
Logger *zap.Logger
}
type Producer struct {
kafkaClient *kgo.Client
logger *zap.Logger
}
func NewProducer(deps ProducerDeps) *Producer {
return &Producer{
logger: deps.Logger,
kafkaClient: deps.KafkaClient,
}
}
type Message struct {
AccountID string
Email string
ServiceKey string
SendAt time.Time
}
func (p *Producer) ToMailNotify(ctx context.Context, message Message) error {
bytes, err := json.Marshal(message)
if err != nil {
p.logger.Error("error marshal message to kafka", zap.Error(err))
return err
}
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
for _, response := range responses {
if response.Err != nil {
p.logger.Error("failed to send message for mail notify", zap.Error(response.Err))
return response.Err
}
}
return nil
}
type MessageGigaChat struct {
ID int64 `json:"id"`
QuizID int64 `json:"quiz_id"`
Sex bool `json:"sex"`
Age string `json:"age"`
}
// todo очень плохое решение что у нас два клиента к кафке и два продюсера у которых два метода самовырубащих эти клиенты, как только перестанет быть костылем надо будет переписать на нормальную логику
func (p *Producer) ToGigaChatNotify(ctx context.Context, message MessageGigaChat) error {
bytes, err := json.Marshal(message)
if err != nil {
p.logger.Error("error marshal message to kafka", zap.Error(err))
return err
}
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
for _, response := range responses {
if response.Err != nil {
p.logger.Error("failed to send message for giga chat notify", zap.Error(response.Err))
return response.Err
}
}
return nil
}