notifier/internal/workers/consumer.go

104 lines
2.1 KiB
Go

package workers
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"mailnotifier/internal/clients"
"mailnotifier/internal/models"
"mailnotifier/internal/repository"
"time"
)
//go:embed mail/register.tmpl
var register string
type Consumer struct {
repo *repository.Repository
kafkaClient *kgo.Client
mailClient *clients.MailClient
logger *zap.Logger
}
type ConsumerDeps struct {
Repo *repository.Repository
KafkaClient *kgo.Client
MailClient *clients.MailClient
Logger *zap.Logger
}
func NewConsumerWC(deps ConsumerDeps) *Consumer {
return &Consumer{
repo: deps.Repo,
kafkaClient: deps.KafkaClient,
mailClient: deps.MailClient,
logger: deps.Logger,
}
}
func (c *Consumer) Start(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.consumeMessages(ctx)
case <-ctx.Done():
return
}
}
}
// тут только консюмим по тикеру и инзертим в монгу
func (c *Consumer) consumeMessages(ctx context.Context) {
fetches := c.kafkaClient.PollFetches(ctx)
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
var message models.Message
err := json.Unmarshal(record.Value, &message)
if err != nil {
c.logger.Error("error unmarshal kafka message:", zap.Error(err))
continue
}
err = c.processMsg(ctx, message)
if err != nil {
c.logger.Error("error processing kafka message:", zap.Error(err))
}
}
}
func (c *Consumer) processMsg(ctx context.Context, message models.Message) error {
err := c.repo.Insert(ctx, message)
if err != nil {
return fmt.Errorf("error insert kafka data to mongo: %w", err)
}
err = c.mailClient.MailSender(clients.SenderDeps{
Subject: "registration",
Email: message.Email,
Tmpl: register,
})
if err != nil {
return fmt.Errorf("error sending message to mailbox registration: %w", err)
}
message.SendRegistration = true
err = c.repo.Update(ctx, message)
if err != nil {
return fmt.Errorf("error updating record for registration: %w", err)
}
return nil
}