// Package workers_methods implements the operations executed by the Bitrix
// workers: refreshing OAuth tokens, mirroring users, pipelines, steps and
// fields from Bitrix into the local database, and handling account
// creation, re-login and field-rule messages.
package workers_methods

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/tools"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/bitrixClient"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
)

// errInvalidToken is returned when Bitrix answers a token request without an
// access token or without a refresh token.
var errInvalidToken = errors.New("invalid token")

// Methods bundles the dependencies shared by all worker operations.
type Methods struct {
	repo         *dal.BitrixDal
	bitrixClient *bitrixClient.Bitrix
	logger       *zap.Logger
}

// Deps lists the dependencies required by NewWorkersMethods.
type Deps struct {
	Repo         *dal.BitrixDal
	BitrixClient *bitrixClient.Bitrix
	Logger       *zap.Logger
}

// NewWorkersMethods builds a Methods value from deps.
func NewWorkersMethods(deps Deps) *Methods {
	return &Methods{
		repo:         deps.Repo,
		bitrixClient: deps.BitrixClient,
		logger:       deps.Logger,
	}
}

// UpdateTokens refreshes every stored token through the Bitrix client and
// returns the token list re-read from the database afterwards.
//
// A token whose refresh request fails, or whose response carries no access or
// refresh token, is logged and skipped so the stored credentials are not
// overwritten with unusable ones. A database failure aborts the whole run and
// is returned.
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
	allTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx)
	if err != nil {
		m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
		return nil, err
	}

	for _, oldToken := range allTokens {
		req := models.UpdateWebHookReq{
			GrantType:    "refresh_token",
			RefreshToken: oldToken.RefreshToken,
		}

		resp, err := m.bitrixClient.CreateWebHook(&req, false)
		if err != nil {
			m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
			continue
		}
		if resp.AccessToken == "" || resp.RefreshToken == "" {
			// Never replace a stored refresh token with an empty one.
			m.logger.Error("error create webhook in UpdateTokens", zap.Error(errInvalidToken))
			continue
		}

		// Read the clock once so Expiration and CreatedAt share one base.
		now := time.Now().Unix()
		newToken := model.Token{
			AccountID:    oldToken.AccountID,
			RefreshToken: resp.RefreshToken,
			AccessToken:  resp.AccessToken,
			Expiration:   now + resp.ExpiresIn,
			CreatedAt:    now,
		}

		if err := m.repo.BitrixRepo.WebhookUpdate(ctx, newToken); err != nil {
			m.logger.Error("error update token in db", zap.Error(err))
			return nil, err
		}
	}

	newTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx)
	if err != nil {
		m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err))
		return nil, err
	}

	return newTokens, nil
}

