amocrm/internal/workers/data_updater/data_updater.go
2024-04-12 17:51:26 +03:00

315 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package data_updater
import (
"amocrm/internal/models"
"amocrm/internal/models/amo"
"amocrm/internal/repository"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"time"
)
// Deps bundles the dependencies handed to NewDataUpdaterWC; each field is
// copied into the matching unexported field of DataUpdater.
type Deps struct {
// AmoClient is the amoCRM API client used to fetch users, pipelines, steps,
// tags and custom fields.
AmoClient *amoClient.Amo
// Repo is the repository through which fetched data is checked and saved.
Repo *repository.Repository
// Logger receives the error reports produced by the worker.
Logger *zap.Logger
}
// DataUpdater is a background worker that periodically pulls users,
// pipelines, pipeline steps, tags and custom fields from amoCRM and passes
// them to the repository.
type DataUpdater struct {
// amoClient performs the amoCRM API requests.
amoClient *amoClient.Amo
// repo stores the tokens and receives the fetched data.
repo *repository.Repository
// logger is used to report failures; the worker logs and carries on.
logger *zap.Logger
}
// NewDataUpdaterWC builds a DataUpdater wired with the client, repository and
// logger supplied in deps. The worker does nothing until Start is called.
func NewDataUpdaterWC(deps Deps) *DataUpdater {
	updater := new(DataUpdater)
	updater.amoClient = deps.AmoClient
	updater.repo = deps.Repo
	updater.logger = deps.Logger
	return updater
}
// Start blocks and runs the update loop until ctx is cancelled.
//
// The delay before each pass comes from calculateTime (defined outside this
// block), whose result is interpreted as a number of seconds. After every
// pass the delay is recomputed and the timer re-armed.
func (wc *DataUpdater) Start(ctx context.Context) {
	// A one-shot Timer is used rather than a Ticker because the interval is
	// recomputed after every run. Unlike NewTicker/Ticker.Reset, a Timer does
	// not panic on a non-positive duration (it simply fires immediately), and
	// no tick can pile up in the channel while processTasks is running.
	timer := time.NewTimer(time.Second * time.Duration(calculateTime()))
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			wc.processTasks(ctx)
			// The timer has fired and its channel is drained, so Reset is safe.
			timer.Reset(time.Second * time.Duration(calculateTime()))
		case <-ctx.Done():
			return
		}
	}
}
// processTasks performs one synchronization pass.
//
// It first refreshes users via UserUpdater, then loads every stored token and,
// for each token, fetches pipelines, pipeline steps, tags and custom fields
// from amoCRM and hands them to the repository Check* methods. Failures are
// logged and the pass moves on to the next item; nothing is returned.
func (wc *DataUpdater) processTasks(ctx context.Context) {
	// Page sizes requested from the amoCRM list endpoints.
	const (
		tagsPageLimit   = 250
		fieldsPageLimit = 50
	)

	// Entity kinds to synchronize, each paired with the log message emitted
	// when saving its tags or custom fields fails.
	targets := []struct {
		entityType   amo.EntityType
		tagsErrMsg   string
		fieldsErrMsg string
	}{
		{amo.LeadsTags, "error update leads tags", "error update leads fields"},
		{amo.ContactsTags, "error update contacts tags", "error update contacts fields"},
		{amo.CompaniesTags, "error update companies tags", "error update companies fields"},
		{amo.CustomersTags, "error update customer tags", "error update customer fields"},
	}

	// Refresh user information; a failure here does not stop the pass.
	if err := wc.UserUpdater(ctx); err != nil {
		wc.logger.Error("some error from UserUpdater", zap.Error(err))
	}

	// Everything below is done once per stored token.
	allTokens, err := wc.repo.GetAllTokens(ctx)
	if err != nil {
		wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err))
		return
	}

	for _, token := range allTokens {
		// Stop early once the worker is shutting down.
		if ctx.Err() != nil {
			return
		}

		// amoCRM account id, needed for tags and custom fields because their
		// payloads do not carry it; it is taken from the pipelines instead.
		accountID := 0

		// pipelines
		pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken)
		if err != nil {
			wc.logger.Error("error getting list pipelines:", zap.Error(err))
			// The response is unusable after an error and the account id
			// cannot be determined, so skip this token for the current pass.
			continue
		}
		if err := wc.repo.CheckPipelines(ctx, token.AccountID, pipelines.Embedded.Pipelines); err != nil {
			wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
		}

		// steps
		for _, pipeline := range pipelines.Embedded.Pipelines {
			accountID = pipeline.AccountID
			steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
			if err != nil {
				wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
				continue
			}
			if err := wc.repo.CheckSteps(ctx, token.AccountID, steps.Embedded.Statuses); err != nil {
				wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
			}
		}

		// tags
		for _, target := range targets {
			var tags []amo.Tag
			complete := true
			for page := 1; ; page++ {
				resp, err := wc.amoClient.GetListTags(amo.GetListTagsReq{
					Page:       page,
					Limit:      tagsPageLimit,
					EntityType: target.entityType,
				}, token.AccessToken)
				if err != nil {
					wc.logger.Error("error getting list of tags", zap.Error(err))
					complete = false
					break
				}
				// A nil or empty page marks the end of the listing.
				if resp == nil || len(resp.Embedded.Tags) == 0 {
					break
				}
				tags = append(tags, resp.Embedded.Tags...)
			}
			if !complete {
				// Do not hand a truncated list to the repository.
				continue
			}
			err := wc.repo.CheckTags(ctx, repository.CheckTagsDeps{
				Tags:       tags,
				AccountID:  token.AccountID,
				ID:         accountID,
				EntityType: target.entityType,
			})
			if err != nil {
				wc.logger.Error(target.tagsErrMsg, zap.Error(err))
			}
		}

		// fields
		for _, target := range targets {
			var fields []amo.CustomField
			complete := true
			for page := 1; ; page++ {
				resp, err := wc.amoClient.GetListFields(amo.GetListFieldsReq{
					Page:       page,
					Limit:      fieldsPageLimit,
					EntityType: target.entityType,
				}, token.AccessToken)
				if err != nil {
					wc.logger.Error("error getting list of fields", zap.Error(err))
					complete = false
					break
				}
				// A nil or empty page marks the end of the listing.
				if resp == nil || len(resp.Embedded.CustomFields) == 0 {
					break
				}
				fields = append(fields, resp.Embedded.CustomFields...)
			}
			if !complete {
				// Do not hand a truncated list to the repository.
				continue
			}
			// Each entity type is saved with its own field list (the customers
			// entry previously received the companies fields by mistake).
			err := wc.repo.CheckFields(ctx, repository.CheckFieldsDeps{
				Fields:     fields,
				AccountID:  token.AccountID,
				ID:         accountID,
				EntityType: target.entityType,
			})
			if err != nil {
				wc.logger.Error(target.fieldsErrMsg, zap.Error(err))
			}
		}
	}
}
// UserUpdater fetches the amoCRM user list page by page, loads each user's
// details with GetUserByID and passes the result to the repository via
// CheckUsers.
//
// Failures for the list or for individual users are logged and do not abort
// the run. The only error returned is ctx.Err() when ctx is cancelled while
// users are being processed; otherwise the result is nil.
func (wc *DataUpdater) UserUpdater(ctx context.Context) error {
	// Page size requested from the user list endpoint.
	const pageLimit = 250

	var listUser []amo.Users
	for page := 1; ; page++ {
		userData, err := wc.amoClient.GetUserList(amo.RequestGetListUsers{
			Page:  page,
			Limit: pageLimit,
		})
		if err != nil {
			// Users collected so far are still processed below.
			wc.logger.Error("error getting user list", zap.Error(err))
			break
		}
		// A nil or empty page marks the end of the listing.
		if userData == nil || len(userData.Embedded.Users) == 0 {
			break
		}
		listUser = append(listUser, userData.Embedded.Users...)
	}

	for _, user := range listUser {
		// The per-user API call takes no context, so check cancellation here.
		if err := ctx.Err(); err != nil {
			return err
		}

		onlyOneUser, err := wc.amoClient.GetUserByID(user.ID)
		if err != nil {
			wc.logger.Error("error getting user by id", zap.Error(err))
			continue
		}
		// Role is a pointer; dereferencing it blindly would panic when the
		// response carries no role.
		if onlyOneUser.Role == nil {
			wc.logger.Error("user has no role, skipping", zap.Any("userID", user.ID))
			continue
		}

		err = wc.repo.CheckUsers(ctx, user.ID, models.User{
			Name:  onlyOneUser.Name,
			Group: tools.ConvertGroups(user),
			Role:  *onlyOneUser.Role,
			Email: onlyOneUser.Email,
		})
		if err != nil {
			wc.logger.Error("error update user in mongo", zap.Any("userID", user.ID), zap.Error(err))
		}
	}
	return nil
}
// Stop is a no-op that always returns nil; ctx is not used. The update loop
// itself ends when the context passed to Start is cancelled.
func (wc *DataUpdater) Stop(ctx context.Context) error {
return nil
}