amocrm/internal/workers_methods/methods.go

450 lines
13 KiB
Go
Raw Normal View History

package workers_methods
import (
"amocrm/internal/models"
"amocrm/internal/tools"
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"time"
)
type Methods struct {
repo *dal.AmoDal
amoClient *amoClient.Amo
logger *zap.Logger
}
type Deps struct {
Repo *dal.AmoDal
AmoClient *amoClient.Amo
Logger *zap.Logger
}
func NewWorkersMethods(deps Deps) *Methods {
return &Methods{
repo: deps.Repo,
amoClient: deps.AmoClient,
logger: deps.Logger,
}
}
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
allTokens, err := m.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
return nil, err
}
var newTokens []model.Token
for _, oldToken := range allTokens {
req := models.UpdateWebHookReq{
GrantType: "refresh_token",
RefreshToken: oldToken.RefreshToken,
}
resp, err := m.amoClient.CreateWebHook(&req)
if err != nil {
m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
continue
}
newToken := model.Token{
AccountID: oldToken.AccountID,
RefreshToken: resp.RefreshToken,
AccessToken: resp.AccessToken,
Expiration: time.Now().Unix() + resp.ExpiresIn,
CreatedAt: time.Now().Unix(),
}
newTokens = append(newTokens, newToken)
}
err = m.repo.AmoRepo.WebhookUpdate(ctx, newTokens)
if err != nil {
m.logger.Error("error update newTokens in UpdateTokens", zap.Error(err))
return nil, err
}
return newTokens, nil
}
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
listUser := make(map[string][]models.Users)
for _, token := range allTokens {
page := 1
limit := 250
userData, err := m.amoClient.GetUserList(models.RequestGetListUsers{
Page: page,
Limit: limit,
}, token.AccessToken)
if err != nil {
m.logger.Error("error fetching list users", zap.Error(err))
break
}
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
}
for accountID, users := range listUser {
mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
m.logger.Error("error getting current account from db", zap.Error(err))
return err
}
for _, user := range users {
if user.ID == mainAccount.AmoID {
err := m.repo.AmoRepo.CheckMainUser(ctx, model.User{
Name: user.Name,
Role: int32(user.Rights.RoleID),
Group: int32(user.Rights.GroupID),
Email: user.Email,
AmoID: user.ID,
})
if err != nil {
m.logger.Error("error update main user data in db", zap.Error(err))
return err
}
}
err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{
AmoID: user.ID,
Name: user.FullName,
Group: int32(user.Rights.GroupID),
Role: int32(user.Rights.RoleID),
Email: user.Email,
Amouserid: mainAccount.Amouserid,
})
if err != nil {
m.logger.Error("error update users list data in db", zap.Error(err))
return err
}
}
}
return nil
}
func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
pipelines, err := m.amoClient.GetListPipelines(token.AccessToken)
if err != nil {
m.logger.Error("error fetching list pipelines from amo", zap.Error(err))
continue
}
2024-04-23 14:00:51 +00:00
if len(pipelines.Embedded.Pipelines) > 0 {
err = m.repo.AmoRepo.CheckPipelines(ctx, tools.ToPipeline(pipelines.Embedded.Pipelines))
if err != nil {
2024-04-23 14:00:51 +00:00
m.logger.Error("error update list pipelines in db:", zap.Error(err))
return err
}
2024-04-23 14:00:51 +00:00
for _, pipeline := range pipelines.Embedded.Pipelines {
steps, err := m.amoClient.GetListSteps(pipeline.ID, token.AccessToken)
if err != nil {
m.logger.Error("error getting list steps pipeline:", zap.Error(err))
continue
}
err = m.repo.AmoRepo.CheckSteps(ctx, tools.ToStep(steps.Embedded.Statuses))
if err != nil {
m.logger.Error("error update pipeline steps in db:", zap.Error(err))
return err
}
}
}
}
return nil
}
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
var leadsTags []models.Tag
var contactsTags []models.Tag
var companiesTags []models.Tag
var customersTags []models.Tag
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 250
for {
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := m.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of tags", zap.Error(err))
break
}
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
switch entityType {
case model.LeadsType:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
case model.ContactsType:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
case model.CompaniesType:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
case model.CustomersType:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsType:
2024-04-23 14:00:51 +00:00
if len(leadsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update leads tags in db", zap.Error(err))
return err
}
}
case model.ContactsType:
2024-04-23 14:00:51 +00:00
if len(contactsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update contacts tags in db", zap.Error(err))
return err
}
}
case model.CompaniesType:
2024-04-23 14:00:51 +00:00
if len(companiesTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update companies tags in db", zap.Error(err))
return err
}
}
case model.CustomersType:
2024-04-23 14:00:51 +00:00
if len(customersTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update customer tags in db", zap.Error(err))
return err
}
}
}
}
}
return nil
}
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
var leadsFields []models.CustomField
var contactsFields []models.CustomField
var companiesFields []models.CustomField
var customersFields []models.CustomField
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 50
for {
req := models.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
fields, err := m.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of fields", zap.Error(err))
break
}
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
switch entityType {
case model.LeadsType:
leadsFields = append(leadsFields, fields.Embedded.CustomFields...)
case model.ContactsType:
contactsFields = append(contactsFields, fields.Embedded.CustomFields...)
case model.CompaniesType:
companiesFields = append(companiesFields, fields.Embedded.CustomFields...)
case model.CustomersType:
customersFields = append(customersFields, fields.Embedded.CustomFields...)
}
page++
}
}
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsType:
2024-04-23 14:00:51 +00:00
if len(leadsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update leads fields in db", zap.Error(err))
return err
}
}
case model.ContactsType:
2024-04-23 14:00:51 +00:00
if len(contactsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update contacts fields in db", zap.Error(err))
return err
}
}
case model.CompaniesType:
2024-04-23 14:00:51 +00:00
if len(companiesFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update companies fields", zap.Error(err))
return err
}
}
case model.CustomersType:
2024-04-23 14:00:51 +00:00
if len(customersFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(customersFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update customer fields", zap.Error(err))
return err
}
}
}
}
}
return nil
}
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
token, err := m.repo.AmoRepo.GetTokenByID(ctx, accountID)
if err != nil {
m.logger.Error("error getting token by account id from db", zap.Error(err))
return nil, err
}
return token, nil
}
func (m *Methods) CreateUserFromWebHook(ctx context.Context, accountID string, authCode string) (*model.Token, error) {
// получаем аксес и рефреш токены по коду авторизации
forGetTokens := models.CreateWebHookReq{
GrantType: "authorization_code",
Code: authCode,
}
tokens, err := m.amoClient.CreateWebHook(&forGetTokens)
if err != nil {
m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
return nil, err
}
// получаем информацию о пользователе по аксес токену
userInfo, err := m.amoClient.GetUserInfo(tokens.AccessToken)
if err != nil {
m.logger.Error("error getting UserInfo in CreateUserFromWebHook:", zap.Error(err))
return nil, err
}
toCreate := model.User{
Name: userInfo.Name,
Subdomain: userInfo.Subdomain,
AmoID: userInfo.ID,
Amouserid: userInfo.ID,
Country: userInfo.Country,
}
err = m.repo.AmoRepo.CreateAccount(ctx, accountID, toCreate)
if err != nil {
m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
return nil, err
}
err = m.repo.AmoRepo.WebhookCreate(ctx, model.Token{
RefreshToken: tokens.RefreshToken,
AccessToken: tokens.AccessToken,
AccountID: accountID,
AuthCode: authCode,
Expiration: time.Now().Unix() + tokens.ExpiresIn,
CreatedAt: time.Now().Unix(),
})
if err != nil {
m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
return nil, err
}
return &model.Token{
AccountID: accountID,
RefreshToken: tokens.RefreshToken,
AccessToken: tokens.AccessToken,
}, nil
}
2024-04-29 22:36:49 +00:00
func (m *Methods) CheckUTMs(ctx context.Context, token, accountID string, ids []int32) error {
utms, err := m.repo.AmoRepo.GetUtmsByID(ctx, ids)
if err != nil {
m.logger.Error("error getting user UTM byList IDs", zap.Error(err))
return err
}
2024-04-29 22:36:49 +00:00
fields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, utms[0].Accountid)
if err != nil {
m.logger.Error("error getting user fields by amo account id", zap.Error(err))
return err
}
toCreated, toUpdate := tools.ToCreatedUpdate(utms, fields)
if len(toUpdate) > 0 {
err = m.repo.AmoRepo.UpdateUtmsFields(ctx, toUpdate)
if err != nil {
m.logger.Error("error update utms fields in db", zap.Error(err))
return err
}
}
if len(toCreated) > 0 {
createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
if err != nil {
m.logger.Error("error created amo fields", zap.Error(err))
return err
}
2024-04-29 22:36:49 +00:00
err = m.repo.AmoRepo.CheckFields(ctx, tools.ToField(createdFields.Embedded.CustomFields, model.LeadsType), accountID)
if err != nil {
m.logger.Error("error created amo fields in db", zap.Error(err))
return err
}
forUpdate := tools.MatchingUTMs(utms, createdFields.Embedded.CustomFields)
err = m.repo.AmoRepo.UpdateUTMs(ctx, forUpdate)
if err != nil {
m.logger.Error("error update utms in db", zap.Error(err))
return err
}
}
return nil
}
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountId string, req models.KafkaRule) error {
// todo
// после того как после utm филды обновились, можно проверить также Fieldsrule, добавить в амо поля и обновить в правилах его
return nil
}