2024-04-21 14:52:55 +00:00
|
|
|
|
package queueUpdater
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"amocrm/internal/models"
|
|
|
|
|
"amocrm/internal/workers_methods"
|
|
|
|
|
"context"
|
2024-12-02 19:04:18 +00:00
|
|
|
|
"fmt"
|
2024-04-21 14:52:55 +00:00
|
|
|
|
"encoding/json"
|
|
|
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type QueueUpdater struct {
|
|
|
|
|
logger *zap.Logger
|
|
|
|
|
kafkaClient *kgo.Client
|
|
|
|
|
methods *workers_methods.Methods
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Deps struct {
|
|
|
|
|
Logger *zap.Logger
|
|
|
|
|
KafkaClient *kgo.Client
|
|
|
|
|
Methods *workers_methods.Methods
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewQueueUpdater(deps Deps) *QueueUpdater {
|
|
|
|
|
return &QueueUpdater{
|
|
|
|
|
logger: deps.Logger,
|
|
|
|
|
kafkaClient: deps.KafkaClient,
|
|
|
|
|
methods: deps.Methods,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wc *QueueUpdater) Start(ctx context.Context) {
|
|
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
2024-12-02 19:04:18 +00:00
|
|
|
|
func () {
|
|
|
|
|
defer func(){
|
|
|
|
|
if v:=recover();v!=nil{
|
|
|
|
|
fmt.Println("queuer recover",v)
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
wc.consumeMessages(ctx)
|
2024-04-21 14:52:55 +00:00
|
|
|
|
|
2024-12-02 19:04:18 +00:00
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}()
|
2024-04-21 14:52:55 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wc *QueueUpdater) consumeMessages(ctx context.Context) {
|
|
|
|
|
fetches := wc.kafkaClient.PollFetches(ctx)
|
|
|
|
|
iter := fetches.RecordIter()
|
|
|
|
|
for !iter.Done() {
|
|
|
|
|
record := iter.Next()
|
|
|
|
|
var message models.KafkaMessage
|
|
|
|
|
|
|
|
|
|
err := json.Unmarshal(record.Value, &message)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error unmarshal kafka message:", zap.Error(err))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = wc.processMessages(ctx, message)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error processing kafka message:", zap.Error(err))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error {
|
|
|
|
|
switch message.Type {
|
|
|
|
|
case models.UsersUpdate:
|
|
|
|
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if token != nil {
|
|
|
|
|
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case models.PipelinesUpdate:
|
|
|
|
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if token != nil {
|
|
|
|
|
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case models.FieldsUpdate:
|
|
|
|
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if token != nil {
|
|
|
|
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case models.TagsUpdate:
|
|
|
|
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if token != nil {
|
|
|
|
|
err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case models.UserCreate:
|
2024-06-05 13:27:14 +00:00
|
|
|
|
token, err := wc.methods.CreateUserFromWebHook(ctx, message)
|
2024-04-21 14:52:55 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error creating user from webhook request", zap.Error(err))
|
2024-06-05 13:27:14 +00:00
|
|
|
|
return err
|
2024-04-21 14:52:55 +00:00
|
|
|
|
}
|
|
|
|
|
|
2024-05-30 19:08:32 +00:00
|
|
|
|
if token == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-21 14:52:55 +00:00
|
|
|
|
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = wc.methods.CheckPipelinesAndSteps(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-26 16:26:59 +00:00
|
|
|
|
case models.AllDataUpdate:
|
|
|
|
|
// сначала получаем список токенов
|
|
|
|
|
newTokens, err := wc.methods.UpdateTokens(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-27 06:36:38 +00:00
|
|
|
|
if len(newTokens) > 0 {
|
|
|
|
|
// обновляем информацию о пользователях
|
|
|
|
|
err = wc.methods.CheckUsers(ctx, newTokens)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update users information", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-26 16:26:59 +00:00
|
|
|
|
|
2024-05-27 06:36:38 +00:00
|
|
|
|
// обновляем информацию о pipelines и их steps
|
|
|
|
|
err = wc.methods.CheckPipelinesAndSteps(ctx, newTokens)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-26 16:26:59 +00:00
|
|
|
|
|
2024-05-27 06:36:38 +00:00
|
|
|
|
// обновляем информацию о tags
|
|
|
|
|
err = wc.methods.CheckTags(ctx, newTokens)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error updating users tags", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-26 16:26:59 +00:00
|
|
|
|
|
2024-05-27 06:36:38 +00:00
|
|
|
|
// обновляем информацию о fields
|
|
|
|
|
err = wc.methods.CheckFields(ctx, newTokens)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error updating users fields", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-26 16:26:59 +00:00
|
|
|
|
}
|
2024-04-29 14:09:40 +00:00
|
|
|
|
case models.RuleCheck:
|
2024-04-29 12:23:40 +00:00
|
|
|
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if token != nil {
|
|
|
|
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-06-05 13:27:14 +00:00
|
|
|
|
err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message)
|
2024-04-29 14:09:40 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error check field rules for fields rules", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-29 12:23:40 +00:00
|
|
|
|
}
|
2024-06-18 10:48:07 +00:00
|
|
|
|
case models.UserReLogin:
|
|
|
|
|
err := wc.methods.UserReLogin(ctx, message)
|
|
|
|
|
if err != nil {
|
|
|
|
|
wc.logger.Error("error update user information in re-login method", zap.Error(err))
|
|
|
|
|
return err
|
|
|
|
|
}
|
2024-04-21 14:52:55 +00:00
|
|
|
|
|
|
|
|
|
default:
|
2024-05-03 09:07:38 +00:00
|
|
|
|
wc.logger.Error("incorrect message type", zap.Any("Type:", message))
|
2024-04-21 14:52:55 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-05-06 20:35:08 +00:00
|
|
|
|
func (wc *QueueUpdater) Stop(_ context.Context) error {
|
2024-04-21 14:52:55 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|