package queueUpdater import ( "context" "encoding/json" "fmt" "gitea.pena/SQuiz/amocrm/internal/models" "gitea.pena/SQuiz/amocrm/internal/workers_methods" "gitea.pena/SQuiz/common/model" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "time" ) type QueueUpdater struct { logger *zap.Logger kafkaClient *kgo.Client methods *workers_methods.Methods } type Deps struct { Logger *zap.Logger KafkaClient *kgo.Client Methods *workers_methods.Methods } func NewQueueUpdater(deps Deps) *QueueUpdater { return &QueueUpdater{ logger: deps.Logger, kafkaClient: deps.KafkaClient, methods: deps.Methods, } } func (wc *QueueUpdater) Start(ctx context.Context) { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { func() { defer func() { if v := recover(); v != nil { fmt.Println("queuer recover", v) } }() select { case <-ticker.C: wc.consumeMessages(ctx) case <-ctx.Done(): return } }() } } func (wc *QueueUpdater) consumeMessages(ctx context.Context) { fetches := wc.kafkaClient.PollFetches(ctx) iter := fetches.RecordIter() for !iter.Done() { record := iter.Next() var message models.KafkaMessage err := json.Unmarshal(record.Value, &message) if err != nil { wc.logger.Error("error unmarshal kafka message:", zap.Error(err)) continue } err = wc.processMessages(ctx, message) if err != nil { wc.logger.Error("error processing kafka message:", zap.Error(err)) } } } func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error { switch message.Type { case models.UsersUpdate: token, err := wc.methods.GetTokenByID(ctx, message.AccountID) if err != nil { wc.logger.Error("error getting user token from db", zap.Error(err)) return err } if token != nil { err = wc.methods.CheckUsers(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user information in queue worker", zap.Error(err)) return err } } case models.PipelinesUpdate: token, err := wc.methods.GetTokenByID(ctx, message.AccountID) if err != nil { wc.logger.Error("error getting user token from db", zap.Error(err)) return err } if token != nil { err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err)) return err } } case models.FieldsUpdate: token, err := wc.methods.GetTokenByID(ctx, message.AccountID) if err != nil { wc.logger.Error("error getting user token from db", zap.Error(err)) return err } if token != nil { err = wc.methods.CheckFields(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) return err } } case models.TagsUpdate: token, err := wc.methods.GetTokenByID(ctx, message.AccountID) if err != nil { wc.logger.Error("error getting user token from db", zap.Error(err)) return err } if token != nil { err = wc.methods.CheckTags(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user tags information in queue worker", zap.Error(err)) return err } } case models.UserCreate: token, err := wc.methods.CreateUserFromWebHook(ctx, message) if err != nil { wc.logger.Error("error creating user from webhook request", zap.Error(err)) return err } if token == nil { return nil } err = wc.methods.CheckUsers(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user information in queue worker", zap.Error(err)) return err } err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err)) return err } err = wc.methods.CheckFields(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) return err } err = wc.methods.CheckTags(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user tags information in queue worker", zap.Error(err)) return err } case models.AllDataUpdate: // сначала получаем список токенов newTokens, err := wc.methods.UpdateTokens(ctx) if err != nil { wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err)) return err } if len(newTokens) > 0 { // обновляем информацию о пользователях err = wc.methods.CheckUsers(ctx, newTokens) if err != nil { wc.logger.Error("error update users information", zap.Error(err)) return err } // обновляем информацию о pipelines и их steps err = wc.methods.CheckPipelinesAndSteps(ctx, newTokens) if err != nil { wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err)) return err } // обновляем информацию о tags err = wc.methods.CheckTags(ctx, newTokens) if err != nil { wc.logger.Error("error updating users tags", zap.Error(err)) return err } // обновляем информацию о fields err = wc.methods.CheckFields(ctx, newTokens) if err != nil { wc.logger.Error("error updating users fields", zap.Error(err)) return err } } case models.RuleCheck: token, err := wc.methods.GetTokenByID(ctx, message.AccountID) if err != nil { wc.logger.Error("error getting user token from db", zap.Error(err)) return err } if token != nil { err = wc.methods.CheckFields(ctx, []model.Token{*token}) if err != nil { wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) return err } err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message) if err != nil { wc.logger.Error("error check field rules for fields rules", zap.Error(err)) return err } } case models.UserReLogin: err := wc.methods.UserReLogin(ctx, message) if err != nil { wc.logger.Error("error update user information in re-login method", zap.Error(err)) return err } default: wc.logger.Error("incorrect message type", zap.Any("Type:", message)) return nil } return nil } func (wc *QueueUpdater) Stop(_ context.Context) error { return nil }