package workers_methods

import (
	"context"
	"time"

	"amocrm/internal/models"
	"amocrm/internal/tools"
	"amocrm/pkg/amoClient"
	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
)

// Methods groups the operations of this package around a repository,
// an amo API client and a logger.
type Methods struct {
	repo      *dal.AmoDal
	amoClient *amoClient.Amo
	logger    *zap.Logger
}

// Deps carries the dependencies required to build a Methods value.
type Deps struct {
	Repo      *dal.AmoDal
	AmoClient *amoClient.Amo
	Logger    *zap.Logger
}

// NewWorkersMethods builds a Methods value from deps.
func NewWorkersMethods(deps Deps) *Methods {
	return &Methods{
		repo:      deps.Repo,
		amoClient: deps.AmoClient,
		logger:    deps.Logger,
	}
}

// UpdateTokens loads every stored token, exchanges each refresh token for a
// new token pair through the amo client and stores the refreshed tokens.
//
// A token whose refresh request fails is logged and skipped; it is not part
// of the returned slice. An error is returned only when reading the tokens
// from, or writing the refreshed tokens to, the repository fails.
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
	allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
	if err != nil {
		m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
		return nil, err
	}

	var newTokens []model.Token
	for _, oldToken := range allTokens {
		req := models.UpdateWebHookReq{
			GrantType:    "refresh_token",
			RefreshToken: oldToken.RefreshToken,
		}

		resp, err := m.amoClient.CreateWebHook(&req)
		if err != nil {
			m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
			continue
		}

		// Read the clock once so Expiration and CreatedAt share the same base.
		now := time.Now().Unix()
		newTokens = append(newTokens, model.Token{
			AccountID:    oldToken.AccountID,
			RefreshToken: resp.RefreshToken,
			AccessToken:  resp.AccessToken,
			Expiration:   now + resp.ExpiresIn,
			CreatedAt:    now,
		})
	}

	err = m.repo.AmoRepo.WebhookUpdate(ctx, newTokens)
	if err != nil {
		m.logger.Error("error update newTokens in UpdateTokens", zap.Error(err))
		return nil, err
	}

	return newTokens, nil
}

