amocrm/internal/workers/data_updater/data_updater.go

275 lines
7.5 KiB
Go
Raw Normal View History

2024-04-11 12:45:01 +00:00
package data_updater
import (
2024-04-12 14:51:26 +00:00
"amocrm/internal/models"
"amocrm/internal/tools"
2024-04-11 12:45:01 +00:00
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
2024-04-17 12:21:06 +00:00
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
2024-04-11 12:45:01 +00:00
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
2024-04-17 12:21:06 +00:00
Repo *dal.AmoDal
2024-04-11 12:45:01 +00:00
Logger *zap.Logger
}
type DataUpdater struct {
amoClient *amoClient.Amo
2024-04-17 12:21:06 +00:00
repo *dal.AmoDal
2024-04-11 12:45:01 +00:00
logger *zap.Logger
}
// NewDataUpdaterWC returns a DataUpdater wired with the client, repository
// and logger carried by deps. No validation is performed on deps.
func NewDataUpdaterWC(deps Deps) *DataUpdater {
	updater := new(DataUpdater)
	updater.amoClient = deps.AmoClient
	updater.repo = deps.Repo
	updater.logger = deps.Logger
	return updater
}
// Start runs the update loop and blocks until ctx is cancelled.
//
// Each iteration waits for the number of seconds returned by calculateTime,
// runs processTasks, and then asks calculateTime again for the next delay.
//
// A one-shot timer is used instead of a ticker for two reasons:
//   - a ticker keeps ticking while processTasks is running, so a pass that
//     outlasts the interval can leave a stale tick buffered and trigger an
//     immediate, unscheduled second pass;
//   - time.NewTicker and Ticker.Reset panic on a non-positive duration,
//     whereas a timer with a non-positive duration simply fires at once.
func (wc *DataUpdater) Start(ctx context.Context) {
	timer := time.NewTimer(time.Second * time.Duration(calculateTime()))
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			wc.processTasks(ctx)
			// The timer has fired and its channel has been drained by the
			// receive above, so Reset is safe here.
			timer.Reset(time.Second * time.Duration(calculateTime()))
		case <-ctx.Done():
			return
		}
	}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