// CheckUsers mirrors the Bitrix user list of every account in allTokens into
// the database: users already stored are updated, unknown users are inserted
// and stored users that Bitrix no longer reports are deleted.
//
// An account whose user list cannot be fetched is logged and skipped; the
// remaining accounts are still processed. Database errors are returned.
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
	// Phase 1: collect the remote user lists keyed by account ID.
	listUser := make(map[string][]models.User)
	for _, token := range allTokens {
		currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting current company", zap.Error(err))
			return err
		}

		userData, err := m.bitrixClient.GetUserList(token.AccessToken, currentCompany.Subdomain)
		if err != nil {
			m.logger.Error("error fetching list users", zap.Error(err))
			// Skip only this account; one failing portal must not block the rest.
			continue
		}

		listUser[token.AccountID] = append(listUser[token.AccountID], userData.Result...)
	}

	// Phase 2: reconcile the stored users of each fetched account.
	for accountID, users := range listUser {
		currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, accountID)
		if err != nil {
			m.logger.Error("error getting current company", zap.Error(err))
			return err
		}

		currentUserUsers, err := m.repo.BitrixRepo.GetUserUsersByID(ctx, currentCompany.BitrixID)
		if err != nil {
			m.logger.Error("error getting user users by bitrix user id", zap.Error(err))
			return err
		}

		for _, user := range users {
			found := false
			for _, currentUser := range currentUserUsers {
				if user.ID != currentUser.BitrixIDUserID {
					continue
				}
				found = true

				err := m.repo.BitrixRepo.UpdateBitrixAccountUser(ctx, model.BitrixAccountUser{
					AccountID:      currentUser.AccountID,
					BitrixIDUserID: currentUser.BitrixIDUserID,
					Name:           user.Name,
					LastName:       user.LastName,
					SecondName:     user.SecondName,
					Title:          user.Title,
					Email:          user.Email,
					UFDepartment:   user.UFDepartment,
					WorkPosition:   user.WorkPosition,
				})
				if err != nil {
					m.logger.Error("failed update user bitrix account in db", zap.Error(err))
					return err
				}
			}

			if found {
				continue
			}

			err := m.repo.BitrixRepo.AddBitrixAccountUser(ctx, model.BitrixAccountUser{
				AccountID:      currentCompany.BitrixID,
				BitrixIDUserID: user.ID,
				Name:           user.Name,
				LastName:       user.LastName,
				SecondName:     user.SecondName,
				Title:          user.Title,
				Email:          user.Email,
				UFDepartment:   user.UFDepartment,
				WorkPosition:   user.WorkPosition,
			})
			if err != nil {
				m.logger.Error("failed insert user bitrix account in db", zap.Error(err))
				return err
			}
		}

		// Stored users that are absent from the remote list are stale.
		var deletedUserIDs []int64
		for _, currentUserUser := range currentUserUsers {
			found := false
			for _, user := range users {
				if currentUserUser.BitrixIDUserID == user.ID {
					found = true
					break
				}
			}
			if !found {
				deletedUserIDs = append(deletedUserIDs, currentUserUser.ID)
			}
		}

		if len(deletedUserIDs) > 0 {
			if err := m.repo.BitrixRepo.DeleteUsers(ctx, deletedUserIDs); err != nil {
				m.logger.Error("error deleting users in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckPipelines mirrors the Bitrix pipelines (categories) of every account in
// tokens into the database and deletes stored pipelines that Bitrix no longer
// reports.
//
// Pipelines are fetched once per entry of model.CategoryArr. A failed fetch is
// logged and skipped; when any fetch failed the fetched pipelines are still
// upserted, but deletion is skipped for that account because the remote list
// is incomplete and would make valid pipelines look stale.
func (m *Methods) CheckPipelines(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err))
			return err
		}

		currentCompanyPipelines, err := m.repo.BitrixRepo.GetUserPipelinesByID(ctx, currentCompany.BitrixID)
		if err != nil {
			m.logger.Error("error getting company pipelines by bitrix id", zap.Error(err))
			return err
		}

		var listPipelines []models.Category
		fetchFailed := false
		for _, categoryType := range model.CategoryArr {
			pipelinesResp, err := m.bitrixClient.GetListPipelines(categoryType, token.AccessToken, currentCompany.Subdomain)
			if err != nil {
				m.logger.Error("error fetching list pipelines from bitrix", zap.Error(err))
				fetchFailed = true
				continue
			}
			listPipelines = append(listPipelines, pipelinesResp.Result.Categories...)
		}

		if len(listPipelines) == 0 {
			continue
		}

		receivedPipelines := tools.ToPipeline(listPipelines, currentCompany.BitrixID)
		// The upsert is best effort: a failure is logged and the run goes on.
		if err := m.repo.BitrixRepo.CheckPipelines(ctx, receivedPipelines); err != nil {
			m.logger.Error("error checking pipelines", zap.Error(err))
		}

		if fetchFailed {
			m.logger.Warn("skipping pipelines deletion: not all pipeline categories were fetched from bitrix")
			continue
		}

		var deletedPipelineIDs []int64
		for _, currentUserPipeline := range currentCompanyPipelines {
			found := false
			for _, receivedPipeline := range receivedPipelines {
				if currentUserPipeline.BitrixID == receivedPipeline.BitrixID && currentUserPipeline.AccountID == currentCompany.BitrixID {
					found = true
					break
				}
			}
			if !found {
				deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID)
			}
		}

		if len(deletedPipelineIDs) > 0 {
			if err := m.repo.BitrixRepo.DeletePipelines(ctx, deletedPipelineIDs); err != nil {
				m.logger.Error("error deleting pipelines in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckSteps mirrors the Bitrix steps of every account in tokens into the
// database and deletes stored steps that Bitrix no longer reports.
//
// An account whose steps cannot be fetched, or for which Bitrix returns no
// steps, is left untouched. Conversion and database errors are returned,
// except for the upsert itself, whose failure is only logged.
func (m *Methods) CheckSteps(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err))
			return err
		}

		currentCompanySteps, err := m.repo.BitrixRepo.GetUserStepsByID(ctx, currentCompany.BitrixID)
		if err != nil {
			m.logger.Error("error getting company steps by bitrix id", zap.Error(err))
			return err
		}

		stepsResp, err := m.bitrixClient.GetListSteps(token.AccessToken, currentCompany.Subdomain)
		if err != nil {
			m.logger.Error("error fetching list steps from bitrix", zap.Error(err))
			continue
		}
		if len(stepsResp.Result) == 0 {
			continue
		}

		receivedSteps, err := tools.ToStep(stepsResp.Result, currentCompany.BitrixID)
		if err != nil {
			m.logger.Error("error converting steps to bitrix", zap.Error(err))
			return err
		}

		// The upsert is best effort: a failure is logged and the run goes on.
		if err := m.repo.BitrixRepo.CheckSteps(ctx, receivedSteps); err != nil {
			m.logger.Error("error checking steps", zap.Error(err))
		}

		var deletedStepIDs []int64
		for _, currentUserStep := range currentCompanySteps {
			found := false
			for _, receivedStep := range receivedSteps {
				if currentUserStep.BitrixID == receivedStep.BitrixID && currentUserStep.AccountID == currentCompany.BitrixID {
					found = true
					break
				}
			}
			if !found {
				deletedStepIDs = append(deletedStepIDs, currentUserStep.ID)
			}
		}

		if len(deletedStepIDs) > 0 {
			if err := m.repo.BitrixRepo.DeleteSteps(ctx, deletedStepIDs); err != nil {
				m.logger.Error("error deleting steps in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}

// CheckTags is disabled. The body below refers to m.repo.AmoRepo and
// m.amoClient, neither of which exists on Methods in this package, so it
// cannot be enabled as written.
//
//func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
//	for _, token := range tokens {
//		user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
//		if err != nil {
//			m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
//			return err
//		}
//
//		currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID)
//		if err != nil {
//			m.logger.Error("error getting user tags by amo id", zap.Error(err))
//			return err
//		}
//
//		var wg sync.WaitGroup
//		wg.Add(4)
//
//		var tagsMap sync.Map
//		entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
//		for _, entityType := range entityTypes {
//			go func(entityType model.EntityType) {
//				defer wg.Done()
//				page := 1
//				limit := 250
//
//				for {
//					req := models.GetListTagsReq{
//						Page:       page,
//						Limit:      limit,
//						EntityType: entityType,
//					}
//					tags, err := m.amoClient.GetListTags(req, token.AccessToken, user.Subdomain)
//					if err != nil {
//						m.logger.Error("error getting list of tags", zap.Error(err))
//						return
//					}
//
//					if tags == nil || len(tags.Embedded.Tags) == 0 {
//						break
//					}
//
//					tagsMap.Store(entityType, tags.Embedded.Tags)
//
//					page++
//				}
//			}(entityType)
//		}
//
//		wg.Wait()
//
//		var deletedTagIDs []int64
//		for _, currentUserTag := range currentUserTags {
//			found := false
//			for _, entityType := range entityTypes {
//				if tags, ok := tagsMap.Load(entityType); ok {
//					if len(tags.([]models.Tag)) > 0 {
//						receivedTags := tools.ToTag(tags.([]models.Tag), entityType)
//						for _, tag := range receivedTags {
//							if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType {
//								found = true
//								break
//							}
//						}
//					}
//				}
//				if found {
//					break
//				}
//			}
//
//			if !found {
//				deletedTagIDs = append(deletedTagIDs, currentUserTag.ID)
//			}
//		}
//
//		if len(deletedTagIDs) > 0 {
//			err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs)
//			if err != nil {
//				m.logger.Error("error deleting tags in db", zap.Error(err))
//				return err
//			}
//		}
//
//		for _, entityType := range entityTypes {
//			if tags, ok := tagsMap.Load(entityType); ok {
//				if len(tags.([]models.Tag)) > 0 {
//					err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID)
//					if err != nil {
//						switch entityType {
//						case model.LeadsType:
//							m.logger.Error("error updating leads tags in db", zap.Error(err))
//							return err
//						case model.ContactsType:
//							m.logger.Error("error updating contacts tags in db", zap.Error(err))
//							return err
//						case model.CompaniesType:
//							m.logger.Error("error updating companies tags in db", zap.Error(err))
//							return err
//						case model.CustomersType:
//							m.logger.Error("error updating customer tags in db", zap.Error(err))
//							return err
//						}
//					}
//				}
//			}
//		}
//	}
//	return nil
//}

// CheckFields mirrors the Bitrix fields of every account in tokens into the
// database. For each account the company, lead, contact and deal fields are
// fetched concurrently; stored fields that Bitrix no longer reports are
// deleted and the fetched fields are upserted.
//
// When the fetch for an entity type fails, the stored fields of that entity
// type are kept: a request error is not evidence that the fields were removed
// in Bitrix. Database errors are returned.
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting company by account quiz id", zap.Error(err))
			return err
		}

		currentUserFields, err := m.repo.BitrixRepo.GetUserFieldsByID(ctx, currentCompany.BitrixID)
		if err != nil {
			m.logger.Error("error getting user fields by bitrix id", zap.Error(err))
			return err
		}

		entityTypes := []model.FieldsType{model.FieldTypeCompany, model.FieldTypeLead, model.FieldTypeContact, model.FieldTypeDeal}

		var (
			wg        sync.WaitGroup
			fieldsMap sync.Map // entity type -> fetched []models.Fields (non-empty only)
			failedMap sync.Map // entity type -> struct{} for fetches that returned an error
		)

		wg.Add(len(entityTypes))
		for _, entityType := range entityTypes {
			go func(entityType model.FieldsType) {
				defer wg.Done()

				fields, err := m.bitrixClient.GetListFields(entityType, token.AccessToken, currentCompany.Subdomain)
				if err != nil {
					m.logger.Error("error getting list of fields", zap.Error(err))
					failedMap.Store(entityType, struct{}{})
					return
				}
				if fields == nil || len(fields.Result) == 0 {
					return
				}

				fieldsMap.Store(entityType, fields.Result)
			}(entityType)
		}
		// All goroutines finish before the loop variables change.
		wg.Wait()

		var deletedFieldIDs []int64
		for _, currentUserField := range currentUserFields {
			keep := false
			for _, entityType := range entityTypes {
				if currentUserField.EntityID == entityType {
					if _, failed := failedMap.Load(entityType); failed {
						// Remote state for this entity type is unknown; do not delete.
						keep = true
						break
					}
				}

				fields, ok := fieldsMap.Load(entityType)
				if !ok {
					continue
				}

				receivedFields := tools.ToField(fields.([]models.Fields), currentCompany.BitrixID)
				for _, field := range receivedFields {
					if currentUserField.BitrixID == field.BitrixID && currentUserField.AccountID == currentCompany.BitrixID && currentUserField.EntityID == entityType {
						keep = true
						break
					}
				}
				if keep {
					break
				}
			}

			if !keep {
				deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID)
			}
		}

		if len(deletedFieldIDs) > 0 {
			if err := m.repo.BitrixRepo.DeleteFields(ctx, deletedFieldIDs); err != nil {
				m.logger.Error("error deleting fields in db", zap.Error(err))
				return err
			}
		}

		for _, entityType := range entityTypes {
			fields, ok := fieldsMap.Load(entityType)
			if !ok {
				continue
			}

			err := m.repo.BitrixRepo.CheckFields(ctx, tools.ToField(fields.([]models.Fields), currentCompany.BitrixID), token.AccountID)
			if err == nil {
				continue
			}

			switch entityType {
			case model.FieldTypeLead:
				m.logger.Error("error updating leads fields in db", zap.Error(err))
			case model.FieldTypeContact:
				m.logger.Error("error updating contacts fields in db", zap.Error(err))
			case model.FieldTypeCompany:
				m.logger.Error("error updating companies fields in db", zap.Error(err))
			case model.FieldTypeDeal:
				m.logger.Error("error updating deal fields in db", zap.Error(err))
			}
			return err
		}
	}

	return nil
}

