// Package workers_methods groups the operations that synchronise amoCRM data
// (tokens, users, pipelines, steps, tags, custom fields and field rules) with
// the repository layer.
package workers_methods

import (
	"context"
	"errors"
	"time"

	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"

	"amocrm/internal/models"
	"amocrm/internal/tools"
	"amocrm/pkg/amoClient"
)

// Methods holds the dependencies shared by every synchronisation operation:
// the repository, the amoCRM HTTP client and the logger.
type Methods struct {
	repo      *dal.AmoDal
	amoClient *amoClient.Amo
	logger    *zap.Logger
}

// Deps lists the dependencies required to build a Methods value.
type Deps struct {
	Repo      *dal.AmoDal
	AmoClient *amoClient.Amo
	Logger    *zap.Logger
}

// NewWorkersMethods builds a Methods value from deps.
func NewWorkersMethods(deps Deps) *Methods {
	return &Methods{
		repo:      deps.Repo,
		amoClient: deps.AmoClient,
		logger:    deps.Logger,
	}
}

// UpdateTokens refreshes every token stored in the repository.
//
// For each stored token it requests a new access/refresh pair from amoCRM using
// the refresh_token grant. Tokens whose refresh fails are logged and skipped;
// the successfully refreshed ones are written back with WebhookUpdate and
// returned. An error is returned only when reading or writing the repository
// fails.
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
	allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
	if err != nil {
		m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
		return nil, err
	}

	var newTokens []model.Token
	for _, oldToken := range allTokens {
		req := models.UpdateWebHookReq{
			GrantType:    "refresh_token",
			RefreshToken: oldToken.RefreshToken,
		}

		resp, err := m.amoClient.CreateWebHook(&req)
		if err != nil {
			// Best effort: one failing account must not block the others.
			m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
			continue
		}

		// Take the timestamp once so Expiration and CreatedAt share the same base.
		now := time.Now().Unix()
		newTokens = append(newTokens, model.Token{
			AccountID:    oldToken.AccountID,
			RefreshToken: resp.RefreshToken,
			AccessToken:  resp.AccessToken,
			Expiration:   now + resp.ExpiresIn,
			CreatedAt:    now,
		})
	}

	if err = m.repo.AmoRepo.WebhookUpdate(ctx, newTokens); err != nil {
		m.logger.Error("error update newTokens in UpdateTokens", zap.Error(err))
		return nil, err
	}

	return newTokens, nil
}

