261 lines
7.1 KiB
Go
261 lines
7.1 KiB
Go
|
package queueUpdater
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"encoding/json"
|
|||
|
"github.com/twmb/franz-go/pkg/kgo"
|
|||
|
"go.uber.org/zap"
|
|||
|
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
|
|||
|
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers_methods"
|
|||
|
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
|||
|
"time"
|
|||
|
)
|
|||
|
|
|||
|
type QueueUpdater struct {
|
|||
|
logger *zap.Logger
|
|||
|
kafkaClient *kgo.Client
|
|||
|
methods *workers_methods.Methods
|
|||
|
}
|
|||
|
|
|||
|
type Deps struct {
|
|||
|
Logger *zap.Logger
|
|||
|
KafkaClient *kgo.Client
|
|||
|
Methods *workers_methods.Methods
|
|||
|
}
|
|||
|
|
|||
|
func NewQueueUpdater(deps Deps) *QueueUpdater {
|
|||
|
return &QueueUpdater{
|
|||
|
logger: deps.Logger,
|
|||
|
kafkaClient: deps.KafkaClient,
|
|||
|
methods: deps.Methods,
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (wc *QueueUpdater) Start(ctx context.Context) {
|
|||
|
ticker := time.NewTicker(5 * time.Second)
|
|||
|
defer ticker.Stop()
|
|||
|
|
|||
|
for {
|
|||
|
select {
|
|||
|
case <-ticker.C:
|
|||
|
wc.consumeMessages(ctx)
|
|||
|
|
|||
|
case <-ctx.Done():
|
|||
|
return
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (wc *QueueUpdater) consumeMessages(ctx context.Context) {
|
|||
|
fetches := wc.kafkaClient.PollFetches(ctx)
|
|||
|
iter := fetches.RecordIter()
|
|||
|
for !iter.Done() {
|
|||
|
record := iter.Next()
|
|||
|
var message models.KafkaMessage
|
|||
|
|
|||
|
err := json.Unmarshal(record.Value, &message)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error unmarshal kafka message:", zap.Error(err))
|
|||
|
continue
|
|||
|
}
|
|||
|
|
|||
|
err = wc.processMessages(ctx, message)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error processing kafka message:", zap.Error(err))
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error {
|
|||
|
switch message.Type {
|
|||
|
case models.UsersUpdate:
|
|||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if token != nil {
|
|||
|
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
case models.PipelinesUpdate:
|
|||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if token != nil {
|
|||
|
err = wc.methods.CheckPipelines(ctx, []model.Token{*token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
}
|
|||
|
case models.StepsUpdate:
|
|||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if token != nil {
|
|||
|
err = wc.methods.CheckSteps(ctx, []model.Token{*token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
case models.FieldsUpdate:
|
|||
|
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if token != nil {
|
|||
|
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
//case models.TagsUpdate:
|
|||
|
// token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
// if err != nil {
|
|||
|
// wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
// return err
|
|||
|
// }
|
|||
|
//
|
|||
|
// if token != nil {
|
|||
|
// err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
|||
|
// if err != nil {
|
|||
|
// wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
|||
|
// return err
|
|||
|
// }
|
|||
|
// }
|
|||
|
|
|||
|
case models.UserCreate:
|
|||
|
token, err := wc.methods.CreateUserFromWebHook(ctx, message)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error creating user from webhook request", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
err = wc.methods.CheckUsers(ctx, []model.Token{token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
err = wc.methods.CheckPipelines(ctx, []model.Token{token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user pipelines information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
err = wc.methods.CheckSteps(ctx, []model.Token{token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user steps information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
err = wc.methods.CheckFields(ctx, []model.Token{token})
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
//err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
|||
|
//if err != nil {
|
|||
|
// wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
|||
|
// return err
|
|||
|
//}
|
|||
|
case models.AllDataUpdate:
|
|||
|
// сначала получаем список токенов
|
|||
|
newTokens, err := wc.methods.UpdateTokens(ctx)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
if len(newTokens) > 0 {
|
|||
|
// обновляем информацию о пользователях
|
|||
|
err = wc.methods.CheckUsers(ctx, newTokens)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update users information", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
// обновляем информацию о pipelines
|
|||
|
err = wc.methods.CheckPipelines(ctx, newTokens)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
// обновляем информацию о steps
|
|||
|
err = wc.methods.CheckSteps(ctx, newTokens)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
// обновляем информацию о tags
|
|||
|
//err = wc.methods.CheckTags(ctx, newTokens)
|
|||
|
//if err != nil {
|
|||
|
// wc.logger.Error("error updating users tags", zap.Error(err))
|
|||
|
// return err
|
|||
|
//}
|
|||
|
|
|||
|
// обновляем информацию о fields
|
|||
|
err = wc.methods.CheckFields(ctx, newTokens)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error updating users fields", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
}
|
|||
|
//case models.RuleCheck:
|
|||
|
// token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
|||
|
// if err != nil {
|
|||
|
// wc.logger.Error("error getting user token from db", zap.Error(err))
|
|||
|
// return err
|
|||
|
// }
|
|||
|
// if token != nil {
|
|||
|
// err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
|||
|
// if err != nil {
|
|||
|
// wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
|||
|
// return err
|
|||
|
// }
|
|||
|
//
|
|||
|
// err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message)
|
|||
|
// if err != nil {
|
|||
|
// wc.logger.Error("error check field rules for fields rules", zap.Error(err))
|
|||
|
// return err
|
|||
|
// }
|
|||
|
// }
|
|||
|
case models.UserReLogin:
|
|||
|
err := wc.methods.UserReLogin(ctx, message)
|
|||
|
if err != nil {
|
|||
|
wc.logger.Error("error update user information in re-login method", zap.Error(err))
|
|||
|
return err
|
|||
|
}
|
|||
|
|
|||
|
default:
|
|||
|
wc.logger.Error("incorrect message type", zap.Any("Type:", message))
|
|||
|
return nil
|
|||
|
}
|
|||
|
return nil
|
|||
|
}
|
|||
|
|
|||
|
func (wc *QueueUpdater) Stop(_ context.Context) error {
|
|||
|
return nil
|
|||
|
}
|