// Package tariff contains a Kafka consumer that reads tariff messages and
// exposes the privileges they carry.
package tariff

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"gitea.pena/PenaSide/customer/internal/models"
	"gitea.pena/PenaSide/customer/internal/proto/broker"
	"gitea.pena/PenaSide/customer/internal/utils/transfer"
)

// ConsumerDeps lists the dependencies required to build a Consumer.
// Both fields are mandatory; NewConsumer panics when either one is nil.
type ConsumerDeps struct {
	Logger *zap.Logger
	Client *kgo.Client
}

// Consumer polls tariff messages through a Kafka client and converts them
// into privilege models.
type Consumer struct {
	logger *zap.Logger
	client *kgo.Client
}

// NewConsumer builds a Consumer from deps.
//
// It panics (via log.Panicln) when deps.Logger or deps.Client is nil.
func NewConsumer(deps ConsumerDeps) *Consumer {
	// NOTE(review): both panic messages end with "on " and look truncated;
	// they are kept as-is because the intended suffix is not known here.
	if deps.Logger == nil {
		log.Panicln("logger is nil on ")
	}

	if deps.Client == nil {
		log.Panicln("Kafka client is nil on ")
	}

	return &Consumer{
		logger: deps.Logger,
		client: deps.Client,
	}
}

// FetchPrivileges performs a single poll on the Kafka client and returns the
// privileges of every tariff message that was decoded successfully.
//
// Errors attached to the poll result are logged together with the topic and
// partition they belong to; they do not abort processing of the records that
// were fetched. Records whose payload cannot be unmarshalled as a
// broker.TariffMessage are logged and skipped.
//
// The returned slice is never nil; it is empty when the poll produced no
// usable records.
func (receiver *Consumer) FetchPrivileges(ctx context.Context) []models.Privilege {
	fetches := receiver.client.PollFetches(ctx)

	// A poll can carry per-partition errors (and injected errors such as a
	// cancelled context or a closed client). Report them instead of
	// silently dropping them.
	fetches.EachError(func(topic string, partition int32, err error) {
		receiver.logger.Error(
			"failed to fetch tariff records",
			zap.String("topic", topic),
			zap.Int32("partition", partition),
			zap.Error(err),
		)
	})

	// Kept as an allocated (non-nil) slice so callers observe the same
	// empty-result value as before.
	privileges := make([]models.Privilege, 0)

	for iter := fetches.RecordIter(); !iter.Done(); {
		record := iter.Next()

		message := broker.TariffMessage{}
		if err := proto.Unmarshal(record.Value, &message); err != nil {
			// Log message text is intentionally unchanged.
			receiver.logger.Error(fmt.Sprintf("error decoding message: %v\n", err))
			continue
		}

		tariffModel := transfer.TariffMessageProtoToTariffModel(&message)
		privileges = append(privileges, tariffModel.Privileges...)
	}

	return privileges
}