amocrm/internal/workers/data_updater/data_updater.go

129 lines
3.1 KiB
Go

package data_updater
import (
"amocrm/internal/models/amo"
"amocrm/internal/repository"
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
Repo *repository.Repository
Logger *zap.Logger
}
type DataUpdater struct {
amoClient *amoClient.Amo
repo *repository.Repository
logger *zap.Logger
}
func NewDataUpdaterWC(deps Deps) *DataUpdater {
return &DataUpdater{
amoClient: deps.AmoClient,
repo: deps.Repo,
logger: deps.Logger,
}
}
func (wc *DataUpdater) Start(ctx context.Context) {
nextStart := calculateTime()
ticker := time.NewTicker(time.Second * time.Duration(nextStart))
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = calculateTime()
ticker.Reset(time.Second * time.Duration(nextStart))
case <-ctx.Done():
return
}
}
}
func (wc *DataUpdater) processTasks(ctx context.Context) {
// сначала получаем список токенов
allTokens, err := wc.repo.GetAllTokens(ctx)
if err != nil {
wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err))
return
}
for _, token := range allTokens {
pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
wc.logger.Error("error getting list pipelines:", zap.Error(err))
}
err = wc.repo.CheckPipelines(ctx, token.AccountID, pipelines.Embedded.Pipelines)
if err != nil {
wc.logger.Error("error update pipelines in mongo:", zap.Error(err))
}
for _, pipeline := range pipelines.Embedded.Pipelines {
steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
err = wc.repo.CheckSteps(ctx, token.AccountID, steps.Embedded.Statuses)
if err != nil {
wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err))
}
}
var leadsTags []amo.Tag
var contactsTags []amo.Tag
var companiesTags []amo.Tag
var customersTags []amo.Tag
entityTypes := []amo.EntityType{amo.LeadsTags, amo.ContactsTags, amo.CompaniesTags, amo.CustomersTags}
for _, entityType := range entityTypes {
page := 1
limit := 250
for {
req := amo.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := wc.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
wc.logger.Error("error getting list of tags", zap.Error(err))
break
}
switch entityType {
case amo.LeadsTags:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
case amo.ContactsTags:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
case amo.CompaniesTags:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
case amo.CustomersTags:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
if len(tags.Embedded.Tags) == 0 {
break
}
page++
}
}
// todo вставка списков в монгу то есть апдейт
// todo fields
}
}
func (wc *DataUpdater) Stop(ctx context.Context) error {
return nil
}