2024-04-12 14:51:26 +00:00
// обновляем информацию о пользователях
err := wc.UserUpdater(ctx)
if err != nil {
wc.logger.Error("some error from UserUpdater", zap.Error(err))
}
2024-04-11 15:08:54 +00:00
// сначала получаем список токенов
2024-04-17 12:21:06 +00:00
allTokens, err := wc.repo.AmoRepo.GetAllTokens(ctx)
2024-04-11 15:08:54 +00:00
if err != nil {
wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err))
return
}
for _, token := range allTokens {
2024-04-12 11:52:38 +00:00
accountID := 0 // id аккаунта в амо для тегов и кастомных полей так как там не приходит
// pipelines
2024-04-11 15:08:54 +00:00
pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
wc.logger.Error("error getting list pipelines:", zap.Error(err))
}
2024-04-17 12:21:06 +00:00
err = wc.repo.AmoRepo.CheckPipelines(ctx, token.AccountID, tools.ToPipeline(pipelines.Embedded.Pipelines))
2024-04-11 15:08:54 +00:00
if err != nil {
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
}
2024-04-11 15:50:27 +00:00
2024-04-12 11:52:38 +00:00
// steps
2024-04-11 15:50:27 +00:00
for _, pipeline := range pipelines.Embedded.Pipelines {
accountID = pipeline.AccountID
2024-04-11 15:50:27 +00:00
steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
2024-04-17 12:21:06 +00:00
err = wc.repo.AmoRepo.CheckSteps(ctx, token.AccountID, tools.ToStep(steps.Embedded.Statuses))
2024-04-11 15:50:27 +00:00
if err != nil {
wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
}
}
2024-04-12 11:52:38 +00:00
// tags
2024-04-17 12:21:06 +00:00
var leadsTags []models.Tag
var contactsTags []models.Tag
var companiesTags []models.Tag
var customersTags []models.Tag
2024-04-17 12:21:06 +00:00
entityTypes := []model.EntityType{model.LeadsTags, model.ContactsTags, model.CompaniesTags, model.CustomersTags}
for _, entityType := range entityTypes {
page := 1
limit := 250
for {
2024-04-17 12:21:06 +00:00
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := wc.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list of tags", zap.Error(err))
break
}
2024-04-12 14:51:26 +00:00
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
page++
}
2024-04-11 16:32:53 +00:00
}
for _, entityType := range entityTypes {
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType, accountID), token.AccountID)
if err != nil {
wc.logger.Error("error update leads tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType, accountID), token.AccountID)
if err != nil {
wc.logger.Error("error update contacts tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType, accountID), token.AccountID)
if err != nil {
wc.logger.Error("error update companies tags")
continue
}
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType, accountID), token.AccountID)
if err != nil {
wc.logger.Error("error update customer tags")
continue
}
}
}
2024-04-12 11:52:38 +00:00
// fields
2024-04-17 12:21:06 +00:00
var leadsFields []models.CustomField
var contactsFields []models.CustomField
var companiesFields []models.CustomField
var customersFields []models.CustomField
2024-04-12 11:52:38 +00:00
for _, entityType := range entityTypes {
page := 1
limit := 50
for {
2024-04-17 12:21:06 +00:00
req := models.GetListFieldsReq{
2024-04-12 11:52:38 +00:00
Page: page,
Limit: limit,
EntityType: entityType,
}
2024-04-12 14:51:26 +00:00
fields, err := wc.amoClient.GetListFields(req, token.AccessToken)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error getting list of fields", zap.Error(err))
break
}
2024-04-12 14:51:26 +00:00
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
2024-04-12 11:52:38 +00:00
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
2024-04-12 14:51:26 +00:00
leadsFields = append(leadsFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
2024-04-12 14:51:26 +00:00
contactsFields = append(contactsFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
2024-04-12 14:51:26 +00:00
companiesFields = append(companiesFields, fields.Embedded.CustomFields...)
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
2024-04-12 14:51:26 +00:00
customersFields = append(customersFields, fields.Embedded.CustomFields...)
2024-04-12 11:52:38 +00:00
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
2024-04-17 12:21:06 +00:00
case model.LeadsTags:
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType, accountID), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update leads fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.ContactsTags:
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType, accountID), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update contacts fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.CompaniesTags:
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType, accountID), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update companies fields")
continue
}
2024-04-17 12:21:06 +00:00
case model.CustomersTags:
err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType, accountID), token.AccountID)
2024-04-12 11:52:38 +00:00
if err != nil {
wc.logger.Error("error update customer fields")
continue
}
}
}
2024-04-11 15:08:54 +00:00
}
2024-04-11 12:45:01 +00:00
}
2024-04-12 14:51:26 +00:00
func (wc *DataUpdater) UserUpdater(ctx context.Context) error {
2024-04-17 12:21:06 +00:00
var listUser []models.Users
2024-04-12 14:51:26 +00:00
page := 1
limit := 250
for {
2024-04-17 12:21:06 +00:00
userData, err := wc.amoClient.GetUserList(models.RequestGetListUsers{
2024-04-12 14:51:26 +00:00
Page: page,
Limit: limit,
})
if err != nil {
wc.logger.Error("error getting user list", zap.Error(err))
break
}
if userData == nil || len(userData.Embedded.Users) == 0 {
break
}
listUser = append(listUser, userData.Embedded.Users...)
page++
}
for _, user := range listUser {
onlyOneUser, err := wc.amoClient.GetUserByID(user.ID)
if err != nil {
wc.logger.Error("error getting user by id", zap.Error(err))
continue
}
2024-04-17 12:21:06 +00:00
err = wc.repo.AmoRepo.CheckUsers(ctx, user.ID, model.User{
2024-04-12 14:51:26 +00:00
Name: onlyOneUser.Name,
Group: tools.ConvertGroups(user),
Role: *onlyOneUser.Role,
Email: onlyOneUser.Email,
})
}
return nil
}
2024-04-11 12:45:01 +00:00
// Stop is a no-op: it ignores ctx and always returns nil. The loop started by
// Start ends when the context passed to Start is cancelled.
func (wc *DataUpdater) Stop(ctx context.Context) error {
	return nil
}