// File: amocrm/internal/workers/queueUpdater/queue_updater.go
// (repository web-view metadata: 238 lines, 6.2 KiB, Go)

package queueUpdater
import (
"amocrm/internal/models"
"amocrm/internal/workers_methods"
"context"
"encoding/json"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"time"
)
// QueueUpdater is the worker that polls Kafka for update requests and
// dispatches each decoded message to the worker methods.
type QueueUpdater struct {
	// logger receives the error reports emitted while consuming and
	// processing messages.
	logger *zap.Logger
	// kafkaClient is the client polled for fetches.
	kafkaClient *kgo.Client
	// methods carries out the work requested by each message type.
	methods *workers_methods.Methods
}
// Deps bundles the dependencies handed to NewQueueUpdater.
type Deps struct {
	// Logger becomes the worker's logger.
	Logger *zap.Logger
	// KafkaClient becomes the client the worker polls.
	KafkaClient *kgo.Client
	// Methods becomes the set of worker methods the worker dispatches to.
	Methods *workers_methods.Methods
}
// NewQueueUpdater builds a QueueUpdater from deps, copying the logger, Kafka
// client and worker methods into the new value. No validation is performed,
// so nil dependencies are stored as-is.
func NewQueueUpdater(deps Deps) *QueueUpdater {
	updater := new(QueueUpdater)
	updater.logger = deps.Logger
	updater.kafkaClient = deps.KafkaClient
	updater.methods = deps.Methods
	return updater
}
// Start runs the worker loop: every ten seconds it calls consumeMessages, and
// it returns once ctx is done. The ticker is stopped on return.
func (wc *QueueUpdater) Start(ctx context.Context) {
	const pollInterval = 10 * time.Second

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			wc.consumeMessages(ctx)
		}
	}
}
// consumeMessages polls the Kafka client once and handles every record in the
// returned fetches.
//
// Each record value is decoded as JSON into a models.KafkaMessage and passed
// to processMessages. A record that fails to decode is logged and skipped; a
// processing failure is logged and the loop moves on to the next record.
// Nothing is returned to the caller: all failures are reported through the
// logger only.
func (wc *QueueUpdater) consumeMessages(ctx context.Context) {
	fetches := wc.kafkaClient.PollFetches(ctx)

	// Report fetch-level failures instead of dropping them: iterating records
	// alone never reveals that a topic/partition could not be fetched.
	fetches.EachError(func(topic string, partition int32, err error) {
		// Once ctx is done the poll is expected to come back with an error
		// (NOTE(review): per the kgo docs a context error is injected as a
		// fake fetch - confirm), so stay quiet during shutdown.
		if ctx.Err() != nil {
			return
		}
		wc.logger.Error("error fetching kafka records",
			zap.String("topic", topic),
			zap.Int32("partition", partition),
			zap.Error(err),
		)
	})

	for iter := fetches.RecordIter(); !iter.Done(); {
		record := iter.Next()

		var message models.KafkaMessage
		if err := json.Unmarshal(record.Value, &message); err != nil {
			wc.logger.Error("error unmarshal kafka message:", zap.Error(err))
			continue
		}

		if err := wc.processMessages(ctx, message); err != nil {
			wc.logger.Error("error processing kafka message:", zap.Error(err))
		}
	}
}
func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error {
switch message.Type {
case models.UsersUpdate:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
return err
}
if token != nil {
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user information in queue worker", zap.Error(err))
return err
}
}
case models.PipelinesUpdate:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
return err
}
if token != nil {
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
return err
}
}
case models.FieldsUpdate:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
return err
}
if token != nil {
err = wc.methods.CheckFields(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
return err
}
}
case models.TagsUpdate:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
return err
}
if token != nil {
err = wc.methods.CheckTags(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
return err
}
}
case models.UserCreate:
2024-06-05 13:27:14 +00:00
token, err := wc.methods.CreateUserFromWebHook(ctx, message)
if err != nil {
wc.logger.Error("error creating user from webhook request", zap.Error(err))
2024-06-05 13:27:14 +00:00
return err
}
2024-05-30 19:08:32 +00:00
if token == nil {
return nil
}
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user information in queue worker", zap.Error(err))
return err
}
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
return err
}
err = wc.methods.CheckFields(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
return err
}
err = wc.methods.CheckTags(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
return err
}
case models.AllDataUpdate:
// сначала получаем список токенов
newTokens, err := wc.methods.UpdateTokens(ctx)
if err != nil {
wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err))
return err
}
if len(newTokens) > 0 {
// обновляем информацию о пользователях
err = wc.methods.CheckUsers(ctx, newTokens)
if err != nil {
wc.logger.Error("error update users information", zap.Error(err))
return err
}
// обновляем информацию о pipelines и их steps
err = wc.methods.CheckPipelinesAndSteps(ctx, newTokens)
if err != nil {
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
return err
}
// обновляем информацию о tags
err = wc.methods.CheckTags(ctx, newTokens)
if err != nil {
wc.logger.Error("error updating users tags", zap.Error(err))
return err
}
// обновляем информацию о fields
err = wc.methods.CheckFields(ctx, newTokens)
if err != nil {
wc.logger.Error("error updating users fields", zap.Error(err))
return err
}
}
case models.RuleCheck:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
return err
}
if token != nil {
err = wc.methods.CheckFields(ctx, []model.Token{*token})
if err != nil {
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
return err
}
2024-06-05 13:27:14 +00:00
err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message)
if err != nil {
wc.logger.Error("error check field rules for fields rules", zap.Error(err))
return err
}
}
2024-06-18 10:48:07 +00:00
case models.UserReLogin:
err := wc.methods.UserReLogin(ctx, message)
if err != nil {
wc.logger.Error("error update user information in re-login method", zap.Error(err))
return err
}
default:
2024-05-03 09:07:38 +00:00
wc.logger.Error("incorrect message type", zap.Any("Type:", message))
return nil
}
return nil
}
2024-05-06 20:35:08 +00:00
// Stop is a no-op: it ignores its context argument and always returns nil.
func (wc *QueueUpdater) Stop(_ context.Context) error {
	return nil
}