amocrm/internal/workers_methods/methods.go
2024-05-07 21:46:14 +03:00

742 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workers_methods
import (
"amocrm/internal/models"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
"encoding/json"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"sync"
"time"
)
type Methods struct {
repo *dal.AmoDal
amoClient *amoClient.Amo
logger *zap.Logger
}
type Deps struct {
Repo *dal.AmoDal
AmoClient *amoClient.Amo
Logger *zap.Logger
}
func NewWorkersMethods(deps Deps) *Methods {
return &Methods{
repo: deps.Repo,
amoClient: deps.AmoClient,
logger: deps.Logger,
}
}
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
return nil, err
}
for _, oldToken := range allTokens {
req := models.UpdateWebHookReq{
GrantType: "refresh_token",
RefreshToken: oldToken.RefreshToken,
}
resp, err := m.amoClient.CreateWebHook(&req)
if err != nil {
m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
continue
}
newToken := model.Token{
AccountID: oldToken.AccountID,
RefreshToken: resp.RefreshToken,
AccessToken: resp.AccessToken,
Expiration: time.Now().Unix() + resp.ExpiresIn,
CreatedAt: time.Now().Unix(),
}
err = m.repo.AmoRepo.WebhookUpdate(ctx, newToken)
if err != nil {
m.logger.Error("error update token in db", zap.Error(err))
return nil, err
}
}
newTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err))
return nil, err
}
return newTokens, nil
}
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
listUser := make(map[string][]models.Users)
for _, token := range allTokens {
page := 1
limit := 250
userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
Page: page,
Limit: limit,
}, token.AccessToken)
if err != nil {
m.logger.Error("error fetching list users", zap.Error(err))
break
}
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
}
var usersForUpdateAndCreate []model.User
for accountID, users := range listUser {
mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
m.logger.Error("error getting current account from db", zap.Error(err))
return err
}
currentUserUsers, err := m.repo.AmoRepo.GetUserUsersByID(ctx, mainAccount.Amouserid)
if err != nil {
m.logger.Error("error getting user users by amo user id", zap.Error(err))
return err
}
for _, user := range users {
if user.ID == mainAccount.AmoID {
err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
Name: user.Name,
Role: int32(user.Rights.RoleID),
Group: int32(user.Rights.GroupID),
Email: user.Email,
AmoID: user.ID,
})
if err != nil {
m.logger.Error("error update main user data in db", zap.Error(err))
return err
}
}
usersForUpdateAndCreate = append(usersForUpdateAndCreate, model.User{
AmoID: user.ID,
Name: user.FullName,
Group: int32(user.Rights.GroupID),
Role: int32(user.Rights.RoleID),
Email: user.Email,
Amouserid: mainAccount.Amouserid,
})
}
var deletedUserIDs []int64
for _, currentUserUser := range currentUserUsers {
found := false
for _, user := range users {
if currentUserUser.AmoID == user.ID {
found = true
break
}
}
if !found {
deletedUserIDs = append(deletedUserIDs, currentUserUser.ID)
}
}
if len(deletedUserIDs) > 0 {
err := m.repo.AmoRepo.DeleteUsers(ctx, deletedUserIDs)
if err != nil {
m.logger.Error("error deleting users in db", zap.Error(err))
return err
}
}
}
err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, usersForUpdateAndCreate)
if err != nil {
m.logger.Error("error update users list data in db", zap.Error(err))
return err
}
return nil
}
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserPipelines, err := m.repo.AmoRepo.GetUserPipelinesByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user pipelines by amo id", zap.Error(err))
return err
}
currentUserSteps, err := m.repo.AmoRepo.GetUserStepsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user steps by amo id", zap.Error(err))
return err
}
var receivedSteps []model.Step
pipelines, err := m.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
continue
}
if len(pipelines.Embedded.Pipelines) > 0 {
receivedPipelines := tools.ToPipeline(pipelines.Embedded.Pipelines)
err = m.repo.AmoRepo.CheckPipelines(ctx, receivedPipelines)
if err != nil {
m.logger.Error("error update list pipelines in db:", zap.Error(err))
return err
}
for _, pipeline := range pipelines.Embedded.Pipelines {
steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
m.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
receivedStep := tools.ToStep(steps.Embedded.Statuses)
receivedSteps = append(receivedSteps, receivedStep...)
err = m.repo.AmoRepo.CheckSteps(ctx, receivedStep)
if err != nil {
m.logger.Error("error update pipeline steps in db:", zap.Error(err))
return err
}
}
var deletedPipelineIDs []int64
for _, currentUserPipeline := range currentUserPipelines {
found := false
for _, receivedPipeline := range receivedPipelines {
if currentUserPipeline.Amoid == receivedPipeline.Amoid && currentUserPipeline.AccountID == user.AmoID {
found = true
break
}
}
if !found {
deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID)
}
}
if len(deletedPipelineIDs) > 0 {
err := m.repo.AmoRepo.DeletePipelines(ctx, deletedPipelineIDs)
if err != nil {
m.logger.Error("error deleting pipelines in db", zap.Error(err))
return err
}
}
var deletedStepIDs []int64
for _, currentUserStep := range currentUserSteps {
found := false
for _, receivedStep := range receivedSteps {
if currentUserStep.Amoid == receivedStep.Amoid && currentUserStep.Accountid == user.AmoID && currentUserStep.Pipelineid == receivedStep.Pipelineid {
found = true
break
}
}
if !found {
deletedStepIDs = append(deletedStepIDs, currentUserStep.ID)
}
}
if len(deletedStepIDs) > 0 {
err := m.repo.AmoRepo.DeleteSteps(ctx, deletedStepIDs)
if err != nil {
m.logger.Error("error deleting steps in db", zap.Error(err))
return err
}
}
}
}
return nil
}
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user tags by amo id", zap.Error(err))
return err
}
var wg sync.WaitGroup
wg.Add(4)
var tagsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 250
for {
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := m.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of tags", zap.Error(err))
return
}
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
tagsMap.Store(entityType, tags.Embedded.Tags)
page++
}
}(entityType)
}
wg.Wait()
var deletedTagIDs []int64
for _, currentUserTag := range currentUserTags {
found := false
for _, entityType := range entityTypes {
if tags, ok := tagsMap.Load(entityType); ok {
if len(tags.([]models.Tag)) > 0 {
receivedTags := tools.ToTag(tags.([]models.Tag), entityType)
for _, tag := range receivedTags {
if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType {
found = true
break
}
}
}
}
if found {
break
}
}
if !found {
deletedTagIDs = append(deletedTagIDs, currentUserTag.ID)
}
}
if len(deletedTagIDs) > 0 {
err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs)
if err != nil {
m.logger.Error("error deleting tags in db", zap.Error(err))
return err
}
}
for _, entityType := range entityTypes {
if tags, ok := tagsMap.Load(entityType); ok {
if len(tags.([]models.Tag)) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID)
if err != nil {
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads tags in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts tags in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies tags in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer tags in db", zap.Error(err))
return err
}
}
}
}
}
}
return nil
}
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
return err
}
currentUserFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user fields by amo id", zap.Error(err))
return err
}
var wg sync.WaitGroup
wg.Add(4)
var fieldsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 50
for {
req := models.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
fields, err := m.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of fields", zap.Error(err))
return
}
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
fieldsMap.Store(entityType, fields.Embedded.CustomFields)
page++
}
}(entityType)
}
wg.Wait()
var deletedFieldIDs []int64
for _, currentUserField := range currentUserFields {
found := false
for _, entityType := range entityTypes {
if fields, ok := fieldsMap.Load(entityType); ok {
if len(fields.([]models.CustomField)) > 0 {
receivedFields := tools.ToField(fields.([]models.CustomField), entityType)
for _, field := range receivedFields {
if currentUserField.Amoid == field.Amoid && currentUserField.Accountid == user.AmoID && currentUserField.Entity == entityType {
found = true
break
}
}
}
}
if found {
break
}
}
if !found {
deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID)
}
}
if len(deletedFieldIDs) > 0 {
err = m.repo.AmoRepo.DeleteFields(ctx, deletedFieldIDs)
if err != nil {
m.logger.Error("error deleting fields in db", zap.Error(err))
return err
}
}
for _, entityType := range entityTypes {
if fields, ok := fieldsMap.Load(entityType); ok {
if len(fields.([]models.CustomField)) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fields.([]models.CustomField), entityType), token.AccountID)
if err != nil {
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads fields in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts fields in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies fields in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer fields in db", zap.Error(err))
return err
}
}
}
}
}
}
return nil
}
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
token, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
if err != nil {
m.logger.Error("error getting token by account id from db", zap.Error(err))
return nil, err
}
return token, nil
}
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
// получаем аксес и рефреш токены по коду авторизации
forGetTokens := models.CreateWebHookReq{
GrantType: "authorization_code",
Code: authCode,
}
tokens, err := m.amoClient.CreateWebHook(&forGetTokens)
if err != nil {
m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
return nil, err
}
// получаем информацию о пользователе по аксес токену
userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken)
if err != nil {
m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
return nil, err
}
toCreate := model.User{
Name: userInfo.Name,
Subdomain: userInfo.Subdomain,
AmoID: userInfo.ID,
Amouserid: userInfo.ID,
Country: userInfo.Country,
}
err = m.repo.AmoRepo.CreateAccount(ctx, accountID, toCreate)
if err != nil {
m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
return nil, err
}
err = m.repo.AmoRepo.WebhookCreate(ctx, model.Token{
RefreshToken: tokens.RefreshToken,
AccessToken: tokens.AccessToken,
AccountID: accountID,
AuthCode: authCode,
Expiration: time.Now().Unix() + tokens.ExpiresIn,
CreatedAt: time.Now().Unix(),
})
if err != nil {
m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
return nil, err
}
return &model.Token{
AccountID: accountID,
RefreshToken: tokens.RefreshToken,
AccessToken: tokens.AccessToken,
}, nil
}
func (m *Methods) CheckUTMs(ctx context.Context, token, accountID string, ids []int32) ([]model.Field, error) {
utms, err := m.repo.AmoRepo.GetUtmsByID(ctx, ids)
if err != nil {
m.logger.Error("error getting user UTM byList IDs", zap.Error(err))
return nil, err
}
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
return nil, err
}
fields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
if err != nil {
m.logger.Error("error getting user fields by amo account id", zap.Error(err))
return nil, err
}
if len(utms) < 0 {
return fields, nil
}
toCreated, toUpdate := tools.ToCreatedUpdate(utms, fields)
if len(toUpdate) > 0 {
err = m.repo.AmoRepo.UpdateUtmsFields(ctx, toUpdate)
if err != nil {
m.logger.Error("error update utms fields in db", zap.Error(err))
return nil, err
}
}
if len(toCreated) > 0 {
createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
if err != nil {
m.logger.Error("error created amo fields", zap.Error(err))
return nil, err
}
newFields := tools.ToField(createdFields.Embedded.CustomFields, model.LeadsType)
err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID)
if err != nil {
m.logger.Error("error created amo fields in db", zap.Error(err))
return nil, err
}
forUpdate := tools.MatchingUTMs(utms, createdFields.Embedded.CustomFields)
err = m.repo.AmoRepo.UpdateUTMs(ctx, forUpdate)
if err != nil {
m.logger.Error("error update utms in db", zap.Error(err))
return nil, err
}
fields = append(fields, newFields...)
}
return fields, nil
}
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountID string, req models.KafkaRule, currentFields []model.Field) error {
var (
leadIDs, companyIDs, customerIDs []int32
leadQuestions, companyQuestions, customerQuestions []model.Question
questionsTypeMap = make(map[model.EntityType][]model.Question)
newFields []model.Field
lead, company, customer []model.FieldRule
currentFieldsRule = req.Fieldsrule
err error
)
quiz, err := m.repo.QuizRepo.GetQuizById(ctx, accountID, uint64(req.QuizID))
if err != nil {
m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err))
return err
}
var quizConfig model.QuizContact
err = json.Unmarshal([]byte(quiz.Config), &quizConfig)
if err != nil {
m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err))
return err
}
leadIDs = tools.ToQuestionIDs(req.Fieldsrule.Lead)
customerIDs = tools.ToQuestionIDs(req.Fieldsrule.Customer)
companyIDs = tools.ToQuestionIDs(req.Fieldsrule.Company)
getQuestions := func(questionIDs []int32, questions *[]model.Question) {
if len(questionIDs) > 0 {
*questions, err = m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
if err != nil {
m.logger.Error("error getting questions", zap.Error(err))
return
}
}
}
getQuestions(leadIDs, &leadQuestions)
getQuestions(customerIDs, &customerQuestions)
getQuestions(companyIDs, &companyQuestions)
questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)
toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)
contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields)
for entity, fields := range toCreated {
if len(fields) == 0 {
continue
}
createdFields, err := m.amoClient.AddFields(fields, entity, token)
if err != nil {
m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
continue
}
newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
}
if len(contactFieldsToCreate) > 0 {
createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token)
if err != nil {
m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err))
}
contructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType)
newFields = append(newFields, contructedFields...)
for _, field := range contructedFields {
if _, ok := forAdding[field.Name]; ok {
forAdding[field.Name] = int(field.Amoid)
}
}
}
if len(newFields) > 0 {
err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID)
if err != nil {
m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
return err
}
}
constructFieldRules := func(fieldRuleArrCurrent []model.FieldRule, questions []model.Question, fieldRule *[]model.FieldRule) {
for _, rules := range fieldRuleArrCurrent {
for questionID := range rules.Questionid {
for _, question := range questions {
if fieldID, ok := toUpdate[questionID]; ok {
ruleMap := make(map[int]int)
ruleMap[questionID] = fieldID
*fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap})
break
}
if questionID == int(question.Id) {
for _, field := range newFields {
if question.Title == field.Name {
ruleMap := make(map[int]int)
ruleMap[questionID] = int(field.Amoid)
*fieldRule = append(*fieldRule, model.FieldRule{Questionid: ruleMap})
}
}
}
}
}
}
}
constructFieldRules(currentFieldsRule.Lead, leadQuestions, &lead)
constructFieldRules(currentFieldsRule.Customer, customerQuestions, &customer)
constructFieldRules(currentFieldsRule.Company, companyQuestions, &company)
err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
Lead: lead,
Customer: customer,
Company: company,
Contact: model.ContactRules{ContactRuleMap: forAdding},
}, accountID, req.QuizID)
if err != nil {
m.logger.Error("error updating fields rule in db", zap.Error(err))
return err
}
return nil
}