// GetTokenByID returns the stored token of the account identified by
// accountID, logging and returning any repository error.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	token, err := m.repo.BitrixRepo.GetTokenByID(ctx, accountID)
	if err != nil {
		m.logger.Error("error getting token by account id from db", zap.Error(err))
		return nil, err
	}
	return token, nil
}

// CreateUserFromWebHook exchanges the authorization code in msg for access and
// refresh tokens, creates the Bitrix account record (the Bitrix ID is the
// member ID from msg) and stores the tokens.
//
// It returns a token holding the account ID and the fresh credentials. A
// response without an access or refresh token yields an "invalid token"
// error. The received tokens are deliberately never printed or logged.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, msg models.KafkaMessage) (model.Token, error) {
	forGetTokens := models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      msg.AuthCode,
	}

	tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, true)
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return model.Token{}, err
	}

	if tokens.AccessToken == "" || tokens.RefreshToken == "" {
		return model.Token{}, errInvalidToken
	}

	toCreate := model.BitrixAccount{
		AccountID: msg.AccountID,
		BitrixID:  msg.MemberID,
		Subdomain: msg.RefererURL,
	}

	if err := m.repo.BitrixRepo.CreateAccount(ctx, toCreate); err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return model.Token{}, err
	}

	// Read the clock once so Expiration and CreatedAt share one base.
	now := time.Now().Unix()
	err = m.repo.BitrixRepo.WebhookCreate(ctx, model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    msg.AccountID,
		AuthCode:     msg.AuthCode,
		Expiration:   now + tokens.ExpiresIn,
		CreatedAt:    now,
	})
	if err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return model.Token{}, err
	}

	return model.Token{
		AccountID:    msg.AccountID,
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
	}, nil
}

