80 lines
2.0 KiB
Go
80 lines
2.0 KiB
Go
package brokers
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"github.com/twmb/franz-go/pkg/kgo"
|
||
"go.uber.org/zap"
|
||
"time"
|
||
)
|
||
|
||
type ProducerDeps struct {
|
||
KafkaClient *kgo.Client
|
||
Logger *zap.Logger
|
||
}
|
||
|
||
type Producer struct {
|
||
kafkaClient *kgo.Client
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func NewProducer(deps ProducerDeps) *Producer {
|
||
return &Producer{
|
||
logger: deps.Logger,
|
||
kafkaClient: deps.KafkaClient,
|
||
}
|
||
}
|
||
|
||
type Message struct {
|
||
AccountID string
|
||
Email string
|
||
ServiceKey string
|
||
SendAt time.Time
|
||
}
|
||
|
||
func (p *Producer) ToMailNotify(ctx context.Context, message Message) error {
|
||
bytes, err := json.Marshal(message)
|
||
if err != nil {
|
||
p.logger.Error("error marshal message to kafka", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
|
||
for _, response := range responses {
|
||
if response.Err != nil {
|
||
p.logger.Error("failed to send message for mail notify", zap.Error(response.Err))
|
||
return response.Err
|
||
}
|
||
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
type MessageGigaChat struct {
|
||
ID int64 `json:"id"`
|
||
QuizID int64 `json:"quiz_id"`
|
||
Sex int32 `json:"sex"` // 0 - female, 1 - male, 2 - not sex
|
||
Age string `json:"age"`
|
||
}
|
||
|
||
// todo очень плохое решение что у нас два клиента к кафке и два продюсера у которых два метода самовырубащих эти клиенты, как только перестанет быть костылем надо будет переписать на нормальную логику
|
||
func (p *Producer) ToGigaChatNotify(ctx context.Context, message MessageGigaChat) error {
|
||
bytes, err := json.Marshal(message)
|
||
if err != nil {
|
||
p.logger.Error("error marshal message to kafka", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
|
||
for _, response := range responses {
|
||
if response.Err != nil {
|
||
p.logger.Error("failed to send message for giga chat notify", zap.Error(response.Err))
|
||
return response.Err
|
||
}
|
||
|
||
}
|
||
|
||
return nil
|
||
}
|