amocrm/internal/workers/data_updater/data_updater.go

321 lines
8.4 KiB
Go
Raw Normal View History

2024-04-11 12:45:01 +00:00
package data_updater
import (
2024-04-12 14:51:26 +00:00
"amocrm/internal/models"
"amocrm/internal/tools"
2024-04-11 12:45:01 +00:00
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
2024-04-17 12:21:06 +00:00
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
2024-04-11 12:45:01 +00:00
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
2024-04-17 12:21:06 +00:00
Repo *dal.AmoDal
2024-04-11 12:45:01 +00:00
Logger *zap.Logger
}
type DataUpdater struct {
amoClient *amoClient.Amo
2024-04-17 12:21:06 +00:00
repo *dal.AmoDal
2024-04-11 12:45:01 +00:00
logger *zap.Logger
}
func NewDataUpdaterWC(deps Deps) *DataUpdater {
return &DataUpdater{
amoClient: deps.AmoClient,
repo: deps.Repo,
logger: deps.Logger,
}
}
func (wc *DataUpdater) Start(ctx context.Context) {
nextStart := calculateTime()
ticker := time.NewTicker(time.Second * time.Duration(nextStart))
2024-04-20 19:02:13 +00:00
//ticker := time.NewTicker(10 * time.Second)
2024-04-11 12:45:01 +00:00
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Second * time.Duration(nextStart))
case <-ctx.Done():
return
}
}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
2024-04-11 15:08:54 +00:00
// сначала получаем список токенов
2024-04-17 12:21:06 +00:00
allTokens, err := wc.repo.AmoRepo.GetAllTokens(ctx)
2024-04-11 15:08:54 +00:00
if err != nil {
wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err))
return
}
2024-04-19 16:05:42 +00:00
var newTokens []model.Token
for _, oldToken := range allTokens {
req := models.UpdateWebHookReq{
GrantType: "refresh_token",
RefreshToken: oldToken.RefreshToken,
}
resp, err := wc.amoClient.CreateWebHook(&req)
if err != nil {
wc.logger.Error("error create webhook for update tokens", zap.Error(err))
continue
}
newToken := model.Token{
AccountID: oldToken.AccountID,
RefreshToken: resp.RefreshToken,
AccessToken: resp.AccessToken,
Expiration: time.Now().Unix() + resp.ExpiresIn,
CreatedAt: time.Now().Unix(),
}
newTokens = append(newTokens, newToken)
}
err = wc.repo.AmoRepo.WebhookUpdate(ctx, newTokens)
if err != nil {
wc.logger.Error("error updating users tokens", zap.Error(err))
return
}
2024-04-19 16:05:42 +00:00
// обновляем информацию о пользователях
err = wc.UserUpdater(ctx, newTokens)
2024-04-19 16:05:42 +00:00
if err != nil {
wc.logger.Error("some error from UserUpdater", zap.Error(err))
}
for _, token := range newTokens {
2024-04-12 11:52:38 +00:00
// pipelines
2024-04-11 15:08:54 +00:00
pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
wc.logger.Error("error getting list pipelines:", zap.Error(err))
}
err = wc.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(pipelines.Embedded.Pipelines))
2024-04-11 15:08:54 +00:00
if err != nil {
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
}
2024-04-11 15:50:27 +00:00
2024-04-12 11:52:38 +00:00
// steps
2024-04-11 15:50:27 +00:00
for _, pipeline := range pipelines.Embedded.Pipelines {
steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
err = wc.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(steps.Embedded.Statuses))
2024-04-11 15:50:27 +00:00
if err != nil {
wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
}
}
2024-04-12 11:52:38 +00:00
// tags
2024-04-17 12:21:06 +00:00
var leadsTags []models.Tag
var contactsTags []models.Tag
var companiesTags []models.Tag
var customersTags []models.Tag
2024-04-17 12:21:06 +00:00
entityTypes := []model.EntityType{model.LeadsTags, model.ContactsTags, model.CompaniesTags, model.CustomersTags}
for _, entityType := range entityTypes {
page := 1
limit := 250
for {
2024-04-17 12:21:06 +00:00
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := wc.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list of tags", zap.Error(err))
break
}
2024-04-12 14:51:26 +00:00
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
page++
}
2024-04-11 16:32:53 +00:00
}
for _, entityType := range entityTypes {
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType), token.AccountID)
if err != nil {
wc.logger.Error("error update leads tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType), token.AccountID)
if err != nil {
wc.logger.Error("error update contacts tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType), token.AccountID)
if err != nil {
wc.logger.Error("error update companies tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType), token.AccountID)
if err != nil {
wc.logger.Error("error update customer tags")
continue
}
}
}
2024-04-12 11:52:38 +00:00
// fields
2024-04-17 12:21:06 +00:00
var leadsFields []models.CustomField
var contactsFields []models.CustomField
var companiesFields []models.CustomField
var customersFields []models.CustomField
2024-04-12 11:52:38 +00:00
for _, entityType := range entityTypes {
page := 1
limit := 50
for {
2024-04-17 12:21:06 +00:00
req := models.GetListFieldsReq{
2024-04-12 11:52:38 +00:00
Page: page,
Limit: limit,
EntityType: entityType,
}
2024-04-12 14:51:26 +00:00
fields, err := wc.amoClient.GetListFields(req, token.AccessToken)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error getting list of fields", zap.Error(err))
break
}
2024-04-12 14:51:26 +00:00
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
2024-04-12 11:52:38 +00:00
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
2024-04-12 14:51:26 +00:00
leadsFields = append(leadsFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
2024-04-12 14:51:26 +00:00
contactsFields = append(contactsFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
2024-04-12 14:51:26 +00:00
companiesFields = append(companiesFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
2024-04-12 14:51:26 +00:00
customersFields = append(customersFields, fields.Embedded.CustomFields...)
2024-04-12 11:52:38 +00:00
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update leads fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update contacts fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update companies fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
2024-04-18 11:13:03 +00:00
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update customer fields")
continue
}
}
}
2024-04-11 15:08:54 +00:00
}
2024-04-11 12:45:01 +00:00
}
2024-04-19 16:05:42 +00:00
func (wc *DataUpdater) UserUpdater(ctx context.Context, allTokens []model.Token) error {
2024-04-20 12:57:37 +00:00
listUser := make(map[string][]models.Users)
2024-04-19 16:05:42 +00:00
for _, token := range allTokens {
page := 1
limit := 250
2024-04-20 19:02:13 +00:00
userData, err := wc.amoClient.GetUserList(models.RequestGetListUsers{
Page: page,
Limit: limit,
}, token.AccessToken)
if err != nil {
wc.logger.Error("error getting user list", zap.Error(err))
break
}
2024-04-12 14:51:26 +00:00
2024-04-20 19:02:13 +00:00
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
2024-04-12 14:51:26 +00:00
}
2024-04-20 12:57:37 +00:00
for accountID, users := range listUser {
mainAccount, err := wc.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
2024-04-19 16:05:42 +00:00
if err != nil {
return err
}
2024-04-20 12:57:37 +00:00
for _, user := range users {
if user.ID == mainAccount.AmoID {
2024-04-20 19:02:13 +00:00
err := wc.repo.AmoRepo.CheckMainUser(ctx, model.User{
Name: user.Name,
Role: int32(user.Rights.RoleID),
Group: int32(user.Rights.GroupID),
Email: user.Email,
AmoID: user.ID,
})
if err != nil {
return err
}
2024-04-20 12:57:37 +00:00
}
err := wc.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{
AmoID: user.ID,
Name: user.FullName,
2024-04-20 19:02:13 +00:00
Group: int32(user.Rights.GroupID),
Role: int32(user.Rights.RoleID),
2024-04-20 12:57:37 +00:00
Email: user.Email,
Amouserid: mainAccount.Amouserid,
})
if err != nil {
return err
}
}
2024-04-12 14:51:26 +00:00
}
return nil
}
2024-04-11 12:45:01 +00:00
func (wc *DataUpdater) Stop(ctx context.Context) error {
return nil
}