171 lines
4.2 KiB
Go
171 lines
4.2 KiB
Go
|
package queueUpdater
|
||
|
|
||
|
import (
|
||
|
"amocrm/internal/models"
|
||
|
"amocrm/internal/workers_methods"
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"github.com/twmb/franz-go/pkg/kgo"
|
||
|
"go.uber.org/zap"
|
||
|
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type QueueUpdater struct {
|
||
|
logger *zap.Logger
|
||
|
kafkaClient *kgo.Client
|
||
|
methods *workers_methods.Methods
|
||
|
}
|
||
|
|
||
|
type Deps struct {
|
||
|
Logger *zap.Logger
|
||
|
KafkaClient *kgo.Client
|
||
|
Methods *workers_methods.Methods
|
||
|
}
|
||
|
|
||
|
func NewQueueUpdater(deps Deps) *QueueUpdater {
|
||
|
return &QueueUpdater{
|
||
|
logger: deps.Logger,
|
||
|
kafkaClient: deps.KafkaClient,
|
||
|
methods: deps.Methods,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (wc *QueueUpdater) Start(ctx context.Context) {
|
||
|
ticker := time.NewTicker(10 * time.Second)
|
||
|
defer ticker.Stop()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ticker.C:
|
||
|
wc.consumeMessages(ctx)
|
||
|
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (wc *QueueUpdater) consumeMessages(ctx context.Context) {
|
||
|
fetches := wc.kafkaClient.PollFetches(ctx)
|
||
|
iter := fetches.RecordIter()
|
||
|
for !iter.Done() {
|
||
|
record := iter.Next()
|
||
|
var message models.KafkaMessage
|
||
|
|
||
|
err := json.Unmarshal(record.Value, &message)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error unmarshal kafka message:", zap.Error(err))
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
err = wc.processMessages(ctx, message)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error processing kafka message:", zap.Error(err))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error {
|
||
|
switch message.Type {
|
||
|
case models.UsersUpdate:
|
||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if token != nil {
|
||
|
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case models.PipelinesUpdate:
|
||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if token != nil {
|
||
|
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case models.FieldsUpdate:
|
||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if token != nil {
|
||
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case models.TagsUpdate:
|
||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if token != nil {
|
||
|
err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case models.UserCreate:
|
||
|
token, err := wc.methods.CreateUserFromWebHook(ctx, message.AccountID, *message.AuthCode)
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error creating user from webhook request", zap.Error(err))
|
||
|
}
|
||
|
|
||
|
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
||
|
if err != nil {
|
||
|
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
default:
|
||
|
wc.logger.Error("incorrect message type", zap.Any("Type:", message.Type))
|
||
|
return nil
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (wc *QueueUpdater) Stop(ctx context.Context) error {
|
||
|
return nil
|
||
|
}
|