package data_updater import ( "amocrm/internal/models" "amocrm/internal/tools" "amocrm/pkg/amoClient" "context" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" "time" ) type Deps struct { AmoClient *amoClient.Amo Repo *dal.AmoDal Logger *zap.Logger } type DataUpdater struct { amoClient *amoClient.Amo repo *dal.AmoDal logger *zap.Logger } func NewDataUpdaterWC(deps Deps) *DataUpdater { return &DataUpdater{ amoClient: deps.AmoClient, repo: deps.Repo, logger: deps.Logger, } } func (wc *DataUpdater) Start(ctx context.Context) { nextStart := calculateTime() ticker := time.NewTicker(time.Second * time.Duration(nextStart)) //ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: wc.processTasks(ctx) nextStart = calculateTime() ticker.Reset(time.Second * time.Duration(nextStart)) case <-ctx.Done(): return } } } func (wc *DataUpdater) processTasks(ctx context.Context) { // сначала получаем список токенов allTokens, err := wc.repo.AmoRepo.GetAllTokens(ctx) if err != nil { wc.logger.Error("error fetch all tokens from mongo:", zap.Error(err)) return } // обновляем информацию о пользователях err = wc.UserUpdater(ctx, allTokens) if err != nil { wc.logger.Error("some error from UserUpdater", zap.Error(err)) } for _, token := range allTokens { // pipelines pipelines, err := wc.amoClient.GetListPipelines(token.AccessToken) if err != nil { wc.logger.Error("error getting list pipelines:", zap.Error(err)) } err = wc.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(pipelines.Embedded.Pipelines)) if err != nil { wc.logger.Error("error update pipelines in mongo:", zap.Error(err)) } // steps for _, pipeline := range pipelines.Embedded.Pipelines { steps, err := wc.amoClient.GetListSteps(pipeline.ID, token.AccessToken) if err != nil { wc.logger.Error("error getting list steps pipeline:", zap.Error(err)) continue } err = wc.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(steps.Embedded.Statuses)) if err != nil { wc.logger.Error("error update pipeline steps in mongo:", zap.Error(err)) } } // tags var leadsTags []models.Tag var contactsTags []models.Tag var companiesTags []models.Tag var customersTags []models.Tag entityTypes := []model.EntityType{model.LeadsTags, model.ContactsTags, model.CompaniesTags, model.CustomersTags} for _, entityType := range entityTypes { page := 1 limit := 250 for { req := models.GetListTagsReq{ Page: page, Limit: limit, EntityType: entityType, } tags, err := wc.amoClient.GetListTags(req, token.AccessToken) if err != nil { wc.logger.Error("error getting list of tags", zap.Error(err)) break } if tags == nil || len(tags.Embedded.Tags) == 0 { break } switch entityType { case model.LeadsTags: leadsTags = append(leadsTags, tags.Embedded.Tags...) case model.ContactsTags: contactsTags = append(contactsTags, tags.Embedded.Tags...) case model.CompaniesTags: companiesTags = append(companiesTags, tags.Embedded.Tags...) case model.CustomersTags: customersTags = append(customersTags, tags.Embedded.Tags...) } page++ } } for _, entityType := range entityTypes { switch entityType { case model.LeadsTags: err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType), token.AccountID) if err != nil { wc.logger.Error("error update leads tags") continue } case model.ContactsTags: err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType), token.AccountID) if err != nil { wc.logger.Error("error update contacts tags") continue } case model.CompaniesTags: err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType), token.AccountID) if err != nil { wc.logger.Error("error update companies tags") continue } case model.CustomersTags: err := wc.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType), token.AccountID) if err != nil { wc.logger.Error("error update customer tags") continue } } } // fields var leadsFields []models.CustomField var contactsFields []models.CustomField var companiesFields []models.CustomField var customersFields []models.CustomField for _, entityType := range entityTypes { page := 1 limit := 50 for { req := models.GetListFieldsReq{ Page: page, Limit: limit, EntityType: entityType, } fields, err := wc.amoClient.GetListFields(req, token.AccessToken) if err != nil { wc.logger.Error("error getting list of fields", zap.Error(err)) break } if fields == nil || len(fields.Embedded.CustomFields) == 0 { break } switch entityType { case model.LeadsTags: leadsFields = append(leadsFields, fields.Embedded.CustomFields...) case model.ContactsTags: contactsFields = append(contactsFields, fields.Embedded.CustomFields...) case model.CompaniesTags: companiesFields = append(companiesFields, fields.Embedded.CustomFields...) case model.CustomersTags: customersFields = append(customersFields, fields.Embedded.CustomFields...) } page++ } } for _, entityType := range entityTypes { switch entityType { case model.LeadsTags: err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType), token.AccountID) if err != nil { wc.logger.Error("error update leads fields") continue } case model.ContactsTags: err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType), token.AccountID) if err != nil { wc.logger.Error("error update contacts fields") continue } case model.CompaniesTags: err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID) if err != nil { wc.logger.Error("error update companies fields") continue } case model.CustomersTags: err := wc.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID) if err != nil { wc.logger.Error("error update customer fields") continue } } } } } func (wc *DataUpdater) UserUpdater(ctx context.Context, allTokens []model.Token) error { listUser := make(map[string][]models.Users) for _, token := range allTokens { page := 1 limit := 250 userData, err := wc.amoClient.GetUserList(models.RequestGetListUsers{ Page: page, Limit: limit, }, token.AccessToken) if err != nil { wc.logger.Error("error getting user list", zap.Error(err)) break } if userData == nil || len(userData.Embedded.Users) == 0 { break } listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...) page++ } for accountID, users := range listUser { mainAccount, err := wc.repo.AmoRepo.GetCurrentAccount(ctx, accountID) if err != nil { return err } for _, user := range users { if user.ID == mainAccount.AmoID { err := wc.repo.AmoRepo.CheckMainUser(ctx, model.User{ Name: user.Name, Role: int32(user.Rights.RoleID), Group: int32(user.Rights.GroupID), Email: user.Email, AmoID: user.ID, }) if err != nil { return err } } err := wc.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{ AmoID: user.ID, Name: user.FullName, Group: int32(user.Rights.GroupID), Role: int32(user.Rights.RoleID), Email: user.Email, Amouserid: mainAccount.Amouserid, }) if err != nil { return err } } } return nil } func (wc *DataUpdater) Stop(ctx context.Context) error { return nil }