generated from PenaSide/GolangTemplate
62 lines
1.6 KiB
Go
62 lines
1.6 KiB
Go
![]() |
package tariff
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"log"
|
||
|
|
||
|
"github.com/twmb/franz-go/pkg/kgo"
|
||
|
"go.uber.org/zap"
|
||
|
"google.golang.org/protobuf/proto"
|
||
|
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/errors"
|
||
|
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
|
||
|
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/utils/transfer"
|
||
|
)
|
||
|
|
||
|
type ProducerDeps struct {
|
||
|
Logger *zap.Logger
|
||
|
Client *kgo.Client
|
||
|
Topic string
|
||
|
}
|
||
|
|
||
|
type Producer struct {
|
||
|
logger *zap.Logger
|
||
|
client *kgo.Client
|
||
|
topic string
|
||
|
}
|
||
|
|
||
|
func NewProducer(deps ProducerDeps) *Producer {
|
||
|
if deps.Logger == nil {
|
||
|
log.Panicln("logger is nil on <NewTariffProducer>")
|
||
|
}
|
||
|
|
||
|
if deps.Client == nil {
|
||
|
log.Panicln("Kafka client is nil on <NewTariffProducer>")
|
||
|
}
|
||
|
|
||
|
return &Producer{
|
||
|
logger: deps.Logger,
|
||
|
client: deps.Client,
|
||
|
topic: deps.Topic,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (receiver *Producer) Send(ctx context.Context, userID string, tariff *models.Tariff) errors.Error {
|
||
|
bytes, err := proto.Marshal(transfer.TariffModelToProtoMessage(userID, tariff))
|
||
|
if err != nil {
|
||
|
receiver.logger.Error("failed to marshal tariff model on <Send> of <TariffProducer>")
|
||
|
return errors.New(fmt.Errorf("failed to marshal tariff: %w", err), errors.ErrInternalError)
|
||
|
}
|
||
|
|
||
|
responses := receiver.client.ProduceSync(ctx, &kgo.Record{Value: bytes})
|
||
|
|
||
|
for _, response := range responses {
|
||
|
if response.Err != nil {
|
||
|
receiver.logger.Error("failed to send tariff on <Send> of <TariffProducer>", zap.Error(response.Err))
|
||
|
return errors.New(fmt.Errorf("failed to send tariff: %w", response.Err), errors.ErrInternalError)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|