// CheckFieldRule resolves the field rules carried by msg: it loads the
// questions referenced by the lead, deal, company and contact rules, creates
// the missing custom fields in Bitrix using the access token in token, stores
// the created fields and finally writes the completed question-to-field
// mapping for the quiz.
//
// A failure to create an individual field in Bitrix is logged and that field
// is skipped. A failure to load the account, its fields, the quiz, the quiz
// config or the questions aborts the call, so the stored rules are never
// rewritten from incomplete data.
func (m *Methods) CheckFieldRule(ctx context.Context, token string, msg models.KafkaMessage) error {
	var (
		questionsTypeMap            = make(map[model.FieldsType][]model.Question)
		newFields                   []model.BitrixField
		lead, company, deal, contact model.BitrixFieldRule
		currentFieldsRule           = msg.Rule.FieldsRule
	)

	user, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, msg.AccountID)
	if err != nil {
		m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
		return err
	}

	currentFields, err := m.repo.BitrixRepo.GetUserFieldsByID(ctx, user.BitrixID)
	if err != nil {
		m.logger.Error("error getting user fields by amo account id", zap.Error(err))
		return err
	}

	quiz, err := m.repo.QuizRepo.GetQuizById(ctx, msg.AccountID, uint64(msg.Rule.QuizID))
	if err != nil {
		m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err))
		return err
	}

	var quizConfig model.QuizContact
	if err := json.Unmarshal([]byte(quiz.Config), &quizConfig); err != nil {
		m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err))
		return err
	}

	// getQuestions loads the questions for questionIDs; an empty ID list
	// yields no questions and no repository call.
	getQuestions := func(questionIDs []int32) ([]model.Question, error) {
		if len(questionIDs) == 0 {
			return nil, nil
		}
		questions, err := m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
		if err != nil {
			m.logger.Error("error getting questions", zap.Error(err))
			return nil, err
		}
		return questions, nil
	}

	leadQuestions, err := getQuestions(tools.ToQuestionIDs(msg.Rule.FieldsRule.Lead.QuestionID))
	if err != nil {
		return err
	}
	dealQuestions, err := getQuestions(tools.ToQuestionIDs(msg.Rule.FieldsRule.Deal.QuestionID))
	if err != nil {
		return err
	}
	companyQuestions, err := getQuestions(tools.ToQuestionIDs(msg.Rule.FieldsRule.Company.QuestionID))
	if err != nil {
		return err
	}
	contactQuestions, err := getQuestions(tools.ToQuestionIDs(msg.Rule.FieldsRule.Contact.QuestionID))
	if err != nil {
		return err
	}

	questionsTypeMap[model.FieldTypeLead] = append(questionsTypeMap[model.FieldTypeLead], leadQuestions...)
	questionsTypeMap[model.FieldTypeDeal] = append(questionsTypeMap[model.FieldTypeDeal], dealQuestions...)
	questionsTypeMap[model.FieldTypeCompany] = append(questionsTypeMap[model.FieldTypeCompany], companyQuestions...)
	questionsTypeMap[model.FieldTypeContact] = append(questionsTypeMap[model.FieldTypeContact], contactQuestions...)

	toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)
	contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields)

	// Create the question-backed fields that do not exist in Bitrix yet.
	for entity, fields := range toCreated {
		for _, field := range fields {
			field.GenFieldName()
			createdID, err := m.bitrixClient.AddFields(field, entity, token, user.Subdomain)
			if err != nil {
				m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
				continue
			}
			// todo need checking in prod
			newFields = append(newFields, model.BitrixField{
				BitrixID:      fmt.Sprintf("%d", createdID),
				EntityID:      entity,
				FieldName:     "UF_CRM_" + field.FieldName,
				EditFromLabel: field.EditFormLabel,
				FieldType:     field.UserTypeID,
			})
		}
	}

	// Create the contact fields required by the quiz config.
	for _, contactField := range contactFieldsToCreate {
		contactField.GenFieldName()
		createdID, err := m.bitrixClient.AddFields(contactField, model.FieldTypeContact, token, user.Subdomain)
		if err != nil {
			m.logger.Error("error adding fields to amo", zap.Any("type", model.FieldTypeContact), zap.Error(err))
			continue
		}
		// todo need checking in prod
		newFields = append(newFields, model.BitrixField{
			BitrixID:      fmt.Sprintf("%d", createdID),
			EntityID:      model.FieldTypeContact,
			FieldName:     "UF_CRM_" + contactField.FieldName,
			EditFromLabel: contactField.EditFormLabel,
			FieldType:     contactField.UserTypeID,
		})

		// Only labels already present in forAdding receive the created ID.
		if _, ok := forAdding[contactField.EditFormLabel]; ok {
			forAdding[contactField.EditFormLabel] = fmt.Sprintf("%d", createdID)
		}
	}

	if len(newFields) > 0 {
		if err := m.repo.BitrixRepo.CheckFields(ctx, newFields, msg.AccountID); err != nil {
			m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
			return err
		}
	}

	// constructFieldRules fills fieldRule.QuestionID for currentEntity: IDs
	// already present in the incoming rule are kept, otherwise the field is
	// taken from toUpdate or matched against the freshly created fields by
	// normalized title.
	constructFieldRules := func(fieldRuleArrCurrent map[int]string, questions []model.Question, fieldRule *model.BitrixFieldRule, currentEntity model.FieldsType) {
		ruleMap := make(map[int]string)
		for questionID, fieldID := range fieldRuleArrCurrent {
			if fieldID != "" {
				// The field ID is already set; keep it as is.
				ruleMap[questionID] = fieldID
				continue
			}

			for _, question := range questions {
				if dataQues, ok := toUpdate[questionID]; ok {
					if dataQues.Entity == currentEntity {
						ruleMap[questionID] = dataQues.FieldID
						break
					}
				}

				if questionID == int(question.Id) {
					// Normalize the title the same way as the field label so
					// the field can be matched against the question.
					title := strings.ToLower(strings.ReplaceAll(question.Title, " ", ""))
					if title == "" {
						question.Title = fmt.Sprintf("Вопрос №%d", question.Page)
					}
					title = strings.ToLower(strings.ReplaceAll(question.Title, " ", ""))

					for _, field := range newFields {
						fieldName := strings.ToLower(strings.ReplaceAll(field.EditFromLabel, " ", ""))
						if title == fieldName && field.EntityID == currentEntity {
							ruleMap[questionID] = field.BitrixID
						}
					}
				}
			}
		}
		fieldRule.QuestionID = ruleMap
	}

	constructFieldRules(currentFieldsRule.Lead.QuestionID, leadQuestions, &lead, model.FieldTypeLead)
	constructFieldRules(currentFieldsRule.Deal.QuestionID, dealQuestions, &deal, model.FieldTypeDeal)
	constructFieldRules(currentFieldsRule.Company.QuestionID, companyQuestions, &company, model.FieldTypeCompany)
	constructFieldRules(currentFieldsRule.Contact.QuestionID, contactQuestions, &contact, model.FieldTypeContact)

	err = m.repo.BitrixRepo.UpdateFieldRules(ctx, model.BitrixFieldRules{
		Lead:    lead,
		Deal:    deal,
		Company: company,
		Contact: model.BitrixContactRules{ContactRuleMap: forAdding, QuestionID: contact.QuestionID},
	}, msg.AccountID, msg.Rule.QuizID)
	if err != nil {
		m.logger.Error("error updating fields rule in db", zap.Error(err))
		return err
	}

	return nil
}

