// Origin: notifier/internal/workers/consumer.go (69 lines, 1.3 KiB, Go).

package workers
import (
	"context"
	"encoding/json"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"

	"mailnotifier/internal/models"
	"mailnotifier/internal/repository"
)
type Consumer struct {
repo *repository.Repository
kafkaClient *kgo.Client
logger *zap.Logger
2024-03-31 20:04:15 +00:00
}
type ConsumerDeps struct {
Repo *repository.Repository
KafkaClient *kgo.Client
Logger *zap.Logger
2024-03-31 20:04:15 +00:00
}
func NewConsumerWC(deps ConsumerDeps) *Consumer {
return &Consumer{
repo: deps.Repo,
kafkaClient: deps.KafkaClient,
logger: deps.Logger,
2024-03-31 20:04:15 +00:00
}
}
func (c *Consumer) Start(ctx context.Context) {
2024-04-02 15:11:02 +00:00
ticker := time.NewTicker(30 * time.Minute)
2024-03-31 20:04:15 +00:00
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.consumeMessages(ctx)
2024-03-31 20:04:15 +00:00
case <-ctx.Done():
return
}
}
}
// consumeMessages performs a single Kafka poll and stores the fetched records.
// It is only invoked from the ticker loop in Start; per the original author's
// note and the log messages below, the repository is backed by MongoDB.
//
// Each record value is decoded as JSON into a models.Message and passed to
// repo.Insert. Failures are logged and processing continues with the next
// record; nothing is returned to the caller.
func (c *Consumer) consumeMessages(ctx context.Context) {
	fetches := c.kafkaClient.PollFetches(ctx)

	// A cancelled context surfaces as an error inside the fetches; treat it
	// as a normal shutdown instead of logging it or attempting inserts that
	// would fail with the same cancelled context.
	if ctx.Err() != nil {
		return
	}

	// Fetch-level errors are reported per topic/partition and must be
	// checked explicitly; they are not visible through the record iterator.
	for _, fetchErr := range fetches.Errors() {
		c.logger.Error("error poll kafka fetches:",
			zap.String("topic", fetchErr.Topic),
			zap.Int32("partition", fetchErr.Partition),
			zap.Error(fetchErr.Err),
		)
	}

	iter := fetches.RecordIter()
	for !iter.Done() {
		record := iter.Next()

		var message models.Message
		if err := json.Unmarshal(record.Value, &message); err != nil {
			c.logger.Error("error unmarshal kafka message:", zap.Error(err))
			// Skip the record: inserting a zero or partially decoded
			// message would store garbage.
			continue
		}

		if err := c.repo.Insert(ctx, message); err != nil {
			c.logger.Error("error insert kafka data to mongo:", zap.Error(err))
		}
	}
}