// Package tariff publishes tariff models to a Kafka topic as protobuf-encoded
// records.
package tariff

import (
	"context"
	"fmt"
	"log"

	"gitea.pena/PenaSide/codeword/internal/models"
	"gitea.pena/PenaSide/codeword/internal/utils/transfer"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
)

// ProducerDeps bundles the dependencies required by NewProducer.
type ProducerDeps struct {
	// Logger receives diagnostic output; it must not be nil.
	Logger *zap.Logger
	// Client is the Kafka client used to produce records; it must not be nil.
	Client *kgo.Client
	// Topic is the topic name set on every produced record.
	Topic string
}

// Producer sends tariff records to a single Kafka topic.
type Producer struct {
	logger *zap.Logger
	client *kgo.Client
	topic  string
}

// NewProducer builds a Producer from deps.
//
// It panics if deps.Logger or deps.Client is nil. deps.Topic is stored
// without validation.
func NewProducer(deps ProducerDeps) *Producer {
	if deps.Logger == nil {
		log.Panicln("logger is nil on ")
	}

	if deps.Client == nil {
		log.Panicln("Kafka client is nil on ")
	}

	return &Producer{
		logger: deps.Logger,
		client: deps.Client,
		topic:  deps.Topic,
	}
}

// Send converts tariff (together with userID) into its protobuf message,
// marshals it and synchronously produces it to the configured topic.
//
// It returns a wrapped error if marshaling fails or if the produce call
// reports an error; the underlying cause remains available through
// errors.Is / errors.As. On success it returns nil.
func (p *Producer) Send(ctx context.Context, userID string, tariff *models.Tariff) error {
	payload, err := proto.Marshal(transfer.TariffModelToProtoMessage(userID, tariff))
	if err != nil {
		p.logger.Error("failed to marshal tariff model", zap.Error(err))
		return fmt.Errorf("marshal tariff model: %w", err)
	}

	// Log only metadata: the payload is binary protobuf, which is neither
	// readable nor valid text, and it may carry data that should not end up
	// in the logs.
	p.logger.Debug("sending tariff to Kafka",
		zap.String("topic", p.topic),
		zap.String("userID", userID),
		zap.Int("payloadSize", len(payload)),
	)

	// A single record is produced, so instead of iterating over the produce
	// results we take the first error (if any) and return it.
	record := &kgo.Record{Topic: p.topic, Value: payload}
	if err := p.client.ProduceSync(ctx, record).FirstErr(); err != nil {
		p.logger.Error("failed to send tariff to Kafka", zap.Error(err))
		return fmt.Errorf("produce tariff to topic %q: %w", p.topic, err)
	}

	return nil
}