package workers import ( "context" _ "embed" "encoding/json" "fmt" "github.com/twmb/franz-go/pkg/kgo" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "gitea.pena/PenaSide/notifier/internal/clients" "gitea.pena/PenaSide/notifier/internal/models" "gitea.pena/PenaSide/notifier/internal/repository" "time" ) //go:embed mail/register.tmpl var register string type Consumer struct { repo *repository.Repository kafkaClient *kgo.Client mailClient *clients.MailClient logger *zap.Logger } type ConsumerDeps struct { Repo *repository.Repository KafkaClient *kgo.Client MailClient *clients.MailClient Logger *zap.Logger } func NewConsumerWC(deps ConsumerDeps) *Consumer { return &Consumer{ repo: deps.Repo, kafkaClient: deps.KafkaClient, mailClient: deps.MailClient, logger: deps.Logger, } } func (c *Consumer) Start(ctx context.Context) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: c.consumeMessages(ctx) case <-ctx.Done(): return } } } // тут только консюмим по тикеру и инзертим в монгу func (c *Consumer) consumeMessages(ctx context.Context) { fetches := c.kafkaClient.PollFetches(ctx) iter := fetches.RecordIter() for !iter.Done() { record := iter.Next() var message models.Message err := json.Unmarshal(record.Value, &message) if err != nil { c.logger.Error("error unmarshal kafka message:", zap.Error(err)) continue } err = c.processMsg(ctx, message) if err != nil { c.logger.Error("error processing kafka message:", zap.Error(err)) } } } func (c *Consumer) processMsg(ctx context.Context, message models.Message) error { message.ID = primitive.NewObjectID() err := c.repo.Insert(ctx, message) if err != nil { return fmt.Errorf("error insert kafka data to mongo: %w", err) } err = c.mailClient.MailSender(clients.SenderDeps{ Subject: "registration", Email: message.Email, Tmpl: register, }) if err != nil { return fmt.Errorf("error sending message to mailbox registration: %w", err) } message.SendRegistration = true err = c.repo.Update(ctx, message) if err != nil { return fmt.Errorf("error updating record for registration: %w", err) } return nil }