// Source: amocrm/internal/workers_methods/methods.go (742 lines, 21 KiB, Go)

package workers_methods
import (
"amocrm/internal/models"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
2024-05-07 15:56:02 +00:00
"encoding/json"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
2024-05-03 15:25:50 +00:00
"sync"
"time"
)
// Methods holds the dependencies shared by every worker routine in this
// package: the data-access layer, the amo API client and a structured logger.
type Methods struct {
	repo      *dal.AmoDal
	amoClient *amoClient.Amo
	logger    *zap.Logger
}
// Deps lists the dependencies that NewWorkersMethods copies into a Methods
// value.
type Deps struct {
	Repo      *dal.AmoDal
	AmoClient *amoClient.Amo
	Logger    *zap.Logger
}
// NewWorkersMethods builds a Methods value wired with the supplied
// dependencies.
func NewWorkersMethods(deps Deps) *Methods {
	m := new(Methods)
	m.repo = deps.Repo
	m.amoClient = deps.AmoClient
	m.logger = deps.Logger
	return m
}
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
return nil, err
}
for _, oldToken := range allTokens {
req := models.UpdateWebHookReq{
GrantType: "refresh_token",
RefreshToken: oldToken.RefreshToken,
}
resp, err := m.amoClient.CreateWebHook(&req)
if err != nil {
m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
continue
}
newToken := model.Token{
AccountID: oldToken.AccountID,
RefreshToken: resp.RefreshToken,
AccessToken: resp.AccessToken,
Expiration: time.Now().Unix() + resp.ExpiresIn,
CreatedAt: time.Now().Unix(),
}
2024-05-06 11:22:05 +00:00
err = m.repo.AmoRepo.WebhookUpdate(ctx, newToken)
if err != nil {
m.logger.Error("error update token in db", zap.Error(err))
return nil, err
}
}
2024-05-06 11:22:05 +00:00
newTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
2024-05-06 11:22:05 +00:00
m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err))
return nil, err
}
return newTokens, nil
}
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
listUser := make(map[string][]models.Users)
for _, token := range allTokens {
page := 1
limit := 250
userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
Page: page,
Limit: limit,
}, token.AccessToken)
if err != nil {
m.logger.Error("error fetching list users", zap.Error(err))
break
}
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
}
2024-05-03 15:25:50 +00:00
var usersForUpdateAndCreate []model.User
for accountID, users := range listUser {
mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
m.logger.Error("error getting current account from db", zap.Error(err))
return err
}
2024-05-07 18:46:14 +00:00
currentUserUsers, err := m.repo.AmoRepo.GetUserUsersByID(ctx, mainAccount.Amouserid)
if err != nil {
m.logger.Error("error getting user users by amo user id", zap.Error(err))
return err
}
for _, user := range users {
if user.ID == mainAccount.AmoID {
err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
Name: user.Name,
Role: int32(user.Rights.RoleID),
Group: int32(user.Rights.GroupID),
Email: user.Email,
AmoID: user.ID,
})
if err != nil {
m.logger.Error("error update main user data in db", zap.Error(err))
return err
}
}
2024-05-03 15:25:50 +00:00
usersForUpdateAndCreate = append(usersForUpdateAndCreate, model.User{
AmoID: user.ID,
Name: user.FullName,
Group: int32(user.Rights.GroupID),
Role: int32(user.Rights.RoleID),
Email: user.Email,
Amouserid: mainAccount.Amouserid,
})
}
2024-05-07 18:46:14 +00:00
var deletedUserIDs []int64
for _, currentUserUser := range currentUserUsers {
found := false
for _, user := range users {
if currentUserUser.AmoID == user.ID {
found = true
break
}
}
if !found {
deletedUserIDs = append(deletedUserIDs, currentUserUser.ID)
}
}
if len(deletedUserIDs) > 0 {
err := m.repo.AmoRepo.DeleteUsers(ctx, deletedUserIDs)
if err != nil {
m.logger.Error("error deleting users in db", zap.Error(err))
return err
}
}
2024-05-03 15:25:50 +00:00
}
2024-05-03 15:25:50 +00:00
err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, usersForUpdateAndCreate)
if err != nil {
m.logger.Error("error update users list data in db", zap.Error(err))
return err
}
return nil
}
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
2024-05-07 18:46:14 +00:00
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserPipelines, err := m.repo.AmoRepo.GetUserPipelinesByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user pipelines by amo id", zap.Error(err))
return err
}
currentUserSteps, err := m.repo.AmoRepo.GetUserStepsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user steps by amo id", zap.Error(err))
return err
}
var receivedSteps []model.Step
pipelines, err := m.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
continue
}
2024-04-23 14:00:51 +00:00
if len(pipelines.Embedded.Pipelines) > 0 {
2024-05-07 18:46:14 +00:00
receivedPipelines := tools.ToPipeline(pipelines.Embedded.Pipelines)
err = m.repo.AmoRepo.CheckPipelines(ctx, receivedPipelines)
if err != nil {
2024-04-23 14:00:51 +00:00
m.logger.Error("error update list pipelines in db:", zap.Error(err))
return err
}
2024-04-23 14:00:51 +00:00
for _, pipeline := range pipelines.Embedded.Pipelines {
steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
m.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
2024-05-07 18:46:14 +00:00
receivedStep := tools.ToStep(steps.Embedded.Statuses)
receivedSteps = append(receivedSteps, receivedStep...)
err = m.repo.AmoRepo.CheckSteps(ctx, receivedStep)
2024-04-23 14:00:51 +00:00
if err != nil {
m.logger.Error("error update pipeline steps in db:", zap.Error(err))
return err
}
}
2024-05-07 18:46:14 +00:00
var deletedPipelineIDs []int64
for _, currentUserPipeline := range currentUserPipelines {
found := false
for _, receivedPipeline := range receivedPipelines {
if currentUserPipeline.Amoid == receivedPipeline.Amoid && currentUserPipeline.AccountID == user.AmoID {
found = true
break
}
}
if !found {
deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID)
}
}
if len(deletedPipelineIDs) > 0 {
err := m.repo.AmoRepo.DeletePipelines(ctx, deletedPipelineIDs)
if err != nil {
m.logger.Error("error deleting pipelines in db", zap.Error(err))
return err
}
}
var deletedStepIDs []int64
for _, currentUserStep := range currentUserSteps {
found := false
for _, receivedStep := range receivedSteps {
if currentUserStep.Amoid == receivedStep.Amoid && currentUserStep.Accountid == user.AmoID && currentUserStep.Pipelineid == receivedStep.Pipelineid {
found = true
break
}
}
if !found {
deletedStepIDs = append(deletedStepIDs, currentUserStep.ID)
}
}
if len(deletedStepIDs) > 0 {
err := m.repo.AmoRepo.DeleteSteps(ctx, deletedStepIDs)
if err != nil {
m.logger.Error("error deleting steps in db", zap.Error(err))
return err
}
}
}
}
return nil
}
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
2024-05-07 18:46:14 +00:00
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user tags by amo id", zap.Error(err))
return err
}
2024-05-03 15:25:50 +00:00
var wg sync.WaitGroup
wg.Add(4)
2024-05-03 15:25:50 +00:00
var tagsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
2024-05-03 15:25:50 +00:00
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 250
for {
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := m.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of tags", zap.Error(err))
return
}
2024-05-03 15:25:50 +00:00
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
2024-05-03 15:25:50 +00:00
tagsMap.Store(entityType, tags.Embedded.Tags)
2024-05-03 15:25:50 +00:00
page++
}
}(entityType)
}
2024-05-03 15:25:50 +00:00
wg.Wait()
2024-05-07 18:46:14 +00:00
var deletedTagIDs []int64
for _, currentUserTag := range currentUserTags {
found := false
for _, entityType := range entityTypes {
if tags, ok := tagsMap.Load(entityType); ok {
if len(tags.([]models.Tag)) > 0 {
receivedTags := tools.ToTag(tags.([]models.Tag), entityType)
for _, tag := range receivedTags {
if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType {
found = true
break
}
}
}
}
if found {
break
}
}
if !found {
deletedTagIDs = append(deletedTagIDs, currentUserTag.ID)
}
}
if len(deletedTagIDs) > 0 {
err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs)
if err != nil {
m.logger.Error("error deleting tags in db", zap.Error(err))
return err
}
}
for _, entityType := range entityTypes {
2024-05-03 15:25:50 +00:00
if tags, ok := tagsMap.Load(entityType); ok {
if len(tags.([]models.Tag)) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID)
2024-04-23 14:00:51 +00:00
if err != nil {
2024-05-03 15:25:50 +00:00
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads tags in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts tags in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies tags in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer tags in db", zap.Error(err))
return err
}
2024-04-23 14:00:51 +00:00
}
}
}
}
}
return nil
}
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
2024-05-07 18:46:14 +00:00
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user fields by amo id", zap.Error(err))
return err
}
2024-05-03 15:25:50 +00:00
var wg sync.WaitGroup
wg.Add(4)
2024-05-03 15:25:50 +00:00
var fieldsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
2024-05-03 15:25:50 +00:00
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 50
for {
req := models.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
fields, err := m.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of fields", zap.Error(err))
return
}
2024-05-03 15:25:50 +00:00
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
2024-05-03 15:25:50 +00:00
fieldsMap.Store(entityType, fields.Embedded.CustomFields)
2024-05-03 15:25:50 +00:00
page++
}
}(entityType)
}
2024-05-03 15:25:50 +00:00
wg.Wait()
2024-05-07 18:46:14 +00:00
var deletedFieldIDs []int64
for _, currentUserField := range currentUserFields {
found := false
for _, entityType := range entityTypes {
if fields, ok := fieldsMap.Load(entityType); ok {
if len(fields.([]models.CustomField)) > 0 {
receivedFields := tools.ToField(fields.([]models.CustomField), entityType)
for _, field := range receivedFields {
if currentUserField.Amoid == field.Amoid && currentUserField.Accountid == user.AmoID && currentUserField.Entity == entityType {
found = true
break
}
}
}
}
if found {
break
}
}
if !found {
deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID)
}
}
if len(deletedFieldIDs) > 0 {
err = m.repo.AmoRepo.DeleteFields(ctx, deletedFieldIDs)
if err != nil {
m.logger.Error("error deleting fields in db", zap.Error(err))
return err
}
}
for _, entityType := range entityTypes {
2024-05-03 15:25:50 +00:00
if fields, ok := fieldsMap.Load(entityType); ok {
if len(fields.([]models.CustomField)) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fields.([]models.CustomField), entityType), token.AccountID)
2024-04-23 14:00:51 +00:00
if err != nil {
2024-05-03 15:25:50 +00:00
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads fields in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts fields in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies fields in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer fields in db", zap.Error(err))
return err
}
2024-04-23 14:00:51 +00:00
}
}
}
}
}
return nil
}
// GetTokenByID loads the token stored for accountID from the database.
// A lookup error is logged and returned with a nil token.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	stored, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
	if err == nil {
		return stored, nil
	}

	m.logger.Error("error getting token by account id from db", zap.Error(err))
	return nil, err
}
// CreateUserFromWebHook exchanges authCode for a token pair, loads the user's
// info with the new access token, creates the account record for accountID
// and stores the tokens.
//
// It returns a token carrying only the account ID, refresh token and access
// token. Every failure is logged and returned with a nil token.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
	// Obtain the access and refresh tokens using the authorization code.
	issued, err := m.amoClient.CreateWebHook(&models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      authCode,
	})
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	// Obtain the user's information using the access token.
	info, err := m.amoClient.GetUserInfo(issued.AccessToken)
	if err != nil {
		m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	account := model.User{
		Name:      info.Name,
		Subdomain: info.Subdomain,
		AmoID:     info.ID,
		Amouserid: info.ID,
		Country:   info.Country,
	}
	if err = m.repo.AmoRepo.CreateAccount(ctx, accountID, account); err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	persisted := model.Token{
		RefreshToken: issued.RefreshToken,
		AccessToken:  issued.AccessToken,
		AccountID:    accountID,
		AuthCode:     authCode,
		Expiration:   time.Now().Unix() + issued.ExpiresIn,
		CreatedAt:    time.Now().Unix(),
	}
	if err = m.repo.AmoRepo.WebhookCreate(ctx, persisted); err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	result := new(model.Token)
	result.AccountID = accountID
	result.RefreshToken = issued.RefreshToken
	result.AccessToken = issued.AccessToken
	return result, nil
}
func (m *Methods) CheckUTMs(ctx context.Context, token, accountID string, ids []int32) ([]model.Field, error) {
2024-04-29 22:36:49 +00:00
utms, err := m.repo.AmoRepo.GetUtmsByID(ctx, ids)
if err != nil {
m.logger.Error("error getting user UTM byList IDs", zap.Error(err))
return nil, err
}
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
return nil, err
}
fields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
2024-04-29 22:36:49 +00:00
if err != nil {
m.logger.Error("error getting user fields by amo account id", zap.Error(err))
return nil, err
2024-04-29 22:36:49 +00:00
}
if len(utms) < 0 {
2024-05-06 11:22:05 +00:00
return fields, nil
}
2024-04-29 22:36:49 +00:00
toCreated, toUpdate := tools.ToCreatedUpdate(utms, fields)
if len(toUpdate) > 0 {
err = m.repo.AmoRepo.UpdateUtmsFields(ctx, toUpdate)
if err != nil {
m.logger.Error("error update utms fields in db", zap.Error(err))
return nil, err
}
}
if len(toCreated) > 0 {
createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
if err != nil {
m.logger.Error("error created amo fields", zap.Error(err))
return nil, err
}
newFields := tools.ToField(createdFields.Embedded.CustomFields, model.LeadsType)
err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID)
2024-04-29 22:36:49 +00:00
if err != nil {
m.logger.Error("error created amo fields in db", zap.Error(err))
return nil, err
2024-04-29 22:36:49 +00:00
}
forUpdate := tools.MatchingUTMs(utms, createdFields.Embedded.CustomFields)
err = m.repo.AmoRepo.UpdateUTMs(ctx, forUpdate)
if err != nil {
m.logger.Error("error update utms in db", zap.Error(err))
return nil, err
}
fields = append(fields, newFields...)
}
return fields, nil
}
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountID string, req models.KafkaRule, currentFields []model.Field) error {
var (
2024-05-07 15:56:02 +00:00
leadIDs, companyIDs, customerIDs []int32
leadQuestions, companyQuestions, customerQuestions []model.Question
questionsTypeMap = make(map[model.EntityType][]model.Question)
newFields []model.Field
lead, company, customer []model.FieldRule
currentFieldsRule = req.Fieldsrule
err error
)
2024-05-07 15:56:02 +00:00
quiz, err := m.repo.QuizRepo.GetQuizById(ctx, accountID, uint64(req.QuizID))
if err != nil {
m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err))
return err
}
var quizConfig model.QuizContact
err = json.Unmarshal([]byte(quiz.Config), &quizConfig)
if err != nil {
m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err))
return err
}
leadIDs = tools.ToQuestionIDs(req.Fieldsrule.Lead)
customerIDs = tools.ToQuestionIDs(req.Fieldsrule.Customer)
companyIDs = tools.ToQuestionIDs(req.Fieldsrule.Company)
getQuestions := func(questionIDs []int32, questions *[]model.Question) {
if len(questionIDs) > 0 {
*questions, err = m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
if err != nil {
m.logger.Error("error getting questions", zap.Error(err))
return
}
}
}
getQuestions(leadIDs, &leadQuestions)
getQuestions(customerIDs, &customerQuestions)
getQuestions(companyIDs, &companyQuestions)
questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)
toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)
2024-05-07 15:56:02 +00:00
contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields)
for entity, fields := range toCreated {
if len(fields) == 0 {
continue
}
createdFields, err := m.amoClient.AddFields(fields, entity, token)
if err != nil {
m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
continue
}
newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
}
2024-05-07 15:56:02 +00:00
if len(contactFieldsToCreate) > 0 {
createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token)
if err != nil {
m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err))
}
contructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType)
newFields = append(newFields, contructedFields...)
for _, field := range contructedFields {
if _, ok := forAdding[field.Name]; ok {
forAdding[field.Name] = int(field.Amoid)
}
}
}
2024-05-02 17:02:03 +00:00
if len(newFields) > 0 {
err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID)
if err != nil {
2024-05-02 17:02:03 +00:00
m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
return err
}
}
constructFieldRules := func(fieldRuleArrCurrent []model.FieldRule, questions []model.Question, fieldRule *[]model.FieldRule) {
for _, rules := range fieldRuleArrCurrent {
for questionID := range rules.Questionid {
for _, question := range questions {
if fieldID, ok := toUpdate[questionID]; ok {
ruleMap := make(map[int]int)
ruleMap[questionID] = fieldID
*fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap})
break
}
if questionID == int(question.Id) {
for _, field := range newFields {
if question.Title == field.Name {
ruleMap := make(map[int]int)
ruleMap[questionID] = int(field.Amoid)
*fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap})
}
}
}
}
}
}
}
constructFieldRules(currentFieldsRule.Lead, leadQuestions, &lead)
constructFieldRules(currentFieldsRule.Customer, customerQuestions, &customer)
constructFieldRules(currentFieldsRule.Company, companyQuestions, &company)
2024-05-02 17:02:03 +00:00
err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
Lead: lead,
Customer: customer,
Company: company,
2024-05-07 15:56:02 +00:00
Contact: model.ContactRules{ContactRuleMap: forAdding},
}, accountID, req.QuizID)
2024-05-02 17:02:03 +00:00
if err != nil {
m.logger.Error("error updating fields rule in db", zap.Error(err))
return err
}
return nil
}