// CheckUsers fetches the user list for every token through the amo client
// and passes each user to the repository: the user whose ID equals the
// account's AmoID goes through CheckMainUser, and every user (including that
// one) goes through CheckAndUpdateUsers.
//
// A token whose user list cannot be fetched is logged and skipped so that one
// failing account does not block the remaining ones. Any repository error
// aborts the run and is returned.
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
	// NOTE(review): only the first page (up to usersLimit entries) is
	// requested per token; accounts with more users may need pagination.
	const (
		usersPage  = 1
		usersLimit = 250
	)

	listUser := make(map[string][]models.Users)
	for _, token := range allTokens {
		userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
			Page:  usersPage,
			Limit: usersLimit,
		}, token.AccessToken)
		if err != nil {
			m.logger.Error("error fetching list users", zap.Error(err))
			continue
		}

		listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
	}

	for accountID, users := range listUser {
		mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
		if err != nil {
			m.logger.Error("error getting current account from db", zap.Error(err))
			return err
		}

		for _, user := range users {
			if user.ID == mainAccount.AmoID {
				err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
					Name:  user.Name,
					Role:  int32(user.Rights.RoleID),
					Group: int32(user.Rights.GroupID),
					Email: user.Email,
					AmoID: user.ID,
				})
				if err != nil {
					m.logger.Error("error update main user data in db", zap.Error(err))
					return err
				}
			}

			err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{
				AmoID:     user.ID,
				Name:      user.FullName,
				Group:     int32(user.Rights.GroupID),
				Role:      int32(user.Rights.RoleID),
				Email:     user.Email,
				Amouserid: mainAccount.Amouserid,
			})
			if err != nil {
				m.logger.Error("error update users list data in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckPipelinesAndSteps fetches, for every token, the list of pipelines and
// the steps of each pipeline through the amo client and passes them to the
// repository.
//
// Fetch failures are logged and the affected token or pipeline is skipped.
// Any repository error aborts the run and is returned.
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		pipelines, err := m.amoClient.GetListPipelines(token.AccessToken)
		if err != nil {
			m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
			continue
		}

		err = m.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(pipelines.Embedded.Pipelines))
		if err != nil {
			m.logger.Error("error update list pipelines in db:", zap.Error(err))
			return err
		}

		for _, pipeline := range pipelines.Embedded.Pipelines {
			steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
			if err != nil {
				m.logger.Error("error getting list steps pipeline:", zap.Error(err))
				continue
			}

			err = m.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(steps.Embedded.Statuses))
			if err != nil {
				m.logger.Error("error update pipeline steps in db:", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckTags fetches, for every token, the tags of each entity type (leads,
// contacts, companies, customers) through the amo client and then passes
// them, one entity type at a time, to the repository.
//
// All entity types of a token are fetched before any of them is stored.
// Any repository error aborts the run and is returned.
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
	// Entity types in processing order, each paired with the message logged
	// when storing that type fails.
	entities := []struct {
		entityType model.EntityType
		saveErrMsg string
	}{
		{model.LeadsTags, "error update leads tags in db"},
		{model.ContactsTags, "error update contacts tags in db"},
		{model.CompaniesTags, "error update companies tags in db"},
		{model.CustomersTags, "error update customer tags in db"},
	}

	for _, token := range tokens {
		// fetched[i] holds the tags collected for entities[i].
		fetched := make([][]models.Tag, len(entities))
		for i, entity := range entities {
			fetched[i] = m.fetchTags(token, entity.entityType)
		}

		for i, entity := range entities {
			err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(fetched[i], entity.entityType), token.AccountID)
			if err != nil {
				m.logger.Error(entity.saveErrMsg, zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// fetchTags pages through the tag list of one entity type for the given
// token and returns everything collected. Paging stops at the first empty
// (or nil) response, or at the first fetch error, which is logged.
//
// NOTE(review): after a fetch error the tags collected so far (possibly none)
// are still returned and stored by the caller — confirm the repository
// tolerates a partial list.
func (m *Methods) fetchTags(token model.Token, entityType model.EntityType) []models.Tag {
	const limit = 250

	var collected []models.Tag
	for page := 1; ; page++ {
		req := models.GetListTagsReq{
			Page:       page,
			Limit:      limit,
			EntityType: entityType,
		}

		tags, err := m.amoClient.GetListTags(req, token.AccessToken)
		if err != nil {
			m.logger.Error("error getting list of tags", zap.Error(err))
			return collected
		}
		if tags == nil || len(tags.Embedded.Tags) == 0 {
			return collected
		}

		collected = append(collected, tags.Embedded.Tags...)
	}
}

// CheckFields fetches, for every token, the custom fields of each entity
// type (leads, contacts, companies, customers) through the amo client and
// then passes them, one entity type at a time, to the repository.
//
// All entity types of a token are fetched before any of them is stored.
// Any repository error aborts the run and is returned.
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
	// Entity types in processing order, each paired with the message logged
	// when storing that type fails.
	entities := []struct {
		entityType model.EntityType
		saveErrMsg string
	}{
		{model.LeadsTags, "error update leads fields in db"},
		{model.ContactsTags, "error update contacts fields in db"},
		{model.CompaniesTags, "error update companies fields"},
		{model.CustomersTags, "error update customer fields"},
	}

	for _, token := range tokens {
		// fetched[i] holds the fields collected for entities[i]; keeping the
		// results index-aligned with the table guarantees that each entity
		// type is stored with its own fields.
		fetched := make([][]models.CustomField, len(entities))
		for i, entity := range entities {
			fetched[i] = m.fetchFields(token, entity.entityType)
		}

		for i, entity := range entities {
			err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fetched[i], entity.entityType), token.AccountID)
			if err != nil {
				m.logger.Error(entity.saveErrMsg, zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// fetchFields pages through the custom-field list of one entity type for the
// given token and returns everything collected. Paging stops at the first
// empty (or nil) response, or at the first fetch error, which is logged.
//
// NOTE(review): after a fetch error the fields collected so far (possibly
// none) are still returned and stored by the caller — confirm the repository
// tolerates a partial list.
func (m *Methods) fetchFields(token model.Token, entityType model.EntityType) []models.CustomField {
	const limit = 50

	var collected []models.CustomField
	for page := 1; ; page++ {
		req := models.GetListFieldsReq{
			Page:       page,
			Limit:      limit,
			EntityType: entityType,
		}

		fields, err := m.amoClient.GetListFields(req, token.AccessToken)
		if err != nil {
			m.logger.Error("error getting list of fields", zap.Error(err))
			return collected
		}
		if fields == nil || len(fields.Embedded.CustomFields) == 0 {
			return collected
		}

		collected = append(collected, fields.Embedded.CustomFields...)
	}
}

// GetTokenByID returns the token stored for accountID. A repository error is
// logged and returned.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	token, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
	if err != nil {
		m.logger.Error("error getting token by account id from db", zap.Error(err))
		return nil, err
	}
	return token, nil
}

// CreateUserFromWebHook exchanges authCode for a token pair, fetches the user
// info that belongs to the new access token, creates the account for
// accountID in the repository and stores the tokens.
//
// The returned token carries only AccountID, RefreshToken and AccessToken.
// Every failing step is logged and its error returned.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
	// Obtain the access and refresh tokens using the authorization code.
	forGetTokens := models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      authCode,
	}

	tokens, err := m.amoClient.CreateWebHook(&forGetTokens)
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	// Fetch the user info using the access token.
	userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken)
	if err != nil {
		m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	toCreate := model.User{
		Name:      userInfo.Name,
		Subdomain: userInfo.Subdomain,
		AmoID:     userInfo.ID,
		Amouserid: userInfo.ID,
		Country:   userInfo.Country,
	}

	err = m.repo.AmoRepo.CreateAccount(ctx, accountID, toCreate)
	if err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	// Read the clock once so Expiration and CreatedAt share the same base.
	now := time.Now().Unix()
	err = m.repo.AmoRepo.WebhookCreate(ctx, model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    accountID,
		AuthCode:     authCode,
		Expiration:   now + tokens.ExpiresIn,
		CreatedAt:    now,
	})
	if err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	return &model.Token{
		AccountID:    accountID,
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
	}, nil
}