amocrm/internal/workers_methods/methods.go

526 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workers_methods
import (
"amocrm/internal/models"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"sync"
"time"
)
// Methods groups the operations executed by the background workers. It holds
// the database access layer, the amoCRM API client and the logger that every
// method in this file uses.
type Methods struct {
// repo is the data access layer; methods reach the database through
// repo.AmoRepo and repo.QuestionRepo.
repo *dal.AmoDal
// amoClient performs the calls to the amoCRM API (tokens, users, pipelines,
// tags, custom fields).
amoClient *amoClient.Amo
// logger receives an error entry whenever a database or API call fails.
logger *zap.Logger
}
// Deps lists the dependencies that NewWorkersMethods copies into a Methods value.
type Deps struct {
// Repo is the data access layer stored as Methods.repo.
Repo *dal.AmoDal
// AmoClient is the amoCRM API client stored as Methods.amoClient.
AmoClient *amoClient.Amo
// Logger is the logger stored as Methods.logger.
Logger *zap.Logger
}
// NewWorkersMethods builds a Methods value from the supplied dependencies.
// The dependencies are stored as given; no validation is performed.
func NewWorkersMethods(deps Deps) *Methods {
	methods := new(Methods)
	methods.repo = deps.Repo
	methods.amoClient = deps.AmoClient
	methods.logger = deps.Logger
	return methods
}
// UpdateTokens refreshes every token stored in the database.
//
// It loads all tokens, exchanges each refresh token for a new token pair via
// the amoCRM client and persists the refreshed tokens in one repository call.
// A token whose refresh request fails is logged and left out of the result.
//
// It returns the refreshed tokens, or an error when the tokens cannot be
// loaded or the refreshed set cannot be saved.
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
	stored, err := m.repo.AmoRepo.GetAllTokens(ctx)
	if err != nil {
		m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
		return nil, err
	}

	var refreshed []model.Token
	for _, current := range stored {
		resp, refreshErr := m.amoClient.CreateWebHook(&models.UpdateWebHookReq{
			GrantType:    "refresh_token",
			RefreshToken: current.RefreshToken,
		})
		if refreshErr != nil {
			// One failing account must not block the others.
			m.logger.Error("error create webhook in UpdateTokens", zap.Error(refreshErr))
			continue
		}

		refreshed = append(refreshed, model.Token{
			AccountID:    current.AccountID,
			RefreshToken: resp.RefreshToken,
			AccessToken:  resp.AccessToken,
			Expiration:   time.Now().Unix() + resp.ExpiresIn,
			CreatedAt:    time.Now().Unix(),
		})
	}

	if saveErr := m.repo.AmoRepo.WebhookUpdate(ctx, refreshed); saveErr != nil {
		m.logger.Error("error update newTokens in UpdateTokens", zap.Error(saveErr))
		return nil, saveErr
	}

	return refreshed, nil
}
// CheckUsers synchronizes the amoCRM users of every supplied token with the
// database.
//
// For each token it requests page 1 (limit 250) of the account's user list.
// A failed request is logged and that account is skipped, so one broken
// account does not prevent the remaining accounts from being processed.
// For every fetched account it then loads the current account record, updates
// the main user (the one whose amoCRM ID equals the account's AmoID) and
// collects all users for a single bulk create/update call.
//
// It returns the first database error encountered, or nil.
//
// NOTE(review): only the first page of users is requested; accounts with more
// than 250 users are presumably truncated — confirm against the API client.
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
	const (
		page  = 1
		limit = 250
	)

	listUser := make(map[string][]models.Users)
	for _, token := range allTokens {
		userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
			Page:  page,
			Limit: limit,
		}, token.AccessToken)
		if err != nil {
			// Skip only this account; previously a `break` here silently
			// dropped every account that came after the failing one.
			m.logger.Error("error fetching list users", zap.String("accountID", token.AccountID), zap.Error(err))
			continue
		}
		listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
	}

	var usersForUpdateAndCreate []model.User
	for accountID, users := range listUser {
		mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
		if err != nil {
			m.logger.Error("error getting current account from db", zap.Error(err))
			return err
		}

		for _, user := range users {
			// The user whose ID matches the account's AmoID is the account owner record.
			if user.ID == mainAccount.AmoID {
				err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
					Name:  user.Name,
					Role:  int32(user.Rights.RoleID),
					Group: int32(user.Rights.GroupID),
					Email: user.Email,
					AmoID: user.ID,
				})
				if err != nil {
					m.logger.Error("error update main user data in db", zap.Error(err))
					return err
				}
			}

			usersForUpdateAndCreate = append(usersForUpdateAndCreate, model.User{
				AmoID:     user.ID,
				Name:      user.FullName,
				Group:     int32(user.Rights.GroupID),
				Role:      int32(user.Rights.RoleID),
				Email:     user.Email,
				Amouserid: mainAccount.Amouserid,
			})
		}
	}

	if err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, usersForUpdateAndCreate); err != nil {
		m.logger.Error("error update users list data in db", zap.Error(err))
		return err
	}

	return nil
}
// CheckPipelinesAndSteps synchronizes pipelines and their steps (statuses)
// from amoCRM into the database for every supplied token.
//
// Failures of amoCRM requests are logged and the affected token or pipeline is
// skipped. A database failure stops processing and is returned to the caller.
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
	for _, token := range tokens {
		resp, fetchErr := m.amoClient.GetListPipelines(token.AccessToken)
		if fetchErr != nil {
			m.logger.Error("error fetching list pipelines from amo", zap.Error(fetchErr))
			continue
		}

		// Nothing to store for accounts without pipelines.
		if len(resp.Embedded.Pipelines) == 0 {
			continue
		}

		if dbErr := m.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(resp.Embedded.Pipelines)); dbErr != nil {
			m.logger.Error("error update list pipelines in db:", zap.Error(dbErr))
			return dbErr
		}

		for _, pipeline := range resp.Embedded.Pipelines {
			stepsResp, stepsErr := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
			if stepsErr != nil {
				m.logger.Error("error getting list steps pipeline:", zap.Error(stepsErr))
				continue
			}

			if dbErr := m.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(stepsResp.Embedded.Statuses)); dbErr != nil {
				m.logger.Error("error update pipeline steps in db:", zap.Error(dbErr))
				return dbErr
			}
		}
	}

	return nil
}
// CheckTags synchronizes amoCRM tags into the database for every supplied
// token.
//
// For each token the tags of the four entity types (leads, contacts,
// companies, customers) are fetched concurrently, one goroutine per entity
// type. Each goroutine walks the pages (limit 250) until an empty page is
// returned and accumulates the tags of ALL pages. Previously every page
// overwrote the stored result, so only the last non-empty page reached the
// database.
//
// A failed amoCRM request is logged and stops pagination for that entity
// type; the pages fetched before the failure are still written, which keeps
// the original best-effort behavior. A database failure is logged and
// returned immediately.
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
	const limit = 250

	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}

	for _, token := range tokens {
		// Each goroutine writes only to its own index, so no extra locking is
		// needed; wg.Wait() orders those writes before the reads below.
		collected := make([][]models.Tag, len(entityTypes))

		var wg sync.WaitGroup
		wg.Add(len(entityTypes))
		for idx, entityType := range entityTypes {
			go func(idx int, entityType model.EntityType) {
				defer wg.Done()

				for page := 1; ; page++ {
					req := models.GetListTagsReq{
						Page:       page,
						Limit:      limit,
						EntityType: entityType,
					}
					tags, err := m.amoClient.GetListTags(req, token.AccessToken)
					if err != nil {
						m.logger.Error("error getting list of tags", zap.Error(err))
						return
					}
					// An empty (or missing) page marks the end of the list.
					if tags == nil || len(tags.Embedded.Tags) == 0 {
						return
					}
					collected[idx] = append(collected[idx], tags.Embedded.Tags...)
				}
			}(idx, entityType)
		}
		wg.Wait()

		for idx, entityType := range entityTypes {
			tags := collected[idx]
			if len(tags) == 0 {
				continue
			}

			err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags, entityType), token.AccountID)
			if err == nil {
				continue
			}

			switch entityType {
			case model.LeadsType:
				m.logger.Error("error updating leads tags in db", zap.Error(err))
			case model.ContactsType:
				m.logger.Error("error updating contacts tags in db", zap.Error(err))
			case model.CompaniesType:
				m.logger.Error("error updating companies tags in db", zap.Error(err))
			case model.CustomersType:
				m.logger.Error("error updating customer tags in db", zap.Error(err))
			}
			// Always surface the database error, whatever the entity type.
			return err
		}
	}

	return nil
}
// CheckFields synchronizes amoCRM custom fields into the database for every
// supplied token.
//
// For each token the custom fields of the four entity types (leads, contacts,
// companies, customers) are fetched concurrently, one goroutine per entity
// type. Each goroutine walks the pages (limit 50) until an empty page is
// returned and accumulates the fields of ALL pages. Previously every page
// overwrote the stored result, so only the last non-empty page reached the
// database.
//
// A failed amoCRM request is logged and stops pagination for that entity
// type; the pages fetched before the failure are still written, which keeps
// the original best-effort behavior. A database failure is logged and
// returned immediately.
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
	const limit = 50

	entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}

	for _, token := range tokens {
		// Each goroutine writes only to its own index, so no extra locking is
		// needed; wg.Wait() orders those writes before the reads below.
		collected := make([][]models.CustomField, len(entityTypes))

		var wg sync.WaitGroup
		wg.Add(len(entityTypes))
		for idx, entityType := range entityTypes {
			go func(idx int, entityType model.EntityType) {
				defer wg.Done()

				for page := 1; ; page++ {
					req := models.GetListFieldsReq{
						Page:       page,
						Limit:      limit,
						EntityType: entityType,
					}
					fields, err := m.amoClient.GetListFields(req, token.AccessToken)
					if err != nil {
						m.logger.Error("error getting list of fields", zap.Error(err))
						return
					}
					// An empty (or missing) page marks the end of the list.
					if fields == nil || len(fields.Embedded.CustomFields) == 0 {
						return
					}
					collected[idx] = append(collected[idx], fields.Embedded.CustomFields...)
				}
			}(idx, entityType)
		}
		wg.Wait()

		for idx, entityType := range entityTypes {
			fields := collected[idx]
			if len(fields) == 0 {
				continue
			}

			err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fields, entityType), token.AccountID)
			if err == nil {
				continue
			}

			switch entityType {
			case model.LeadsType:
				m.logger.Error("error updating leads fields in db", zap.Error(err))
			case model.ContactsType:
				m.logger.Error("error updating contacts fields in db", zap.Error(err))
			case model.CompaniesType:
				m.logger.Error("error updating companies fields in db", zap.Error(err))
			case model.CustomersType:
				m.logger.Error("error updating customer fields in db", zap.Error(err))
			}
			// Always surface the database error, whatever the entity type.
			return err
		}
	}

	return nil
}
// GetTokenByID loads the token stored for accountID from the database.
// A lookup failure is logged and returned with a nil token.
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
	found, lookupErr := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
	if lookupErr == nil {
		return found, nil
	}

	m.logger.Error("error getting token by account id from db", zap.Error(lookupErr))
	return nil, lookupErr
}
// CreateUserFromWebHook registers an amoCRM account from an authorization code.
//
// It exchanges authCode for an access/refresh token pair, reads the account
// information with the new access token, creates the account record and then
// stores the tokens. Every failure is logged and returned.
//
// The returned token carries only the account ID and the token pair; the
// expiration, creation time and auth code are written to the database but are
// not set on the returned value.
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
	// Exchange the authorization code for access and refresh tokens.
	issued, err := m.amoClient.CreateWebHook(&models.CreateWebHookReq{
		GrantType: "authorization_code",
		Code:      authCode,
	})
	if err != nil {
		m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	// Fetch the user information using the access token.
	info, err := m.amoClient.GetUserInfo(issued.AccessToken)
	if err != nil {
		m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
		return nil, err
	}

	account := model.User{
		Name:      info.Name,
		Subdomain: info.Subdomain,
		AmoID:     info.ID,
		Amouserid: info.ID,
		Country:   info.Country,
	}
	if err = m.repo.AmoRepo.CreateAccount(ctx, accountID, account); err != nil {
		m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	persisted := model.Token{
		RefreshToken: issued.RefreshToken,
		AccessToken:  issued.AccessToken,
		AccountID:    accountID,
		AuthCode:     authCode,
		Expiration:   time.Now().Unix() + issued.ExpiresIn,
		CreatedAt:    time.Now().Unix(),
	}
	if err = m.repo.AmoRepo.WebhookCreate(ctx, persisted); err != nil {
		m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
		return nil, err
	}

	result := &model.Token{
		AccountID:    accountID,
		RefreshToken: issued.RefreshToken,
		AccessToken:  issued.AccessToken,
	}
	return result, nil
}
// CheckUTMs makes sure the UTMs identified by ids have matching custom fields.
//
// It loads the UTMs, the current account for accountID and that account's
// stored fields. When no UTMs are found the stored fields are returned
// unchanged. Otherwise the UTMs are split into those whose fields need an
// update in the database and those whose fields must be created in amoCRM;
// created fields are stored, linked to their UTMs and appended to the result.
//
// token is the access token used for the amoCRM request. The returned slice
// holds the account's fields including any newly created ones. Every failure
// is logged and returned with a nil slice.
func (m *Methods) CheckUTMs(ctx context.Context, token, accountID string, ids []int32) ([]model.Field, error) {
	utms, err := m.repo.AmoRepo.GetUtmsByID(ctx, ids)
	if err != nil {
		m.logger.Error("error getting user UTM byList IDs", zap.Error(err))
		return nil, err
	}

	user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
	if err != nil {
		m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
		return nil, err
	}

	fields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
	if err != nil {
		m.logger.Error("error getting user fields by amo account id", zap.Error(err))
		return nil, err
	}

	// Nothing to reconcile. The original condition was `len(utms) < 0`, which
	// can never be true because len is never negative.
	if len(utms) == 0 {
		return fields, nil
	}

	toCreated, toUpdate := tools.ToCreatedUpdate(utms, fields)

	if len(toUpdate) > 0 {
		err = m.repo.AmoRepo.UpdateUtmsFields(ctx, toUpdate)
		if err != nil {
			m.logger.Error("error update utms fields in db", zap.Error(err))
			return nil, err
		}
	}

	if len(toCreated) > 0 {
		// UTM fields are created on the leads entity.
		createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
		if err != nil {
			m.logger.Error("error created amo fields", zap.Error(err))
			return nil, err
		}

		newFields := tools.ToField(createdFields.Embedded.CustomFields, model.LeadsType)
		err = m.repo.AmoRepo.CheckFields(ctx, newFields, accountID)
		if err != nil {
			m.logger.Error("error created amo fields in db", zap.Error(err))
			return nil, err
		}

		forUpdate := tools.MatchingUTMs(utms, createdFields.Embedded.CustomFields)
		err = m.repo.AmoRepo.UpdateUTMs(ctx, forUpdate)
		if err != nil {
			m.logger.Error("error update utms in db", zap.Error(err))
			return nil, err
		}

		fields = append(fields, newFields...)
	}

	return fields, nil
}
// CheckFieldRule rebuilds the question-to-field rules of a quiz and stores them.
//
// Steps:
//  1. Collect the question IDs referenced by the rules of each entity type
//     (lead, contact, customer, company) and load those questions.
//  2. Split the questions into ones that already have a field (toUpdate) and
//     ones whose field must be created in amoCRM (toCreated).
//  3. Create the missing fields in amoCRM and store them in the database. A
//     failed creation for one entity type is logged and skipped.
//  4. Build fresh rules mapping each question ID to a field ID and save them
//     for accountID and req.QuizID.
//
// token is the access token used for the amoCRM requests; currentFields are
// the fields already known for the account.
//
// It returns an error when questions cannot be loaded or a database write
// fails. Previously a failed question load was only logged and the stored
// rules were then overwritten with incomplete data.
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountID string, req models.KafkaRule, currentFields []model.Field) error {
	currentFieldsRule := req.Fieldsrule

	leadIDs := tools.ToQuestionIDs(req.Fieldsrule.Lead)
	contactIDs := tools.ToQuestionIDs(req.Fieldsrule.Contact)
	customerIDs := tools.ToQuestionIDs(req.Fieldsrule.Customer)
	companyIDs := tools.ToQuestionIDs(req.Fieldsrule.Company)

	// getQuestions loads the questions for the given IDs; an empty ID list
	// yields no questions without touching the database.
	getQuestions := func(questionIDs []int32) ([]model.Question, error) {
		if len(questionIDs) == 0 {
			return nil, nil
		}
		questions, err := m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
		if err != nil {
			m.logger.Error("error getting questions", zap.Error(err))
			return nil, err
		}
		return questions, nil
	}

	leadQuestions, err := getQuestions(leadIDs)
	if err != nil {
		return err
	}
	contactQuestions, err := getQuestions(contactIDs)
	if err != nil {
		return err
	}
	customerQuestions, err := getQuestions(customerIDs)
	if err != nil {
		return err
	}
	companyQuestions, err := getQuestions(companyIDs)
	if err != nil {
		return err
	}

	questionsTypeMap := make(map[model.EntityType][]model.Question)
	questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
	questionsTypeMap[model.ContactsType] = append(questionsTypeMap[model.ContactsType], contactQuestions...)
	questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
	questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)

	toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)

	var newFields []model.Field
	for entity, fields := range toCreated {
		if len(fields) == 0 {
			continue
		}
		createdFields, err := m.amoClient.AddFields(fields, entity, token)
		if err != nil {
			// Best effort: other entity types are still processed.
			m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
			continue
		}
		newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
	}

	if len(newFields) > 0 {
		if err := m.repo.AmoRepo.CheckFields(ctx, newFields, accountID); err != nil {
			m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
			return err
		}
	}

	// constructFieldRules produces one single-entry rule per resolved question
	// ID: an existing field taken from toUpdate, or a newly created field whose
	// name equals the question title.
	//
	// NOTE(review): the toUpdate lookup sits inside the questions loop although
	// it does not depend on the question, so it is skipped when questions is
	// empty. New fields are matched by name only, without comparing the entity
	// type. Both behaviors are kept from the original — confirm they are intended.
	constructFieldRules := func(fieldRuleArrCurrent []model.FieldRule, questions []model.Question) []model.FieldRule {
		var built []model.FieldRule
		for _, rules := range fieldRuleArrCurrent {
			for questionID := range rules.Questionid {
				for _, question := range questions {
					if fieldID, ok := toUpdate[questionID]; ok {
						built = append(built, model.FieldRule{Questionid: map[int]int{questionID: fieldID}})
						break
					}
					if questionID != int(question.Id) {
						continue
					}
					for _, field := range newFields {
						if question.Title == field.Name {
							built = append(built, model.FieldRule{Questionid: map[int]int{questionID: int(field.Amoid)}})
						}
					}
				}
			}
		}
		return built
	}

	lead := constructFieldRules(currentFieldsRule.Lead, leadQuestions)
	contact := constructFieldRules(currentFieldsRule.Contact, contactQuestions)
	customer := constructFieldRules(currentFieldsRule.Customer, customerQuestions)
	company := constructFieldRules(currentFieldsRule.Company, companyQuestions)

	err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
		Lead:     lead,
		Customer: customer,
		Company:  company,
		Contact:  contact,
	}, accountID, req.QuizID)
	if err != nil {
		m.logger.Error("error updating fields rule in db", zap.Error(err))
		return err
	}

	return nil
}