package workers_methods import ( "amocrm/internal/models" "amocrm/internal/tools" "amocrm/pkg/amoClient" "context" "encoding/json" "fmt" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" "sync" "time" ) type Methods struct { repo *dal.AmoDal amoClient *amoClient.Amo logger *zap.Logger } type Deps struct { Repo *dal.AmoDal AmoClient *amoClient.Amo Logger *zap.Logger } func NewWorkersMethods(deps Deps) *Methods { return &Methods{ repo: deps.Repo, amoClient: deps.AmoClient, logger: deps.Logger, } } func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) { allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx) if err != nil { m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err)) return nil, err } for _, oldToken := range allTokens { user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, oldToken.AccountID) if err != nil { m.logger.Error("error getting account by id in UpdateTokens", zap.Error(err)) return nil, err } req := models.UpdateWebHookReq{ GrantType: "refresh_token", RefreshToken: oldToken.RefreshToken, } resp, err := m.amoClient.CreateWebHook(&req, user.Subdomain) if err != nil { m.logger.Error("error create webhook in UpdateTokens", zap.Error(err)) continue } newToken := model.Token{ AccountID: oldToken.AccountID, RefreshToken: resp.RefreshToken, AccessToken: resp.AccessToken, Expiration: time.Now().Unix() + resp.ExpiresIn, CreatedAt: time.Now().Unix(), } err = m.repo.AmoRepo.WebhookUpdate(ctx, newToken) if err != nil { m.logger.Error("error update token in db", zap.Error(err)) return nil, err } } newTokens, err := m.repo.AmoRepo.GetAllTokens(ctx) if err != nil { m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err)) return nil, err } return newTokens, nil } func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error { listUser := make(map[string][]models.Users) for _, token := range allTokens { user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID) if err != nil { m.logger.Error("error getting account by id in CheckUsers", zap.Error(err)) return err } page := 1 limit := 250 userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{ Page: page, Limit: limit, }, token.AccessToken, user.Subdomain) if err != nil { m.logger.Error("error fetching list users", zap.Error(err)) break } listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...) } for accountID, users := range listUser { mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID) if err != nil { m.logger.Error("error getting current account from db", zap.Error(err)) return err } currentUserUsers, err := m.repo.AmoRepo.GetUserUsersByID(ctx, mainAccount.AmoID) if err != nil { m.logger.Error("error getting user users by amo user id", zap.Error(err)) return err } for _, user := range users { found := false for _, currentUser := range currentUserUsers { if user.ID == currentUser.AmoUserID { found = true err := m.repo.AmoRepo.UpdateAmoAccountUser(ctx, model.AmoAccountUser{ AmoID: currentUser.AmoID, AmoUserID: currentUser.AmoUserID, Name: user.Name, Email: user.Email, Role: int32(user.Rights.RoleID), Group: int32(user.Rights.GroupID), }) if err != nil { m.logger.Error("failed update user amo account in db", zap.Error(err)) return err } } } if !found { err := m.repo.AmoRepo.AddAmoAccountUser(ctx, model.AmoAccountUser{ AmoID: mainAccount.AmoID, AmoUserID: user.ID, Name: user.Name, Email: user.Email, Role: int32(user.Rights.RoleID), Group: int32(user.Rights.GroupID), }) if err != nil { m.logger.Error("failed insert user amo account in db", zap.Error(err)) return err } } } var deletedUserIDs []int64 for _, currentUserUser := range currentUserUsers { found := false for _, user := range users { if currentUserUser.AmoUserID == user.ID { found = true break } } if !found { deletedUserIDs = append(deletedUserIDs, currentUserUser.ID) } } if len(deletedUserIDs) > 0 { err := m.repo.AmoRepo.DeleteUsers(ctx, deletedUserIDs) if err != nil { m.logger.Error("error deleting users in db", zap.Error(err)) return err } } } return nil } func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error { for _, token := range tokens { user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID) if err != nil { m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err)) return err } currentUserPipelines, err := m.repo.AmoRepo.GetUserPipelinesByID(ctx, user.AmoID) if err != nil { m.logger.Error("error getting user pipelines by amo id", zap.Error(err)) return err } currentUserSteps, err := m.repo.AmoRepo.GetUserStepsByID(ctx, user.AmoID) if err != nil { m.logger.Error("error getting user steps by amo id", zap.Error(err)) return err } var receivedSteps []model.Step pipelines, err := m.amoClient.GetListPipelines(token.AccessToken, user.Subdomain) if err != nil { m.logger.Error("error fetching list pipelines from amo", zap.Error(err)) continue } if len(pipelines.Embedded.Pipelines) > 0 { receivedPipelines := tools.ToPipeline(pipelines.Embedded.Pipelines) err = m.repo.AmoRepo.CheckPipelines(ctx, receivedPipelines) if err != nil { m.logger.Error("error update list pipelines in db:", zap.Error(err)) return err } for _, pipeline := range pipelines.Embedded.Pipelines { steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken, user.Subdomain) if err != nil { m.logger.Error("error getting list steps pipeline:", zap.Error(err)) continue } receivedStep := tools.ToStep(steps.Embedded.Statuses) receivedSteps = append(receivedSteps, receivedStep...) err = m.repo.AmoRepo.CheckSteps(ctx, receivedStep) if err != nil { m.logger.Error("error update pipeline steps in db:", zap.Error(err)) return err } } var deletedPipelineIDs []int64 for _, currentUserPipeline := range currentUserPipelines { found := false for _, receivedPipeline := range receivedPipelines { if currentUserPipeline.Amoid == receivedPipeline.Amoid && currentUserPipeline.AccountID == user.AmoID { found = true break } } if !found { deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID) } } if len(deletedPipelineIDs) > 0 { err := m.repo.AmoRepo.DeletePipelines(ctx, deletedPipelineIDs) if err != nil { m.logger.Error("error deleting pipelines in db", zap.Error(err)) return err } } var deletedStepIDs []int64 for _, currentUserStep := range currentUserSteps { found := false for _, receivedStep := range receivedSteps { if currentUserStep.Amoid == receivedStep.Amoid && currentUserStep.Accountid == user.AmoID && currentUserStep.Pipelineid == receivedStep.Pipelineid { found = true break } } if !found { deletedStepIDs = append(deletedStepIDs, currentUserStep.ID) } } if len(deletedStepIDs) > 0 { err := m.repo.AmoRepo.DeleteSteps(ctx, deletedStepIDs) if err != nil { m.logger.Error("error deleting steps in db", zap.Error(err)) return err } } } } return nil } func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error { for _, token := range tokens { user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID) if err != nil { m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err)) return err } currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID) if err != nil { m.logger.Error("error getting user tags by amo id", zap.Error(err)) return err } var wg sync.WaitGroup wg.Add(4) var tagsMap sync.Map entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType} for _, entityType := range entityTypes { go func(entityType model.EntityType) { defer wg.Done() page := 1 limit := 250 for { req := models.GetListTagsReq{ Page: page, Limit: limit, EntityType: entityType, } tags, err := m.amoClient.GetListTags(req, token.AccessToken, user.Subdomain) if err != nil { m.logger.Error("error getting list of tags", zap.Error(err)) return } if tags == nil || len(tags.Embedded.Tags) == 0 { break } tagsMap.Store(entityType, tags.Embedded.Tags) page++ } }(entityType) } wg.Wait() var deletedTagIDs []int64 for _, currentUserTag := range currentUserTags { found := false for _, entityType := range entityTypes { if tags, ok := tagsMap.Load(entityType); ok { if len(tags.([]models.Tag)) > 0 { receivedTags := tools.ToTag(tags.([]models.Tag), entityType) for _, tag := range receivedTags { if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType { found = true break } } } } if found { break } } if !found { deletedTagIDs = append(deletedTagIDs, currentUserTag.ID) } } if len(deletedTagIDs) > 0 { err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs) if err != nil { m.logger.Error("error deleting tags in db", zap.Error(err)) return err } } for _, entityType := range entityTypes { if tags, ok := tagsMap.Load(entityType); ok { if len(tags.([]models.Tag)) > 0 { err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID) if err != nil { switch entityType { case model.LeadsType: m.logger.Error("error updating leads tags in db", zap.Error(err)) return err case model.ContactsType: m.logger.Error("error updating contacts tags in db", zap.Error(err)) return err case model.CompaniesType: m.logger.Error("error updating companies tags in db", zap.Error(err)) return err case model.CustomersType: m.logger.Error("error updating customer tags in db", zap.Error(err)) return err } } } } } } return nil } func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error { for _, token := range tokens { user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID) if err != nil { m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err)) return err } currentUserFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID) if err != nil { m.logger.Error("error getting user fields by amo id", zap.Error(err)) return err } var wg sync.WaitGroup wg.Add(4) var fieldsMap sync.Map entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType} for _, entityType := range entityTypes { go func(entityType model.EntityType) { defer wg.Done() page := 1 limit := 50 for { req := models.GetListFieldsReq{ Page: page, Limit: limit, EntityType: entityType, } fields, err := m.amoClient.GetListFields(req, token.AccessToken, user.Subdomain) if err != nil { m.logger.Error("error getting list of fields", zap.Error(err)) return } if fields == nil || len(fields.Embedded.CustomFields) == 0 { break } fieldsMap.Store(entityType, fields.Embedded.CustomFields) page++ } }(entityType) } wg.Wait() var deletedFieldIDs []int64 for _, currentUserField := range currentUserFields { found := false for _, entityType := range entityTypes { if fields, ok := fieldsMap.Load(entityType); ok { if len(fields.([]models.CustomField)) > 0 { receivedFields := tools.ToField(fields.([]models.CustomField), entityType) for _, field := range receivedFields { if currentUserField.Amoid == field.Amoid && currentUserField.Accountid == user.AmoID && currentUserField.Entity == entityType { found = true break } } } } if found { break } } if !found { deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID) } } if len(deletedFieldIDs) > 0 { err = m.repo.AmoRepo.DeleteFields(ctx, deletedFieldIDs) if err != nil { m.logger.Error("error deleting fields in db", zap.Error(err)) return err } } for _, entityType := range entityTypes { if fields, ok := fieldsMap.Load(entityType); ok { if len(fields.([]models.CustomField)) > 0 { err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fields.([]models.CustomField), entityType), token.AccountID) if err != nil { switch entityType { case model.LeadsType: m.logger.Error("error updating leads fields in db", zap.Error(err)) return err case model.ContactsType: m.logger.Error("error updating contacts fields in db", zap.Error(err)) return err case model.CompaniesType: m.logger.Error("error updating companies fields in db", zap.Error(err)) return err case model.CustomersType: m.logger.Error("error updating customer fields in db", zap.Error(err)) return err } } } } } } return nil } func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) { token, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID) if err != nil { m.logger.Error("error getting token by account id from db", zap.Error(err)) return nil, err } return token, nil } func (m *Methods) CreateUserFromWebHook(ctx context.Context, msg models.KafkaMessage) (*model.Token, error) { // получаем аксес и рефреш токены по коду авторизации forGetTokens := models.CreateWebHookReq{ GrantType: "authorization_code", Code: msg.AuthCode, } tokens, err := m.amoClient.CreateWebHook(&forGetTokens, msg.RefererURL) if err != nil { m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err)) return nil, err } // получаем информацию о пользователе по аксес токену userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken, msg.RefererURL) if err != nil { m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err)) return nil, err } toCreate := model.AmoAccount{ AccountID: msg.AccountID, AmoID: userInfo.ID, Name: userInfo.Name, Subdomain: msg.RefererURL, Country: userInfo.Country, DriveURL: userInfo.DriveUrl, } err = m.repo.AmoRepo.CreateAccount(ctx, toCreate) if err != nil { m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err)) return nil, err } err = m.repo.AmoRepo.WebhookCreate(ctx, model.Token{ RefreshToken: tokens.RefreshToken, AccessToken: tokens.AccessToken, AccountID: msg.AccountID, AuthCode: msg.AuthCode, Expiration: time.Now().Unix() + tokens.ExpiresIn, CreatedAt: time.Now().Unix(), }) if err != nil { m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err)) return nil, err } return &model.Token{ AccountID: msg.AccountID, RefreshToken: tokens.RefreshToken, AccessToken: tokens.AccessToken, }, nil } func (m *Methods) CheckFieldRule(ctx context.Context, token string, msg models.KafkaMessage) error { var ( leadIDs, companyIDs, customerIDs []int32 leadQuestions, companyQuestions, customerQuestions []model.Question questionsTypeMap = make(map[model.EntityType][]model.Question) newFields []model.Field lead, company, customer []model.FieldRule currentFieldsRule = msg.Rule.Fieldsrule err error ) user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, msg.AccountID) if err != nil { m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err)) return err } currentFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID) if err != nil { m.logger.Error("error getting user fields by amo account id", zap.Error(err)) return err } quiz, err := m.repo.QuizRepo.GetQuizById(ctx, msg.AccountID, uint64(msg.Rule.QuizID)) if err != nil { m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err)) return err } var quizConfig model.QuizContact err = json.Unmarshal([]byte(quiz.Config), &quizConfig) if err != nil { m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err)) return err } leadIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Lead) customerIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Customer) companyIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Company) getQuestions := func(questionIDs []int32, questions *[]model.Question) { if len(questionIDs) > 0 { *questions, err = m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs) if err != nil { m.logger.Error("error getting questions", zap.Error(err)) return } } } getQuestions(leadIDs, &leadQuestions) getQuestions(customerIDs, &customerQuestions) getQuestions(companyIDs, &companyQuestions) questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...) questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...) questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...) toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields) contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields) for entity, fields := range toCreated { if len(fields) == 0 { continue } createdFields, err := m.amoClient.AddFields(fields, entity, token, user.Subdomain) if err != nil { m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err)) continue } newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...) } if len(contactFieldsToCreate) > 0 { createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token, user.Subdomain) if err != nil { m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err)) } contructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType) newFields = append(newFields, contructedFields...) for _, field := range contructedFields { if _, ok := forAdding[field.Name]; ok { forAdding[field.Name] = int(field.Amoid) } } } if len(newFields) > 0 { err = m.repo.AmoRepo.CheckFields(ctx, newFields, msg.AccountID) if err != nil { m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err)) return err } } constructFieldRules := func(fieldRuleArrCurrent []model.FieldRule, questions []model.Question, fieldRule *[]model.FieldRule, currentEntity model.EntityType) { for _, rules := range fieldRuleArrCurrent { for questionID := range rules.Questionid { for _, question := range questions { if dataQues, ok := toUpdate[questionID]; ok { if dataQues.Entity == currentEntity { ruleMap := make(map[int]int) ruleMap[questionID] = dataQues.FieldID *fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap}) break } } if questionID == int(question.Id) { // тут также делаем чтобы сверить филд с вопросом if question.Title == "" { question.Title = fmt.Sprintf("Вопрос №%d", question.Page) } for _, field := range newFields { if question.Title == field.Name && field.Entity == currentEntity { ruleMap := make(map[int]int) ruleMap[questionID] = int(field.Amoid) *fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap}) } } } } } } } constructFieldRules(currentFieldsRule.Lead, leadQuestions, &lead, model.LeadsType) constructFieldRules(currentFieldsRule.Customer, customerQuestions, &customer, model.CustomersType) constructFieldRules(currentFieldsRule.Company, companyQuestions, &company, model.CompaniesType) err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{ Lead: lead, Customer: customer, Company: company, Contact: model.ContactRules{ContactRuleMap: forAdding}, }, msg.AccountID, msg.Rule.QuizID) if err != nil { m.logger.Error("error updating fields rule in db", zap.Error(err)) return err } return nil } func (m *Methods) UserReLogin(ctx context.Context, msg models.KafkaMessage) error { forGetTokens := models.CreateWebHookReq{ GrantType: "authorization_code", Code: msg.AuthCode, } tokens, err := m.amoClient.CreateWebHook(&forGetTokens, msg.RefererURL) if err != nil { m.logger.Error("error getting tokens in method user re-login:", zap.Error(err)) return err } userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken, msg.RefererURL) if err != nil { m.logger.Error("error getting UserInfo in method user re-login:", zap.Error(err)) return err } toUpdate := model.AmoAccount{ AccountID: msg.AccountID, AmoID: userInfo.ID, Name: userInfo.Name, Subdomain: msg.RefererURL, Country: userInfo.Country, DriveURL: userInfo.DriveUrl, } err = m.repo.AmoRepo.UpdateCurrentAccount(ctx, toUpdate) if err != nil { m.logger.Error("error update account in db in method user re-login", zap.Error(err)) return err } err = m.repo.AmoRepo.WebhookUpdate(ctx, model.Token{ RefreshToken: tokens.RefreshToken, AccessToken: tokens.AccessToken, AccountID: msg.AccountID, Expiration: time.Now().Unix() + tokens.ExpiresIn, CreatedAt: time.Now().Unix(), }) if err != nil { m.logger.Error("error update tokens in db in method user re-login", zap.Error(err)) return err } return nil }