customer/internal/interface/broker/tariff/consumer.go
Pasha 34a88a3a70
Some checks failed
Lint / Lint (push) Failing after 1m2s
rename go.mod
2024-11-18 21:44:09 +00:00

66 lines
1.4 KiB
Go

package tariff
import (
"context"
"fmt"
"log"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"gitea.pena/PenaSide/customer/internal/models"
"gitea.pena/PenaSide/customer/internal/proto/broker"
"gitea.pena/PenaSide/customer/internal/utils/transfer"
)
// ConsumerDeps bundles the dependencies that NewConsumer needs to build a
// Consumer. Both fields are mandatory: NewConsumer panics when either is nil.
type ConsumerDeps struct {
	// Logger receives the consumer's diagnostic output.
	Logger *zap.Logger

	// Client is the franz-go Kafka client that records are polled from.
	Client *kgo.Client
}
// Consumer polls a Kafka client for tariff messages and exposes the
// privileges they carry. Build it with NewConsumer so that both fields are
// guaranteed to be set.
type Consumer struct {
	// logger is used to report records that cannot be decoded.
	logger *zap.Logger

	// client is the franz-go client that fetches are polled from.
	client *kgo.Client
}
// NewConsumer builds a Consumer from deps.
//
// It panics (through log.Panicln) when deps.Logger or deps.Client is nil; the
// logger is checked first.
func NewConsumer(deps ConsumerDeps) *Consumer {
	// Validate the mandatory dependencies up front; log.Panicln does not
	// return, so at most one case ever runs.
	switch {
	case deps.Logger == nil:
		log.Panicln("logger is nil on <NewTariffConsumer>")
	case deps.Client == nil:
		log.Panicln("Kafka client is nil on <NewTariffConsumer>")
	}

	consumer := new(Consumer)
	consumer.logger = deps.Logger
	consumer.client = deps.Client

	return consumer
}
// FetchPrivileges polls the Kafka client once and returns the privileges of
// every tariff message that could be decoded from the fetched records, in
// the order the records were iterated.
//
// The method has no error return, so failures are logged and skipped:
//   - fetch errors carried inside the result of PollFetches are logged
//     together with their topic and partition;
//   - records whose payload is not a valid TariffMessage are logged and
//     ignored.
//
// The returned slice is never nil; it is empty when no privileges were read.
func (receiver *Consumer) FetchPrivileges(ctx context.Context) []models.Privilege {
	fetches := receiver.client.PollFetches(ctx)

	// PollFetches has no error return value: failures travel inside the
	// Fetches value. Surface them instead of silently treating a failed poll
	// as "no new messages". Records fetched alongside the errors are still
	// processed below.
	fetches.EachError(func(topic string, partition int32, err error) {
		receiver.logger.Error(
			"error fetching tariff messages",
			zap.String("topic", topic),
			zap.Int32("partition", partition),
			zap.Error(err),
		)
	})

	privileges := make([]models.Privilege, 0)

	for iter := fetches.RecordIter(); !iter.Done(); {
		record := iter.Next()

		tariff := broker.TariffMessage{}
		if err := proto.Unmarshal(record.Value, &tariff); err != nil {
			receiver.logger.Error(fmt.Sprintf("error decoding message: %v\n", err))
			continue
		}

		// Append straight from the converted model; no intermediate slice of
		// tariffs is needed just to flatten their privileges.
		model := transfer.TariffMessageProtoToTariffModel(&tariff)
		privileges = append(privileges, model.Privileges...)
	}

	return privileges
}