amocrm/internal/workers/data_updater/data_updater.go

268 lines
6.8 KiB
Go

package data_updater
import (
"amocrm/internal/models/amo"
"amocrm/internal/repository"
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
Repo *repository.Repository
Logger *zap.Logger
}
type DataUpdater struct {
amoClient *amoClient.Amo
repo *repository.Repository
logger *zap.Logger
}
func NewDataUpdaterWC(deps Deps) *DataUpdater {
return &DataUpdater{
amoClient: deps.AmoClient,
repo: deps.Repo,
logger: deps.Logger,
}
}
func (wc *DataUpdater) Start(ctx context.Context) {
nextStart := calculateTime()
ticker := time.NewTicker(time.Second * time.Duration(nextStart))
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Second * time.Duration(nextStart))
case <-ctx.Done():
return
}
}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
// сначала получаем список токенов
allTokens, err := wc.repo.GetAllTokens(ctx)
if err != nil {
wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err))
return
}
for _, token := range allTokens {
accountID := 0 // id аккаунта в амо для тегов и кастомных полей так как там не приходит
// pipelines
pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
wc.logger.Error("error getting list pipelines:", zap.Error(err))
}
err = wc.repo.CheckPipelines(ctx, token.AccountID, pipelines.Embedded.Pipelines)
if err != nil {
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
}
// steps
for _, pipeline := range pipelines.Embedded.Pipelines {
accountID = pipeline.AccountID
steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
err = wc.repo.CheckSteps(ctx, token.AccountID, steps.Embedded.Statuses)
if err != nil {
wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
}
}
// tags
var leadsTags []amo.Tag
var contactsTags []amo.Tag
var companiesTags []amo.Tag
var customersTags []amo.Tag
entityTypes := []amo.EntityType{amo.LeadsTags, amo.ContactsTags, amo.CompaniesTags, amo.CustomersTags}
for _, entityType := range entityTypes {
page := 1
limit := 250
for {
req := amo.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := wc.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list of tags", zap.Error(err))
break
}
switch entityType {
case amo.LeadsTags:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
case amo.ContactsTags:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
case amo.CompaniesTags:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
case amo.CustomersTags:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
if len(tags.Embedded.Tags) == 0 {
break
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
case amo.LeadsTags:
err := wc.repo.CheckTags(ctx, repository.CheckTagsDeps{
Tags: leadsTags,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update leads tags")
continue
}
case amo.ContactsTags:
err := wc.repo.CheckTags(ctx, repository.CheckTagsDeps{
Tags: contactsTags,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update contacts tags")
continue
}
case amo.CompaniesTags:
err := wc.repo.CheckTags(ctx, repository.CheckTagsDeps{
Tags: companiesTags,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update companies tags")
continue
}
case amo.CustomersTags:
err := wc.repo.CheckTags(ctx, repository.CheckTagsDeps{
Tags: customersTags,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update customer tags")
continue
}
}
}
// fields
var leadsFields []amo.CustomField
var contactsFields []amo.CustomField
var companiesFields []amo.CustomField
var customersFields []amo.CustomField
for _, entityType := range entityTypes {
page := 1
limit := 50
for {
req := amo.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := wc.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list of fields", zap.Error(err))
break
}
switch entityType {
case amo.LeadsTags:
leadsFields = append(leadsFields, tags.Embedded.CustomFields...)
case amo.ContactsTags:
contactsFields = append(contactsFields, tags.Embedded.CustomFields...)
case amo.CompaniesTags:
companiesFields = append(companiesFields, tags.Embedded.CustomFields...)
case amo.CustomersTags:
customersFields = append(customersFields, tags.Embedded.CustomFields...)
}
if len(tags.Embedded.CustomFields) == 0 {
break
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
case amo.LeadsTags:
err := wc.repo.CheckFields(ctx, repository.CheckFieldsDeps{
Fields: leadsFields,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update leads fields")
continue
}
case amo.ContactsTags:
err := wc.repo.CheckFields(ctx, repository.CheckFieldsDeps{
Fields: contactsFields,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update contacts fields")
continue
}
case amo.CompaniesTags:
err := wc.repo.CheckFields(ctx, repository.CheckFieldsDeps{
Fields: companiesFields,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update companies fields")
continue
}
case amo.CustomersTags:
err := wc.repo.CheckFields(ctx, repository.CheckFieldsDeps{
Fields: companiesFields,
AccountID: token.AccountID,
ID: accountID,
EntityType: entityType,
})
if err != nil {
wc.logger.Error("error update customer fields")
continue
}
}
}
}
}
func (wc *DataUpdater) Stop(ctx context.Context) error {
return nil
}