core/internal/brokers/producer.go
pasha1coil d2b54ef05c
All checks were successful
Deploy / CreateImage (push) Successful in 2m46s
Deploy / ValidateConfig (push) Successful in 24s
Deploy / MigrateDatabase (push) Successful in 42s
Deploy / DeployService (push) Successful in 25s
added to MessageGigaChat field AccountID
2025-06-09 15:27:26 +03:00

81 lines
2.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package brokers
import (
"context"
"encoding/json"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"time"
)
type ProducerDeps struct {
KafkaClient *kgo.Client
Logger *zap.Logger
}
type Producer struct {
kafkaClient *kgo.Client
logger *zap.Logger
}
func NewProducer(deps ProducerDeps) *Producer {
return &Producer{
logger: deps.Logger,
kafkaClient: deps.KafkaClient,
}
}
type Message struct {
AccountID string
Email string
ServiceKey string
SendAt time.Time
}
func (p *Producer) ToMailNotify(ctx context.Context, message Message) error {
bytes, err := json.Marshal(message)
if err != nil {
p.logger.Error("error marshal message to kafka", zap.Error(err))
return err
}
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
for _, response := range responses {
if response.Err != nil {
p.logger.Error("failed to send message for mail notify", zap.Error(response.Err))
return response.Err
}
}
return nil
}
type MessageGigaChat struct {
ID int64 `json:"id"`
QuizID int64 `json:"quiz_id"`
Sex int32 `json:"sex"` // 0 - female, 1 - male, 2 - not sex
Age string `json:"age"`
AccountID string `json:"account_id"`
}
// todo очень плохое решение что у нас два клиента к кафке и два продюсера у которых два метода самовырубащих эти клиенты, как только перестанет быть костылем надо будет переписать на нормальную логику
func (p *Producer) ToGigaChatNotify(ctx context.Context, message MessageGigaChat) error {
bytes, err := json.Marshal(message)
if err != nil {
p.logger.Error("error marshal message to kafka", zap.Error(err))
return err
}
responses := p.kafkaClient.ProduceSync(ctx, &kgo.Record{Value: bytes})
for _, response := range responses {
if response.Err != nil {
p.logger.Error("failed to send message for giga chat notify", zap.Error(response.Err))
return response.Err
}
}
return nil
}