// CheckUsers fetches the user list of every account in allTokens from amoCRM
// and stores it in the repository.
//
// Only the first page (up to 250 users) is requested per token. A token whose
// request fails is logged and skipped so that the remaining accounts are still
// processed. For every fetched user CheckAndUpdateUsers is called; the user
// whose ID equals the account's AmoID is additionally passed to CheckMainUser.
// The first repository error aborts the run and is returned.
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
	const (
		usersPage  = 1
		usersLimit = 250
	)

	listUser := make(map[string][]models.Users)
	for _, token := range allTokens {
		userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
			Page:  usersPage,
			Limit: usersLimit,
		}, token.AccessToken)
		if err != nil {
			m.logger.Error("error fetching list users", zap.Error(err))
			// Skip only this account; the other tokens are independent.
			continue
		}

		listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
	}

	for accountID, users := range listUser {
		mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
		if err != nil {
			m.logger.Error("error getting current account from db", zap.Error(err))
			return err
		}

		for _, user := range users {
			if user.ID == mainAccount.AmoID {
				if err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
					Name:  user.Name,
					Role:  int32(user.Rights.RoleID),
					Group: int32(user.Rights.GroupID),
					Email: user.Email,
					AmoID: user.ID,
				}); err != nil {
					m.logger.Error("error update main user data in db", zap.Error(err))
					return err
				}
			}

			if err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{
				AmoID:     user.ID,
				Name:      user.FullName,
				Group:     int32(user.Rights.GroupID),
				Role:      int32(user.Rights.RoleID),
				Email:     user.Email,
				Amouserid: mainAccount.Amouserid,
			}); err != nil {
				m.logger.Error("error update users list data in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckPipelinesAndSteps fetches the pipelines of every token from amoCRM,
// stores them, and then does the same for the steps of each pipeline.
//
// Failures talking to amoCRM are logged and the affected token or pipeline is
// skipped. The first repository error aborts the run and is returned.
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		pipelines, err := m.amoClient.GetListPipelines(token.AccessToken)
		if err != nil {
			m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
			continue
		}

		if len(pipelines.Embedded.Pipelines) == 0 {
			continue
		}

		if err = m.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(pipelines.Embedded.Pipelines)); err != nil {
			m.logger.Error("error update list pipelines in db:", zap.Error(err))
			return err
		}

		for _, pipeline := range pipelines.Embedded.Pipelines {
			steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
			if err != nil {
				m.logger.Error("error getting list steps pipeline:", zap.Error(err))
				continue
			}

			if err = m.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(steps.Embedded.Statuses)); err != nil {
				m.logger.Error("error update pipeline steps in db:", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckTags fetches the tags of every entity type (leads, contacts, companies,
// customers) for every token and stores them in the repository.
//
// Tags are requested page by page, 250 per page, until amoCRM returns an error
// or an empty page. An error while paging is logged and ends paging for that
// entity type; whatever was collected so far is still stored. Entity types
// with no collected tags are not written. The first repository error aborts
// the run and is returned.
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
	const tagsLimit = 250

	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
	updateErrMsg := map[model.EntityType]string{
		model.LeadsType:     "error update leads tags in db",
		model.ContactsType:  "error update contacts tags in db",
		model.CompaniesType: "error update companies tags in db",
		model.CustomersType: "error update customer tags in db",
	}

	for _, token := range tokens {
		tagsByType := make(map[model.EntityType][]models.Tag, len(entityTypes))

		for _, entityType := range entityTypes {
			for page := 1; ; page++ {
				tags, err := m.amoClient.GetListTags(models.GetListTagsReq{
					Page:       page,
					Limit:      tagsLimit,
					EntityType: entityType,
				}, token.AccessToken)
				if err != nil {
					m.logger.Error("error getting list of tags", zap.Error(err))
					break
				}

				// A nil response or an empty page marks the end of the list.
				if tags == nil || len(tags.Embedded.Tags) == 0 {
					break
				}

				tagsByType[entityType] = append(tagsByType[entityType], tags.Embedded.Tags...)
			}
		}

		for _, entityType := range entityTypes {
			entityTags := tagsByType[entityType]
			if len(entityTags) == 0 {
				continue
			}

			if err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(entityTags, entityType), token.AccountID); err != nil {
				m.logger.Error(updateErrMsg[entityType], zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckFields fetches the custom fields of every entity type (leads, contacts,
// companies, customers) for every token and stores them in the repository.
//
// Fields are requested page by page, 50 per page, until amoCRM returns an error
// or an empty page. An error while paging is logged and ends paging for that
// entity type; whatever was collected so far is still stored. Entity types
// with no collected fields are not written. The first repository error aborts
// the run and is returned.
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
	const fieldsLimit = 50

	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
	updateErrMsg := map[model.EntityType]string{
		model.LeadsType:     "error update leads fields in db",
		model.ContactsType:  "error update contacts fields in db",
		model.CompaniesType: "error update companies fields",
		model.CustomersType: "error update customer fields",
	}

	for _, token := range tokens {
		fieldsByType := make(map[model.EntityType][]models.CustomField, len(entityTypes))

		for _, entityType := range entityTypes {
			for page := 1; ; page++ {
				fields, err := m.amoClient.GetListFields(models.GetListFieldsReq{
					Page:       page,
					Limit:      fieldsLimit,
					EntityType: entityType,
				}, token.AccessToken)
				if err != nil {
					m.logger.Error("error getting list of fields", zap.Error(err))
					break
				}

				// A nil response or an empty page marks the end of the list.
				if fields == nil || len(fields.Embedded.CustomFields) == 0 {
					break
				}

				fieldsByType[entityType] = append(fieldsByType[entityType], fields.Embedded.CustomFields...)
			}
		}

		for _, entityType := range entityTypes {
			entityFields := fieldsByType[entityType]
			if len(entityFields) == 0 {
				continue
			}

			if err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(entityFields, entityType), token.AccountID); err != nil {
				m.logger.Error(updateErrMsg[entityType], zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// GetTokenByID returns the token stored for accountID, logging and returning
// any repository error.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	token, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
	if err != nil {
		m.logger.Error("error getting token by account id from db", zap.Error(err))
		return nil, err
	}

	return token, nil
}

// CreateUserFromWebHook exchanges authCode for amoCRM tokens, creates the
// account for accountID from the amoCRM user info and stores the tokens.
//
// The returned token carries only AccountID, RefreshToken and AccessToken. Any
// failure is logged and returned; nothing is rolled back if a later step fails
// after an earlier one succeeded.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
	// Exchange the authorization code for access and refresh tokens.
	forGetTokens := models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      authCode,
	}

	tokens, err := m.amoClient.CreateWebHook(&forGetTokens)
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	// Fetch the user info using the access token.
	userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken)
	if err != nil {
		m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	toCreate := model.User{
		Name:      userInfo.Name,
		Subdomain: userInfo.Subdomain,
		AmoID:     userInfo.ID,
		Amouserid: userInfo.ID,
		Country:   userInfo.Country,
	}

	if err = m.repo.AmoRepo.CreateAccount(ctx, accountID, toCreate); err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	// Take the timestamp once so Expiration and CreatedAt share the same base.
	now := time.Now().Unix()
	err = m.repo.AmoRepo.WebhookCreate(ctx, model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    accountID,
		AuthCode:     authCode,
		Expiration:   now + tokens.ExpiresIn,
		CreatedAt:    now,
	})
	if err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	return &model.Token{
		AccountID:    accountID,
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
	}, nil
}

// CheckUTMs makes sure every UTM in ids has a matching lead custom field.
//
// It loads the UTMs and the fields already known for the UTMs' account, updates
// the ones tools.ToCreatedUpdate reports as already matching, creates the
// missing ones in amoCRM as lead fields, stores them, and links the UTMs to the
// created fields. It returns the known fields plus the newly created ones.
//
// An error is returned when no UTM is found for ids, because the account whose
// fields must be loaded is taken from the first UTM.
func (m *Methods) CheckUTMs(ctx context.Context, token, accountID string, ids []int32) ([]model.Field, error) {
	utms, err := m.repo.AmoRepo.GetUtmsByID(ctx, ids)
	if err != nil {
		m.logger.Error("error getting user UTM byList IDs", zap.Error(err))
		return nil, err
	}

	// Guard the utms[0] access below; indexing an empty slice would panic.
	if len(utms) == 0 {
		err = errors.New("no utms found for the given ids")
		m.logger.Error("error checking utms: empty utm list", zap.Error(err))
		return nil, err
	}

	fields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, utms[0].Accountid)
	if err != nil {
		m.logger.Error("error getting user fields by amo account id", zap.Error(err))
		return nil, err
	}

	toCreated, toUpdate := tools.ToCreatedUpdate(utms, fields)

	if len(toUpdate) > 0 {
		if err = m.repo.AmoRepo.UpdateUtmsFields(ctx, toUpdate); err != nil {
			m.logger.Error("error update utms fields in db", zap.Error(err))
			return nil, err
		}
	}

	if len(toCreated) == 0 {
		return fields, nil
	}

	createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
	if err != nil {
		m.logger.Error("error created amo fields", zap.Error(err))
		return nil, err
	}

	newFields := tools.ToField(createdFields.Embedded.CustomFields, model.LeadsType)
	if err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID); err != nil {
		m.logger.Error("error created amo fields in db", zap.Error(err))
		return nil, err
	}

	forUpdate := tools.MatchingUTMs(utms, createdFields.Embedded.CustomFields)
	if err = m.repo.AmoRepo.UpdateUTMs(ctx, forUpdate); err != nil {
		m.logger.Error("error update utms in db", zap.Error(err))
		return nil, err
	}

	return append(fields, newFields...), nil
}

// CheckFieldRule rebuilds the question-to-field rules of req.QuizID for
// accountID and stores them.
//
// It loads the questions referenced by the rules of each entity type, creates
// in amoCRM the fields tools.ToCreatedUpdateQuestionRules reports as missing,
// stores the created fields, and then maps every rule's question either to the
// already existing field or to the created field whose name equals the
// question title.
//
// A failure to load questions or to write to the repository is logged and
// returned. A failure to create fields in amoCRM for one entity type is logged
// and that entity type is skipped.
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountID string, req models.KafkaRule, currentFields []model.Field) error {
	currentFieldsRule := req.Fieldsrule

	// loadQuestions returns the questions referenced by entityRules, or nil when
	// the rules reference none.
	loadQuestions := func(entityRules []model.FieldRule) ([]model.Question, error) {
		questionIDs := tools.ToQuestionIDs(entityRules)
		if len(questionIDs) == 0 {
			return nil, nil
		}

		questions, err := m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
		if err != nil {
			m.logger.Error("error getting questions", zap.Error(err))
			return nil, err
		}

		return questions, nil
	}

	// Continuing after a failed lookup would overwrite the stored rules with
	// ones built from incomplete data, so every lookup error is returned.
	leadQuestions, err := loadQuestions(currentFieldsRule.Lead)
	if err != nil {
		return err
	}
	contactQuestions, err := loadQuestions(currentFieldsRule.Contact)
	if err != nil {
		return err
	}
	customerQuestions, err := loadQuestions(currentFieldsRule.Customer)
	if err != nil {
		return err
	}
	companyQuestions, err := loadQuestions(currentFieldsRule.Company)
	if err != nil {
		return err
	}

	questionsTypeMap := make(map[model.EntityType][]model.Question)
	questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
	questionsTypeMap[model.ContactsType] = append(questionsTypeMap[model.ContactsType], contactQuestions...)
	questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
	questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)

	toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)

	var newFields []model.Field
	for entity, fields := range toCreated {
		if len(fields) == 0 {
			continue
		}

		createdFields, err := m.amoClient.AddFields(fields, entity, token)
		if err != nil {
			// Best effort: the other entity types are still processed.
			m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
			continue
		}

		newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
	}

	if err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID); err != nil {
		m.logger.Error("error updating fields rule in db", zap.Error(err))
		return err
	}

	// buildRules maps every question of the current rules to a field ID: the
	// existing one from toUpdate when present, otherwise the ID of each newly
	// created field whose name equals the question title.
	buildRules := func(current []model.FieldRule, questions []model.Question) []model.FieldRule {
		var built []model.FieldRule
		for _, rule := range current {
			for questionID := range rule.Questionid {
				for _, question := range questions {
					if fieldID, ok := toUpdate[questionID]; ok {
						built = append(built, model.FieldRule{Questionid: map[int]int{questionID: fieldID}})
						break
					}

					if questionID != int(question.Id) {
						continue
					}

					for _, field := range newFields {
						if question.Title == field.Name {
							built = append(built, model.FieldRule{Questionid: map[int]int{questionID: int(field.Amoid)}})
						}
					}
				}
			}
		}
		return built
	}

	lead := buildRules(currentFieldsRule.Lead, leadQuestions)
	contact := buildRules(currentFieldsRule.Contact, contactQuestions)
	customer := buildRules(currentFieldsRule.Customer, customerQuestions)
	company := buildRules(currentFieldsRule.Company, companyQuestions)

	// TODO: pq: cannot call json_array_elements on a scalar
	err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
		Lead:     lead,
		Customer: customer,
		Company:  company,
		Contact:  contact,
	}, accountID, req.QuizID)
	if err != nil {
		m.logger.Error("error updating fields rule in db", zap.Error(err))
		return err
	}

	return nil
}