// UserReLogin exchanges the authorization code in msg for new tokens, updates
// the stored Bitrix account with the member ID and referer URL from msg and
// replaces the stored tokens.
//
// A response without an access or refresh token yields an "invalid token"
// error before anything is written, so valid stored data is not overwritten.
func (m *Methods) UserReLogin(ctx context.Context, msg models.KafkaMessage) error {
	forGetTokens := models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      msg.AuthCode,
	}

	tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, true)
	if err != nil {
		m.logger.Error("error getting tokens in method user re-login:", zap.Error(err))
		return err
	}

	if tokens.AccessToken == "" || tokens.RefreshToken == "" {
		m.logger.Error("error getting tokens in method user re-login:", zap.Error(errInvalidToken))
		return errInvalidToken
	}

	toUpdate := model.BitrixAccount{
		AccountID: msg.AccountID,
		BitrixID:  msg.MemberID,
		Subdomain: msg.RefererURL,
	}

	if err := m.repo.BitrixRepo.UpdateCurrentAccount(ctx, toUpdate); err != nil {
		m.logger.Error("error update account in db in method user re-login", zap.Error(err))
		return err
	}

	// Read the clock once so Expiration and CreatedAt share one base.
	now := time.Now().Unix()
	err = m.repo.BitrixRepo.WebhookUpdate(ctx, model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    msg.AccountID,
		Expiration:   now + tokens.ExpiresIn,
		CreatedAt:    now,
	})
	if err != nil {
		m.logger.Error("error update tokens in db in method user re-login", zap.Error(err))
		return err
	}

	return nil
}