amocrm/internal/workers_methods/methods.go
2025-02-27 16:30:52 +03:00

733 lines
22 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workers_methods
import (
"context"
"encoding/json"
"fmt"
"gitea.pena/SQuiz/amocrm/internal/models"
"gitea.pena/SQuiz/amocrm/internal/tools"
"gitea.pena/SQuiz/amocrm/pkg/amoClient"
"gitea.pena/SQuiz/common/dal"
"gitea.pena/SQuiz/common/model"
"go.uber.org/zap"
"strings"
"sync"
"time"
)
// Methods bundles the dependencies shared by the worker methods declared in
// this file: the data-access layer, the amoCRM API client and a logger.
type Methods struct {
// repo is the data-access layer; the methods in this file reach AmoRepo,
// QuizRepo and QuestionRepo through it.
repo *dal.AmoDal
// amoClient performs the calls to the amoCRM API.
amoClient *amoClient.Amo
// logger receives the error messages emitted by every method.
logger *zap.Logger
}
// Deps lists the dependencies that NewWorkersMethods copies into a Methods value.
type Deps struct {
// Repo is the data-access layer handed to Methods.
Repo *dal.AmoDal
// AmoClient is the amoCRM API client handed to Methods.
AmoClient *amoClient.Amo
// Logger is the zap logger handed to Methods.
Logger *zap.Logger
}
// NewWorkersMethods builds a Methods value whose repository, amoCRM client and
// logger are taken from deps.
func NewWorkersMethods(deps Deps) *Methods {
	methods := new(Methods)
	methods.repo = deps.Repo
	methods.amoClient = deps.AmoClient
	methods.logger = deps.Logger
	return methods
}
// UpdateTokens returns every token stored in the repository.
//
// It only reads the tokens; nothing is refreshed here. A repository error is
// logged and returned together with a nil slice.
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
	tokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
	if err == nil {
		return tokens, nil
	}

	m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
	return nil, err
}
// CheckUsers synchronises the stored amoCRM users of every account with the
// user list returned by amoCRM.
//
// For each token the user list is requested from amoCRM (first page, up to 250
// users). Then, per account: users already stored are updated, users missing in
// the database are inserted, and stored users that amoCRM no longer returns are
// deleted.
//
// A failed amoCRM request is logged and that token is skipped, so one broken
// account does not stop the synchronisation of the remaining ones. Skipped
// accounts are left untouched (nothing is deleted for them). Any repository
// error aborts the run and is returned.
//
// NOTE(review): only the first page is requested; accounts with more than 250
// users would have the remainder treated as deleted — confirm against the API.
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
	listUser := make(map[string][]models.Users)

	for _, token := range allTokens {
		user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting account by id in CheckUsers", zap.Error(err))
			return err
		}

		userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
			Page:  1,
			Limit: 250,
		}, token.AccessToken, user.Subdomain)
		if err != nil {
			m.logger.Error("error fetching list users", zap.Error(err))
			// Skip only this token; the other accounts must still be synchronised.
			continue
		}

		listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
	}

	for accountID, users := range listUser {
		mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
		if err != nil {
			m.logger.Error("error getting current account from db", zap.Error(err))
			return err
		}

		currentUserUsers, err := m.repo.AmoRepo.GetUserUsersByID(ctx, mainAccount.AmoID)
		if err != nil {
			m.logger.Error("error getting user users by amo user id", zap.Error(err))
			return err
		}

		// Update users that are already stored, insert the ones that are not.
		for _, user := range users {
			found := false
			for _, currentUser := range currentUserUsers {
				if user.ID != currentUser.AmoUserID {
					continue
				}
				found = true
				err := m.repo.AmoRepo.UpdateAmoAccountUser(ctx, model.AmoAccountUser{
					AmoID:     currentUser.AmoID,
					AmoUserID: currentUser.AmoUserID,
					Name:      user.Name,
					Email:     user.Email,
					Role:      int32(user.Rights.RoleID),
					Group:     int32(user.Rights.GroupID),
				})
				if err != nil {
					m.logger.Error("failed update user amo account in db", zap.Error(err))
					return err
				}
			}
			if found {
				continue
			}

			err := m.repo.AmoRepo.AddAmoAccountUser(ctx, model.AmoAccountUser{
				AmoID:     mainAccount.AmoID,
				AmoUserID: user.ID,
				Name:      user.Name,
				Email:     user.Email,
				Role:      int32(user.Rights.RoleID),
				Group:     int32(user.Rights.GroupID),
			})
			if err != nil {
				m.logger.Error("failed insert user amo account in db", zap.Error(err))
				return err
			}
		}

		// Collect stored users that amoCRM did not return.
		var deletedUserIDs []int64
		for _, currentUserUser := range currentUserUsers {
			found := false
			for _, user := range users {
				if currentUserUser.AmoUserID == user.ID {
					found = true
					break
				}
			}
			if !found {
				deletedUserIDs = append(deletedUserIDs, currentUserUser.ID)
			}
		}

		if len(deletedUserIDs) > 0 {
			if err := m.repo.AmoRepo.DeleteUsers(ctx, deletedUserIDs); err != nil {
				m.logger.Error("error deleting users in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}
// CheckPipelinesAndSteps synchronises stored pipelines and pipeline steps with
// the data returned by amoCRM for every token.
//
// Per token it loads the account and the currently stored pipelines and steps,
// requests the pipeline list from amoCRM, passes the received pipelines and
// steps to the repository (CheckPipelines / CheckSteps) and finally deletes the
// stored rows that amoCRM no longer returns.
//
// amoCRM request failures are logged and skipped. When the steps of at least
// one pipeline could not be fetched, stale-step deletion is skipped for that
// account: the received step list is incomplete, and treating the missing steps
// as removed would delete valid rows. Any repository error aborts the run and
// is returned. Accounts for which amoCRM returns no pipelines are left untouched.
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
			return err
		}

		// Snapshot of the stored state, taken before anything is written.
		currentUserPipelines, err := m.repo.AmoRepo.GetUserPipelinesByID(ctx, user.AmoID)
		if err != nil {
			m.logger.Error("error getting user pipelines by amo id", zap.Error(err))
			return err
		}

		currentUserSteps, err := m.repo.AmoRepo.GetUserStepsByID(ctx, user.AmoID)
		if err != nil {
			m.logger.Error("error getting user steps by amo id", zap.Error(err))
			return err
		}

		pipelines, err := m.amoClient.GetListPipelines(token.AccessToken, user.Subdomain)
		if err != nil {
			m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
			continue
		}
		if len(pipelines.Embedded.Pipelines) == 0 {
			continue
		}

		receivedPipelines := tools.ToPipeline(pipelines.Embedded.Pipelines)
		if err = m.repo.AmoRepo.CheckPipelines(ctx, receivedPipelines); err != nil {
			m.logger.Error("error update list pipelines in db:", zap.Error(err))
			return err
		}

		var receivedSteps []model.Step
		// stepsIncomplete is set when any pipeline's steps could not be fetched.
		stepsIncomplete := false
		for _, pipeline := range pipelines.Embedded.Pipelines {
			steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken, user.Subdomain)
			if err != nil {
				m.logger.Error("error getting list steps pipeline:", zap.Error(err))
				stepsIncomplete = true
				continue
			}

			receivedStep := tools.ToStep(steps.Embedded.Statuses)
			receivedSteps = append(receivedSteps, receivedStep...)

			if err = m.repo.AmoRepo.CheckSteps(ctx, receivedStep); err != nil {
				m.logger.Error("error update pipeline steps in db:", zap.Error(err))
				return err
			}
		}

		// Stored pipelines that amoCRM no longer returns.
		var deletedPipelineIDs []int64
		for _, currentUserPipeline := range currentUserPipelines {
			found := false
			for _, receivedPipeline := range receivedPipelines {
				if currentUserPipeline.Amoid == receivedPipeline.Amoid && currentUserPipeline.AccountID == user.AmoID {
					found = true
					break
				}
			}
			if !found {
				deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID)
			}
		}

		if len(deletedPipelineIDs) > 0 {
			if err := m.repo.AmoRepo.DeletePipelines(ctx, deletedPipelineIDs); err != nil {
				m.logger.Error("error deleting pipelines in db", zap.Error(err))
				return err
			}
		}

		if stepsIncomplete {
			// The received step list has gaps; deleting against it would
			// remove steps that still exist in amoCRM.
			continue
		}

		// Stored steps that amoCRM no longer returns.
		var deletedStepIDs []int64
		for _, currentUserStep := range currentUserSteps {
			found := false
			for _, receivedStep := range receivedSteps {
				if currentUserStep.Amoid == receivedStep.Amoid && currentUserStep.Accountid == user.AmoID && currentUserStep.Pipelineid == receivedStep.Pipelineid {
					found = true
					break
				}
			}
			if !found {
				deletedStepIDs = append(deletedStepIDs, currentUserStep.ID)
			}
		}

		if len(deletedStepIDs) > 0 {
			if err := m.repo.AmoRepo.DeleteSteps(ctx, deletedStepIDs); err != nil {
				m.logger.Error("error deleting steps in db", zap.Error(err))
				return err
			}
		}
	}

	return nil
}
// CheckTags synchronises the stored tags of every account with the tags
// returned by amoCRM for the lead, contact, company and customer entities.
//
// Per token the four entity types are fetched concurrently, page by page
// (250 tags per page) until an empty page is returned. All pages are
// accumulated, so the comparison below sees the complete tag list rather than
// only the last page. Stored tags that amoCRM no longer returns are deleted,
// and the received tags are passed to the repository's CheckTags.
//
// A failed amoCRM request is logged; the entity type it belongs to is marked as
// incomplete and its stored tags are not deleted, because absence from a
// partial list does not mean the tag was removed. Pages fetched before the
// failure are still passed to the repository. Any repository error aborts the
// run and is returned.
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}

	for _, token := range tokens {
		user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
			return err
		}

		currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID)
		if err != nil {
			m.logger.Error("error getting user tags by amo id", zap.Error(err))
			return err
		}

		var (
			wg          sync.WaitGroup
			tagsMap     sync.Map // model.EntityType -> []models.Tag, all fetched pages
			failedTypes sync.Map // model.EntityType -> struct{}, fetch did not complete
		)

		wg.Add(len(entityTypes))
		for _, entityType := range entityTypes {
			go func(entityType model.EntityType) {
				defer wg.Done()

				var collected []models.Tag
				for page := 1; ; page++ {
					tags, err := m.amoClient.GetListTags(models.GetListTagsReq{
						Page:       page,
						Limit:      250,
						EntityType: entityType,
					}, token.AccessToken, user.Subdomain)
					if err != nil {
						m.logger.Error("error getting list of tags", zap.Error(err))
						failedTypes.Store(entityType, struct{}{})
						break
					}
					if tags == nil || len(tags.Embedded.Tags) == 0 {
						break
					}
					collected = append(collected, tags.Embedded.Tags...)
				}

				if len(collected) > 0 {
					tagsMap.Store(entityType, collected)
				}
			}(entityType)
		}
		// All goroutines finish before the loop variables change.
		wg.Wait()

		// Stored tags that amoCRM no longer returns.
		var deletedTagIDs []int64
		for _, currentUserTag := range currentUserTags {
			if _, incomplete := failedTypes.Load(currentUserTag.Entity); incomplete {
				// The list for this entity type is partial; do not delete against it.
				continue
			}

			found := false
			// Only tags of the same entity type can match, so look that list up directly.
			if received, ok := tagsMap.Load(currentUserTag.Entity); ok {
				for _, tag := range tools.ToTag(received.([]models.Tag), currentUserTag.Entity) {
					if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID {
						found = true
						break
					}
				}
			}
			if !found {
				deletedTagIDs = append(deletedTagIDs, currentUserTag.ID)
			}
		}

		if len(deletedTagIDs) > 0 {
			if err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs); err != nil {
				m.logger.Error("error deleting tags in db", zap.Error(err))
				return err
			}
		}

		for _, entityType := range entityTypes {
			received, ok := tagsMap.Load(entityType)
			if !ok {
				continue
			}

			err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(received.([]models.Tag), entityType), token.AccountID)
			if err == nil {
				continue
			}

			switch entityType {
			case model.LeadsType:
				m.logger.Error("error updating leads tags in db", zap.Error(err))
			case model.ContactsType:
				m.logger.Error("error updating contacts tags in db", zap.Error(err))
			case model.CompaniesType:
				m.logger.Error("error updating companies tags in db", zap.Error(err))
			case model.CustomersType:
				m.logger.Error("error updating customer tags in db", zap.Error(err))
			}
			return err
		}
	}

	return nil
}
// CheckFields synchronises the stored custom fields of every account with the
// custom fields returned by amoCRM for the lead, contact, company and customer
// entities.
//
// Per token the four entity types are fetched concurrently, page by page
// (50 fields per page) until an empty page is returned. All pages are
// accumulated, so the comparison below sees the complete field list rather than
// only the last page. Stored fields that amoCRM no longer returns are deleted,
// and the received fields are passed to the repository's CheckFields.
//
// A failed amoCRM request is logged; the entity type it belongs to is marked as
// incomplete and its stored fields are not deleted, because absence from a
// partial list does not mean the field was removed. Pages fetched before the
// failure are still passed to the repository. Any repository error aborts the
// run and is returned.
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}

	for _, token := range tokens {
		user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
		if err != nil {
			m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
			return err
		}

		currentUserFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
		if err != nil {
			m.logger.Error("error getting user fields by amo id", zap.Error(err))
			return err
		}

		var (
			wg          sync.WaitGroup
			fieldsMap   sync.Map // model.EntityType -> []models.CustomField, all fetched pages
			failedTypes sync.Map // model.EntityType -> struct{}, fetch did not complete
		)

		wg.Add(len(entityTypes))
		for _, entityType := range entityTypes {
			go func(entityType model.EntityType) {
				defer wg.Done()

				var collected []models.CustomField
				for page := 1; ; page++ {
					fields, err := m.amoClient.GetListFields(models.GetListFieldsReq{
						Page:       page,
						Limit:      50,
						EntityType: entityType,
					}, token.AccessToken, user.Subdomain)
					if err != nil {
						m.logger.Error("error getting list of fields", zap.Error(err))
						failedTypes.Store(entityType, struct{}{})
						break
					}
					if fields == nil || len(fields.Embedded.CustomFields) == 0 {
						break
					}
					collected = append(collected, fields.Embedded.CustomFields...)
				}

				if len(collected) > 0 {
					fieldsMap.Store(entityType, collected)
				}
			}(entityType)
		}
		// All goroutines finish before the loop variables change.
		wg.Wait()

		// Stored fields that amoCRM no longer returns.
		var deletedFieldIDs []int64
		for _, currentUserField := range currentUserFields {
			if _, incomplete := failedTypes.Load(currentUserField.Entity); incomplete {
				// The list for this entity type is partial; do not delete against it.
				continue
			}

			found := false
			// Only fields of the same entity type can match, so look that list up directly.
			if received, ok := fieldsMap.Load(currentUserField.Entity); ok {
				for _, field := range tools.ToField(received.([]models.CustomField), currentUserField.Entity) {
					if currentUserField.Amoid == field.Amoid && currentUserField.Accountid == user.AmoID {
						found = true
						break
					}
				}
			}
			if !found {
				deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID)
			}
		}

		if len(deletedFieldIDs) > 0 {
			if err = m.repo.AmoRepo.DeleteFields(ctx, deletedFieldIDs); err != nil {
				m.logger.Error("error deleting fields in db", zap.Error(err))
				return err
			}
		}

		for _, entityType := range entityTypes {
			received, ok := fieldsMap.Load(entityType)
			if !ok {
				continue
			}

			err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(received.([]models.CustomField), entityType), token.AccountID)
			if err == nil {
				continue
			}

			switch entityType {
			case model.LeadsType:
				m.logger.Error("error updating leads fields in db", zap.Error(err))
			case model.ContactsType:
				m.logger.Error("error updating contacts fields in db", zap.Error(err))
			case model.CompaniesType:
				m.logger.Error("error updating companies fields in db", zap.Error(err))
			case model.CustomersType:
				m.logger.Error("error updating customer fields in db", zap.Error(err))
			}
			return err
		}
	}

	return nil
}
// GetTokenByID returns the token the repository holds for accountID.
//
// A repository error is logged and returned together with a nil token.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	stored, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
	if err == nil {
		return stored, nil
	}

	m.logger.Error("error getting token by account id from db", zap.Error(err))
	return nil, err
}
// CreateUserFromWebHook registers a new amoCRM account from an authorization
// message.
//
// It exchanges msg.AuthCode for access and refresh tokens, requests the account
// information with the access token, stores the account and then the tokens in
// the repository. On success it returns a token carrying the account ID and
// both token strings; every failure is logged and returned with a nil token.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, msg models.KafkaMessage) (*model.Token, error) {
	// Exchange the authorization code for access and refresh tokens.
	tokens, err := m.amoClient.CreateWebHook(&models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      msg.AuthCode,
	}, msg.RefererURL)
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	// Fetch the account information using the access token.
	userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken, msg.RefererURL)
	if err != nil {
		m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	account := model.AmoAccount{
		AccountID: msg.AccountID,
		AmoID:     userInfo.ID,
		Name:      userInfo.Name,
		Subdomain: msg.RefererURL,
		Country:   userInfo.Country,
		DriveURL:  userInfo.DriveUrl,
	}
	if err = m.repo.AmoRepo.CreateAccount(ctx, account); err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	stored := model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    msg.AccountID,
		AuthCode:     msg.AuthCode,
		Expiration:   time.Now().Unix() + tokens.ExpiresIn,
		CreatedAt:    time.Now().Unix(),
	}
	if err = m.repo.AmoRepo.WebhookCreate(ctx, stored); err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	result := &model.Token{
		AccountID:    msg.AccountID,
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
	}
	return result, nil
}
// CheckFieldRule rebuilds the question-to-field rules of a quiz and stores them.
//
// Steps, in order:
//  1. Load the account, its stored fields and the quiz; decode the quiz config.
//  2. Load the questions referenced by the lead, customer, company and contact
//     rules of msg.Rule.Fieldsrule.
//  3. Ask the tools helpers which fields have to be created in amoCRM and which
//     rules can reuse existing fields; create the missing fields through the
//     amoCRM client and pass the created fields to the repository.
//  4. Build the final rule maps and store them with UpdateFieldRules.
//
// token is the amoCRM access token used for the AddFields calls.
//
// Failures of AddFields are logged and the affected fields are skipped; every
// repository error, including a failure to load the questions, is returned.
// Loading questions used to be best-effort by accident (the error was stored
// and then overwritten); rules built from missing questions would silently lose
// mappings, so the error is now propagated.
func (m *Methods) CheckFieldRule(ctx context.Context, token string, msg models.KafkaMessage) error {
	var (
		leadQuestions, companyQuestions, customerQuestions, contactQuestions []model.Question

		questionsTypeMap                 = make(map[model.EntityType][]model.Question)
		newFields                        []model.Field
		lead, company, customer, contact model.FieldRule
		currentFieldsRule                = msg.Rule.Fieldsrule
	)

	user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, msg.AccountID)
	if err != nil {
		m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
		return err
	}

	currentFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
	if err != nil {
		m.logger.Error("error getting user fields by amo account id", zap.Error(err))
		return err
	}

	quiz, err := m.repo.QuizRepo.GetQuizById(ctx, msg.AccountID, uint64(msg.Rule.QuizID))
	if err != nil {
		m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err))
		return err
	}

	var quizConfig model.QuizContact
	if err = json.Unmarshal([]byte(quiz.Config), &quizConfig); err != nil {
		m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err))
		return err
	}

	leadIDs := tools.ToQuestionIDs(msg.Rule.Fieldsrule.Lead.Questionid)
	customerIDs := tools.ToQuestionIDs(msg.Rule.Fieldsrule.Customer.Questionid)
	companyIDs := tools.ToQuestionIDs(msg.Rule.Fieldsrule.Company.Questionid)
	contactIDs := tools.ToQuestionIDs(msg.Rule.Fieldsrule.Contact.Questionid)

	// getQuestions loads the questions with the given IDs; an empty ID list
	// yields no questions and no repository call.
	getQuestions := func(questionIDs []int32) ([]model.Question, error) {
		if len(questionIDs) == 0 {
			return nil, nil
		}
		questions, err := m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
		if err != nil {
			m.logger.Error("error getting questions", zap.Error(err))
			return nil, err
		}
		return questions, nil
	}

	if leadQuestions, err = getQuestions(leadIDs); err != nil {
		return err
	}
	if customerQuestions, err = getQuestions(customerIDs); err != nil {
		return err
	}
	if companyQuestions, err = getQuestions(companyIDs); err != nil {
		return err
	}
	if contactQuestions, err = getQuestions(contactIDs); err != nil {
		return err
	}

	questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
	questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
	questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)
	questionsTypeMap[model.ContactsType] = append(questionsTypeMap[model.ContactsType], contactQuestions...)

	toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)
	contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields)

	// Create the question fields that do not exist in amoCRM yet.
	for entity, fields := range toCreated {
		if len(fields) == 0 {
			continue
		}

		createdFields, err := m.amoClient.AddFields(fields, entity, token, user.Subdomain)
		if err != nil {
			m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
			continue
		}

		newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
	}

	// Create the contact fields that do not exist in amoCRM yet.
	if len(contactFieldsToCreate) > 0 {
		createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token, user.Subdomain)
		if err != nil {
			// Nothing was created, so there is no response to read.
			m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err))
		} else {
			constructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType)
			newFields = append(newFields, constructedFields...)

			// Fill in the amoCRM IDs of the contact fields that were just created.
			for _, field := range constructedFields {
				if _, ok := forAdding[field.Name]; ok {
					forAdding[field.Name] = int(field.Amoid)
				}
			}
		}
	}

	if len(newFields) > 0 {
		if err = m.repo.AmoRepo.CheckFields(ctx, newFields, msg.AccountID); err != nil {
			m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
			return err
		}
	}

	// constructFieldRules fills fieldRule.Questionid for one entity: rules that
	// already carry a field ID are kept, the rest are resolved through toUpdate
	// or by matching the question title against the newly created fields.
	constructFieldRules := func(fieldRuleArrCurrent map[int]int, questions []model.Question, fieldRule *model.FieldRule, currentEntity model.EntityType) {
		ruleMap := make(map[int]int)

		for questionID, fieldID := range fieldRuleArrCurrent {
			if fieldID != 0 {
				// The field ID is already set; keep it as is.
				ruleMap[questionID] = fieldID
				continue
			}

			for _, question := range questions {
				if dataQues, ok := toUpdate[questionID]; ok && dataQues.Entity == currentEntity {
					ruleMap[questionID] = dataQues.FieldID
					break
				}

				if questionID != int(question.Id) {
					continue
				}

				// Match the question to a created field by normalised title;
				// questions without a title use the generated placeholder name.
				title := question.Title
				if strings.ReplaceAll(title, " ", "") == "" {
					title = fmt.Sprintf("Вопрос №%d", question.Page)
				}
				title = strings.ToLower(strings.ReplaceAll(title, " ", ""))

				for _, field := range newFields {
					fieldName := strings.ToLower(strings.ReplaceAll(field.Name, " ", ""))
					if title == fieldName && field.Entity == currentEntity {
						ruleMap[questionID] = int(field.Amoid)
					}
				}
			}
		}

		fieldRule.Questionid = ruleMap
	}

	constructFieldRules(currentFieldsRule.Lead.Questionid, leadQuestions, &lead, model.LeadsType)
	constructFieldRules(currentFieldsRule.Customer.Questionid, customerQuestions, &customer, model.CustomersType)
	constructFieldRules(currentFieldsRule.Company.Questionid, companyQuestions, &company, model.CompaniesType)
	constructFieldRules(currentFieldsRule.Contact.Questionid, contactQuestions, &contact, model.ContactsType)

	err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
		Lead:     lead,
		Customer: customer,
		Company:  company,
		Contact:  model.ContactRules{ContactRuleMap: forAdding, Questionid: contact.Questionid},
	}, msg.AccountID, msg.Rule.QuizID)
	if err != nil {
		m.logger.Error("error updating fields rule in db", zap.Error(err))
		return err
	}

	return nil
}
// UserReLogin refreshes the stored account and tokens after a repeated
// authorization.
//
// It exchanges msg.AuthCode for new tokens, requests the account information
// with the new access token, then updates the stored account and the stored
// tokens. Every failure is logged and returned.
func (m *Methods) UserReLogin(ctx context.Context, msg models.KafkaMessage) error {
	tokens, err := m.amoClient.CreateWebHook(&models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      msg.AuthCode,
	}, msg.RefererURL)
	if err != nil {
		m.logger.Error("error getting tokens in method user re-login:", zap.Error(err))
		return err
	}

	userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken, msg.RefererURL)
	if err != nil {
		m.logger.Error("error getting UserInfo in method user re-login:", zap.Error(err))
		return err
	}

	account := model.AmoAccount{
		AccountID: msg.AccountID,
		AmoID:     userInfo.ID,
		Name:      userInfo.Name,
		Subdomain: msg.RefererURL,
		Country:   userInfo.Country,
		DriveURL:  userInfo.DriveUrl,
	}
	if err = m.repo.AmoRepo.UpdateCurrentAccount(ctx, account); err != nil {
		m.logger.Error("error update account in db in method user re-login", zap.Error(err))
		return err
	}

	refreshed := model.Token{
		RefreshToken: tokens.RefreshToken,
		AccessToken:  tokens.AccessToken,
		AccountID:    msg.AccountID,
		Expiration:   time.Now().Unix() + tokens.ExpiresIn,
		CreatedAt:    time.Now().Unix(),
	}
	if err = m.repo.AmoRepo.WebhookUpdate(ctx, refreshed); err != nil {
		m.logger.Error("error update tokens in db in method user re-login", zap.Error(err))
		return err
	}

	return nil
}