re-write queue_updater from amo to bitrix entity
This commit is contained in:
parent
578be84bb7
commit
27f1265317
2
go.mod
2
go.mod
@ -10,7 +10,7 @@ require (
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/twmb/franz-go v1.17.1
|
||||
go.uber.org/zap v1.27.0
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c
|
||||
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3
|
||||
)
|
||||
|
||||
|
12
go.sum
12
go.sum
@ -143,5 +143,17 @@ penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240202120244-c4ef
|
||||
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240202120244-c4ef330cfe5d/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b h1:tv9gmeZwVcWGx02SUse3H6w+Y19OQi/FGUcY/QZsL+I=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929094431-3270d3f5b0e1 h1:X8qwZ9YG993BTSOuon9Fufytxafh0te7LvVCayhYkyk=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929094431-3270d3f5b0e1/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929101800-f4e3f5575f7f h1:BdS8YTl4M9urIKyQsRXMhL0NBB01sQAlu8kGpXGHlpU=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929101800-f4e3f5575f7f/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929110625-ad961d40c364 h1:MUApYm474qNcCwn08CEqN0l8I0T4PheK/q/BPeeCNxE=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929110625-ad961d40c364/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929134459-1059762a0b6b h1:mA5qKW2xQ9+UgDToT+ggr+NRoUtHI9sgOpyqZ5DzhHY=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929134459-1059762a0b6b/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929144614-fe9b788024f5 h1:JGnvdffETkkD6lgzMacZyT8Kda2/c5UPF6Jv7BCOwOM=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929144614-fe9b788024f5/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c h1:yiKPtHsXZ4zIrkXUrZQ9rQm9EwpA8B15cBjEWWMTYI0=
|
||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0=
|
||||
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3 h1:sf6e2mp582L3i/FMDd2q6QuWm1njRXzYpIX0SipsvM4=
|
||||
penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3/go.mod h1:i7M72RIpkSjcQtHID6KKj9RT/EYZ1rxS6tIPKWa/BSY=
|
||||
|
@ -12,21 +12,10 @@ type User struct {
|
||||
SecondName string `json:"SECOND_NAME"`
|
||||
Title string `json:"TITLE"`
|
||||
Email string `json:"EMAIL"`
|
||||
UFDepartment []int `json:"UF_DEPARTMENT"`
|
||||
UFDepartment []int32 `json:"UF_DEPARTMENT"`
|
||||
WorkPosition string `json:"WORK_POSITION"`
|
||||
}
|
||||
|
||||
type ResponseGetCurrentUser struct {
|
||||
Result CurrentUser `json:"result"`
|
||||
}
|
||||
|
||||
type CurrentUser struct {
|
||||
ID string `json:"ID"`
|
||||
Name string `json:"NAME"`
|
||||
LastName string `json:"LAST_NAME"`
|
||||
SecondName string `json:"SECOND_NAME"`
|
||||
Title string `json:"TITLE"`
|
||||
Email string `json:"EMAIL"`
|
||||
UFDepartment []int `json:"UF_DEPARTMENT"`
|
||||
WorkPosition string `json:"WORK_POSITION"`
|
||||
Result User `json:"result"`
|
||||
}
|
||||
|
@ -4,16 +4,8 @@ type KafkaMessage struct {
|
||||
AccountID string
|
||||
AuthCode string
|
||||
RefererURL string
|
||||
MemberID string
|
||||
Type MessageType
|
||||
Rule KafkaRule
|
||||
}
|
||||
|
||||
type KafkaRule struct {
|
||||
//QuizID int32
|
||||
//PerformerID int32 // айдишник ответственного за сделку
|
||||
//PipelineID int32 // айдишник воронки
|
||||
//StepID int32 // айдишник этапа
|
||||
//Fieldsrule model.Fieldsrule
|
||||
}
|
||||
|
||||
type MessageType string
|
||||
@ -27,4 +19,5 @@ const (
|
||||
AllDataUpdate MessageType = "allDataUpdate"
|
||||
RuleCheck MessageType = "ruleCheck"
|
||||
UserReLogin MessageType = "userReLogin"
|
||||
StepsUpdate MessageType = "steps"
|
||||
)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"go.uber.org/zap"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/tools"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/pj_errors"
|
||||
)
|
||||
|
@ -24,7 +24,7 @@ func (s *Service) GetStepsWithPagination(ctx context.Context, req *model.Paginat
|
||||
func (s *Service) UpdateListSteps(ctx context.Context, accountID string) error {
|
||||
message := models.KafkaMessage{
|
||||
AccountID: accountID,
|
||||
Type: models.PipelinesUpdate,
|
||||
Type: models.StepsUpdate,
|
||||
}
|
||||
|
||||
err := s.producer.ToKafkaUpdate(ctx, message)
|
||||
|
@ -29,6 +29,7 @@ func (s *Service) WebhookCreate(ctx context.Context, req ParamsWebhookCreate) er
|
||||
AccountID: req.AccountID,
|
||||
AuthCode: req.Code,
|
||||
RefererURL: req.Domain,
|
||||
MemberID: req.MemberID,
|
||||
Type: models.UserCreate,
|
||||
}
|
||||
|
||||
|
56
internal/tools/construct.go
Normal file
56
internal/tools/construct.go
Normal file
@ -0,0 +1,56 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func ToPipeline(bitrixPipelines []models.Category, bitrixID string) []model.PipelineBitrix {
|
||||
var pipelines []model.PipelineBitrix
|
||||
for _, p := range bitrixPipelines {
|
||||
pipelines = append(pipelines, model.PipelineBitrix{
|
||||
BitrixID: p.ID,
|
||||
Name: p.Name,
|
||||
EntityTypeId: p.EntityTypeId,
|
||||
AccountID: bitrixID,
|
||||
})
|
||||
}
|
||||
return pipelines
|
||||
}
|
||||
|
||||
func ToStep(bitrixPipelines []models.Steps, bitrixID string) ([]model.StepBitrix, error) {
|
||||
var pipelines []model.StepBitrix
|
||||
for _, p := range bitrixPipelines {
|
||||
pipelineID, err := strconv.ParseInt(p.ID, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pipelines = append(pipelines, model.StepBitrix{
|
||||
BitrixID: p.ID,
|
||||
AccountID: bitrixID,
|
||||
EntityID: p.EntityID,
|
||||
StatusID: p.StatusID,
|
||||
Name: p.Name,
|
||||
NameInit: p.NameInit,
|
||||
Color: p.Color,
|
||||
PipelineID: int32(pipelineID),
|
||||
})
|
||||
}
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
func ToField(bitrixFields []models.Fields, bitrixID string) []model.BitrixField {
|
||||
var fields []model.BitrixField
|
||||
for _, f := range bitrixFields {
|
||||
fields = append(fields, model.BitrixField{
|
||||
AccountID: bitrixID,
|
||||
BitrixID: f.ID,
|
||||
EntityID: f.EntityID,
|
||||
FieldName: f.FieldName,
|
||||
EditFromLabel: f.EditFormLabel,
|
||||
FieldType: f.UserTypeID,
|
||||
})
|
||||
}
|
||||
return fields
|
||||
}
|
42
internal/tools/validate.go
Normal file
42
internal/tools/validate.go
Normal file
@ -0,0 +1,42 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
)
|
||||
|
||||
func ValidateUtmFields(response *model.UserListBitrixFieldsResp) *model.UserListBitrixFieldsResp {
|
||||
checkUTM := map[string]struct{}{
|
||||
"utm_content": {},
|
||||
"utm_medium": {},
|
||||
"utm_campaign": {},
|
||||
"utm_source": {},
|
||||
"utm_term": {},
|
||||
"utm_referrer": {},
|
||||
"roistat": {},
|
||||
"referrer": {},
|
||||
"openstat_service": {},
|
||||
"openstat_campaign": {},
|
||||
"openstat_ad": {},
|
||||
"openstat_source": {},
|
||||
"from": {},
|
||||
"gclientid": {},
|
||||
"_ym_uid": {},
|
||||
"_ym_counter": {},
|
||||
"gclid": {},
|
||||
"yclid": {},
|
||||
"fbclid": {},
|
||||
}
|
||||
|
||||
data := &model.UserListBitrixFieldsResp{
|
||||
Count: response.Count,
|
||||
Items: []model.BitrixField{},
|
||||
}
|
||||
|
||||
for _, r := range response.Items {
|
||||
if _, ok := checkUTM[r.EditFromLabel]; !ok {
|
||||
data.Items = append(data.Items, r)
|
||||
}
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
260
internal/workers/queueUpdater/queue_updater.go
Normal file
260
internal/workers/queueUpdater/queue_updater.go
Normal file
@ -0,0 +1,260 @@
|
||||
package queueUpdater
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.uber.org/zap"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers_methods"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
type QueueUpdater struct {
|
||||
logger *zap.Logger
|
||||
kafkaClient *kgo.Client
|
||||
methods *workers_methods.Methods
|
||||
}
|
||||
|
||||
type Deps struct {
|
||||
Logger *zap.Logger
|
||||
KafkaClient *kgo.Client
|
||||
Methods *workers_methods.Methods
|
||||
}
|
||||
|
||||
func NewQueueUpdater(deps Deps) *QueueUpdater {
|
||||
return &QueueUpdater{
|
||||
logger: deps.Logger,
|
||||
kafkaClient: deps.KafkaClient,
|
||||
methods: deps.Methods,
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *QueueUpdater) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
wc.consumeMessages(ctx)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *QueueUpdater) consumeMessages(ctx context.Context) {
|
||||
fetches := wc.kafkaClient.PollFetches(ctx)
|
||||
iter := fetches.RecordIter()
|
||||
for !iter.Done() {
|
||||
record := iter.Next()
|
||||
var message models.KafkaMessage
|
||||
|
||||
err := json.Unmarshal(record.Value, &message)
|
||||
if err != nil {
|
||||
wc.logger.Error("error unmarshal kafka message:", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = wc.processMessages(ctx, message)
|
||||
if err != nil {
|
||||
wc.logger.Error("error processing kafka message:", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error {
|
||||
switch message.Type {
|
||||
case models.UsersUpdate:
|
||||
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
if err != nil {
|
||||
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if token != nil {
|
||||
err = wc.methods.CheckUsers(ctx, []model.Token{*token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
case models.PipelinesUpdate:
|
||||
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
if err != nil {
|
||||
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if token != nil {
|
||||
err = wc.methods.CheckPipelines(ctx, []model.Token{*token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
case models.StepsUpdate:
|
||||
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
if err != nil {
|
||||
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if token != nil {
|
||||
err = wc.methods.CheckSteps(ctx, []model.Token{*token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
case models.FieldsUpdate:
|
||||
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
if err != nil {
|
||||
wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if token != nil {
|
||||
err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//case models.TagsUpdate:
|
||||
// token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
// if err != nil {
|
||||
// wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// if token != nil {
|
||||
// err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
||||
// if err != nil {
|
||||
// wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
|
||||
case models.UserCreate:
|
||||
token, err := wc.methods.CreateUserFromWebHook(ctx, message)
|
||||
if err != nil {
|
||||
wc.logger.Error("error creating user from webhook request", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = wc.methods.CheckUsers(ctx, []model.Token{token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = wc.methods.CheckPipelines(ctx, []model.Token{token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user pipelines information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = wc.methods.CheckSteps(ctx, []model.Token{token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user steps information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = wc.methods.CheckFields(ctx, []model.Token{token})
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
//err = wc.methods.CheckTags(ctx, []model.Token{*token})
|
||||
//if err != nil {
|
||||
// wc.logger.Error("error update user tags information in queue worker", zap.Error(err))
|
||||
// return err
|
||||
//}
|
||||
case models.AllDataUpdate:
|
||||
// сначала получаем список токенов
|
||||
newTokens, err := wc.methods.UpdateTokens(ctx)
|
||||
if err != nil {
|
||||
wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if len(newTokens) > 0 {
|
||||
// обновляем информацию о пользователях
|
||||
err = wc.methods.CheckUsers(ctx, newTokens)
|
||||
if err != nil {
|
||||
wc.logger.Error("error update users information", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// обновляем информацию о pipelines
|
||||
err = wc.methods.CheckPipelines(ctx, newTokens)
|
||||
if err != nil {
|
||||
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// обновляем информацию о steps
|
||||
err = wc.methods.CheckSteps(ctx, newTokens)
|
||||
if err != nil {
|
||||
wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// обновляем информацию о tags
|
||||
//err = wc.methods.CheckTags(ctx, newTokens)
|
||||
//if err != nil {
|
||||
// wc.logger.Error("error updating users tags", zap.Error(err))
|
||||
// return err
|
||||
//}
|
||||
|
||||
// обновляем информацию о fields
|
||||
err = wc.methods.CheckFields(ctx, newTokens)
|
||||
if err != nil {
|
||||
wc.logger.Error("error updating users fields", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
//case models.RuleCheck:
|
||||
// token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
|
||||
// if err != nil {
|
||||
// wc.logger.Error("error getting user token from db", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
// if token != nil {
|
||||
// err = wc.methods.CheckFields(ctx, []model.Token{*token})
|
||||
// if err != nil {
|
||||
// wc.logger.Error("error update user fields information in queue worker", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message)
|
||||
// if err != nil {
|
||||
// wc.logger.Error("error check field rules for fields rules", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
case models.UserReLogin:
|
||||
err := wc.methods.UserReLogin(ctx, message)
|
||||
if err != nil {
|
||||
wc.logger.Error("error update user information in re-login method", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
wc.logger.Error("incorrect message type", zap.Any("Type:", message))
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wc *QueueUpdater) Stop(_ context.Context) error {
|
||||
return nil
|
||||
}
|
760
internal/workers_methods/methods.go
Normal file
760
internal/workers_methods/methods.go
Normal file
@ -0,0 +1,760 @@
|
||||
package workers_methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/tools"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/bitrixClient"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Methods struct {
|
||||
repo *dal.BitrixDal
|
||||
bitrixClient *bitrixClient.Bitrix
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
type Deps struct {
|
||||
Repo *dal.BitrixDal
|
||||
BitrixClient *bitrixClient.Bitrix
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
func NewWorkersMethods(deps Deps) *Methods {
|
||||
return &Methods{
|
||||
repo: deps.Repo,
|
||||
bitrixClient: deps.BitrixClient,
|
||||
logger: deps.Logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) {
|
||||
allTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, oldToken := range allTokens {
|
||||
user, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, oldToken.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting account by id in UpdateTokens", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
req := models.UpdateWebHookReq{
|
||||
GrantType: "refresh_token",
|
||||
RefreshToken: oldToken.RefreshToken,
|
||||
}
|
||||
|
||||
resp, err := m.bitrixClient.CreateWebHook(&req, user.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error create webhook in UpdateTokens", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
newToken := model.Token{
|
||||
AccountID: oldToken.AccountID,
|
||||
RefreshToken: resp.RefreshToken,
|
||||
AccessToken: resp.AccessToken,
|
||||
Expiration: time.Now().Unix() + resp.ExpiresIn,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}
|
||||
|
||||
err = m.repo.BitrixRepo.WebhookUpdate(ctx, newToken)
|
||||
if err != nil {
|
||||
m.logger.Error("error update token in db", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
newTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newTokens, nil
|
||||
}
|
||||
|
||||
func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error {
|
||||
listUser := make(map[string][]models.User)
|
||||
for _, token := range allTokens {
|
||||
currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting current company", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
userData, err := m.bitrixClient.GetUserList(token.AccessToken, currentCompany.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error fetching list users", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Result...)
|
||||
}
|
||||
|
||||
for accountID, users := range listUser {
|
||||
currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, accountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting current company", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
currentUserUsers, err := m.repo.BitrixRepo.GetUserUsersByID(ctx, currentCompany.BitrixID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting user users by bitrix user id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, user := range users {
|
||||
found := false
|
||||
for _, currentUser := range currentUserUsers {
|
||||
if user.ID == currentUser.BitrixIDUserID {
|
||||
found = true
|
||||
err := m.repo.BitrixRepo.UpdateBitrixAccountUser(ctx, model.BitrixAccountUser{
|
||||
AccountID: currentUser.AccountID,
|
||||
BitrixIDUserID: currentUser.BitrixIDUserID,
|
||||
Name: user.Name,
|
||||
LastName: user.LastName,
|
||||
SecondName: user.SecondName,
|
||||
Title: user.Title,
|
||||
Email: user.Email,
|
||||
UFDepartment: user.UFDepartment,
|
||||
WorkPosition: user.WorkPosition,
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error("failed update user bitrix account in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
err := m.repo.BitrixRepo.AddBitrixAccountUser(ctx, model.BitrixAccountUser{
|
||||
AccountID: currentCompany.BitrixID,
|
||||
BitrixIDUserID: user.ID,
|
||||
Name: user.Name,
|
||||
LastName: user.LastName,
|
||||
SecondName: user.SecondName,
|
||||
Title: user.Title,
|
||||
Email: user.Email,
|
||||
UFDepartment: user.UFDepartment,
|
||||
WorkPosition: user.WorkPosition,
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error("failed insert user bitrix account in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var deletedUserIDs []int64
|
||||
for _, currentUserUser := range currentUserUsers {
|
||||
found := false
|
||||
for _, user := range users {
|
||||
if currentUserUser.BitrixIDUserID == user.ID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
deletedUserIDs = append(deletedUserIDs, currentUserUser.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deletedUserIDs) > 0 {
|
||||
err := m.repo.BitrixRepo.DeleteUsers(ctx, deletedUserIDs)
|
||||
if err != nil {
|
||||
m.logger.Error("error deleting users in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Methods) CheckPipelines(ctx context.Context, tokens []model.Token) error {
|
||||
for _, token := range tokens {
|
||||
currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
currentCompanyPipelines, err := m.repo.BitrixRepo.GetUserPipelinesByID(ctx, currentCompany.BitrixID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting company pipelines by bitrix id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var listPipelines []models.Category
|
||||
for _, categoryType := range model.CategoryArr {
|
||||
pipelinesResp, err := m.bitrixClient.GetListPipelines(categoryType, token.AccessToken, currentCompany.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error fetching list pipelines from bitrix", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
listPipelines = append(listPipelines, pipelinesResp.Result.Categories...)
|
||||
}
|
||||
|
||||
if len(listPipelines) > 0 {
|
||||
receivedPipelines := tools.ToPipeline(listPipelines, currentCompany.BitrixID)
|
||||
err = m.repo.BitrixRepo.CheckPipelines(ctx, receivedPipelines)
|
||||
if err != nil {
|
||||
m.logger.Error("error checking pipelines", zap.Error(err))
|
||||
}
|
||||
|
||||
var deletedPipelineIDs []int64
|
||||
for _, currentUserPipeline := range currentCompanyPipelines {
|
||||
found := false
|
||||
for _, receivedPipeline := range receivedPipelines {
|
||||
if currentUserPipeline.BitrixID == receivedPipeline.BitrixID && currentUserPipeline.AccountID == currentCompany.BitrixID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deletedPipelineIDs) > 0 {
|
||||
err := m.repo.BitrixRepo.DeletePipelines(ctx, deletedPipelineIDs)
|
||||
if err != nil {
|
||||
m.logger.Error("error deleting pipelines in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Methods) CheckSteps(ctx context.Context, tokens []model.Token) error {
|
||||
for _, token := range tokens {
|
||||
currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
currentCompanySteps, err := m.repo.BitrixRepo.GetUserStepsByID(ctx, currentCompany.BitrixID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting company steps by bitrix id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var listSteps []models.Steps
|
||||
stepsResp, err := m.bitrixClient.GetListSteps(token.AccessToken, currentCompany.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error fetching list steps from bitrix", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
listSteps = append(listSteps, stepsResp.Result...)
|
||||
|
||||
if len(listSteps) > 0 {
|
||||
receivedSteps, err := tools.ToStep(listSteps, currentCompany.BitrixID)
|
||||
if err != nil {
|
||||
m.logger.Error("error converting steps to bitrix", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
err = m.repo.BitrixRepo.CheckSteps(ctx, receivedSteps)
|
||||
if err != nil {
|
||||
m.logger.Error("error checking steps", zap.Error(err))
|
||||
}
|
||||
|
||||
var deletedStepIDs []int64
|
||||
for _, currentUserStep := range currentCompanySteps {
|
||||
found := false
|
||||
for _, receivedStep := range receivedSteps {
|
||||
if currentUserStep.BitrixID == receivedStep.BitrixID && currentUserStep.AccountID == currentCompany.BitrixID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
deletedStepIDs = append(deletedStepIDs, currentUserStep.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deletedStepIDs) > 0 {
|
||||
err := m.repo.BitrixRepo.DeleteSteps(ctx, deletedStepIDs)
|
||||
if err != nil {
|
||||
m.logger.Error("error deleting steps in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
|
||||
// for _, token := range tokens {
|
||||
// user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
|
||||
// if err != nil {
|
||||
// m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID)
|
||||
// if err != nil {
|
||||
// m.logger.Error("error getting user tags by amo id", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// var wg sync.WaitGroup
|
||||
// wg.Add(4)
|
||||
//
|
||||
// var tagsMap sync.Map
|
||||
// entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
|
||||
// for _, entityType := range entityTypes {
|
||||
// go func(entityType model.EntityType) {
|
||||
// defer wg.Done()
|
||||
// page := 1
|
||||
// limit := 250
|
||||
//
|
||||
// for {
|
||||
// req := models.GetListTagsReq{
|
||||
// Page: page,
|
||||
// Limit: limit,
|
||||
// EntityType: entityType,
|
||||
// }
|
||||
// tags, err := m.amoClient.GetListTags(req, token.AccessToken, user.Subdomain)
|
||||
// if err != nil {
|
||||
// m.logger.Error("error getting list of tags", zap.Error(err))
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// if tags == nil || len(tags.Embedded.Tags) == 0 {
|
||||
// break
|
||||
// }
|
||||
//
|
||||
// tagsMap.Store(entityType, tags.Embedded.Tags)
|
||||
//
|
||||
// page++
|
||||
// }
|
||||
// }(entityType)
|
||||
// }
|
||||
//
|
||||
// wg.Wait()
|
||||
//
|
||||
// var deletedTagIDs []int64
|
||||
// for _, currentUserTag := range currentUserTags {
|
||||
// found := false
|
||||
// for _, entityType := range entityTypes {
|
||||
// if tags, ok := tagsMap.Load(entityType); ok {
|
||||
// if len(tags.([]models.Tag)) > 0 {
|
||||
// receivedTags := tools.ToTag(tags.([]models.Tag), entityType)
|
||||
// for _, tag := range receivedTags {
|
||||
// if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType {
|
||||
// found = true
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// if found {
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if !found {
|
||||
// deletedTagIDs = append(deletedTagIDs, currentUserTag.ID)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if len(deletedTagIDs) > 0 {
|
||||
// err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs)
|
||||
// if err != nil {
|
||||
// m.logger.Error("error deleting tags in db", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// for _, entityType := range entityTypes {
|
||||
// if tags, ok := tagsMap.Load(entityType); ok {
|
||||
// if len(tags.([]models.Tag)) > 0 {
|
||||
// err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID)
|
||||
// if err != nil {
|
||||
// switch entityType {
|
||||
// case model.LeadsType:
|
||||
// m.logger.Error("error updating leads tags in db", zap.Error(err))
|
||||
// return err
|
||||
// case model.ContactsType:
|
||||
// m.logger.Error("error updating contacts tags in db", zap.Error(err))
|
||||
// return err
|
||||
// case model.CompaniesType:
|
||||
// m.logger.Error("error updating companies tags in db", zap.Error(err))
|
||||
// return err
|
||||
// case model.CustomersType:
|
||||
// m.logger.Error("error updating customer tags in db", zap.Error(err))
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// return nil
|
||||
//}
|
||||
|
||||
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
|
||||
for _, token := range tokens {
|
||||
currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting company by account quiz id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
currentUserFields, err := m.repo.BitrixRepo.GetUserFieldsByID(ctx, currentCompany.BitrixID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting user fields by bitrix id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4)
|
||||
|
||||
var fieldsMap sync.Map
|
||||
entityTypes := []model.FieldsType{model.FieldTypeCompany, model.FieldTypeLead, model.FieldTypeContact, model.FieldTypeDeal}
|
||||
for _, entityType := range entityTypes {
|
||||
go func(entityType model.FieldsType) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
fields, err := m.bitrixClient.GetListFields(entityType, token.AccessToken, currentCompany.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting list of fields", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if fields == nil || len(fields.Result) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
fieldsMap.Store(entityType, fields.Result)
|
||||
break
|
||||
}
|
||||
}(entityType)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
var deletedFieldIDs []int64
|
||||
for _, currentUserField := range currentUserFields {
|
||||
found := false
|
||||
for _, entityType := range entityTypes {
|
||||
if fields, ok := fieldsMap.Load(entityType); ok {
|
||||
if len(fields.([]models.Fields)) > 0 {
|
||||
receivedFields := tools.ToField(fields.([]models.Fields), currentCompany.BitrixID)
|
||||
for _, field := range receivedFields {
|
||||
if currentUserField.BitrixID == field.BitrixID && currentUserField.AccountID == currentCompany.BitrixID && currentUserField.EntityID == entityType {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if found {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deletedFieldIDs) > 0 {
|
||||
err = m.repo.BitrixRepo.DeleteFields(ctx, deletedFieldIDs)
|
||||
if err != nil {
|
||||
m.logger.Error("error deleting fields in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, entityType := range entityTypes {
|
||||
if fields, ok := fieldsMap.Load(entityType); ok {
|
||||
if len(fields.([]models.Fields)) > 0 {
|
||||
err := m.repo.BitrixRepo.CheckFields(ctx, tools.ToField(fields.([]models.Fields), currentCompany.BitrixID), token.AccountID)
|
||||
if err != nil {
|
||||
switch entityType {
|
||||
case model.FieldTypeLead:
|
||||
m.logger.Error("error updating leads fields in db", zap.Error(err))
|
||||
return err
|
||||
case model.FieldTypeContact:
|
||||
m.logger.Error("error updating contacts fields in db", zap.Error(err))
|
||||
return err
|
||||
case model.FieldTypeCompany:
|
||||
m.logger.Error("error updating companies fields in db", zap.Error(err))
|
||||
return err
|
||||
case model.FieldTypeDeal:
|
||||
m.logger.Error("error updating deal fields in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) {
|
||||
token, err := m.repo.BitrixRepo.GetTokenByID(ctx, accountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting token by account id from db", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func (m *Methods) CreateUserFromWebHook(ctx context.Context, msg models.KafkaMessage) (model.Token, error) {
|
||||
// получаем аксес и рефреш токены по коду авторизации, id битрикса ==member id // todo надо в этом убедиться
|
||||
forGetTokens := models.CreateWebHookReq{
|
||||
GrantType: "authorization_code",
|
||||
Code: msg.AuthCode,
|
||||
}
|
||||
|
||||
tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, msg.RefererURL)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err))
|
||||
return model.Token{}, err
|
||||
}
|
||||
|
||||
toCreate := model.BitrixAccount{
|
||||
AccountID: msg.AccountID,
|
||||
BitrixID: msg.MemberID,
|
||||
Subdomain: msg.RefererURL,
|
||||
}
|
||||
|
||||
err = m.repo.BitrixRepo.CreateAccount(ctx, toCreate)
|
||||
if err != nil {
|
||||
m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err))
|
||||
return model.Token{}, err
|
||||
}
|
||||
|
||||
err = m.repo.BitrixRepo.WebhookCreate(ctx, model.Token{
|
||||
RefreshToken: tokens.RefreshToken,
|
||||
AccessToken: tokens.AccessToken,
|
||||
AccountID: msg.AccountID,
|
||||
AuthCode: msg.AuthCode,
|
||||
Expiration: time.Now().Unix() + tokens.ExpiresIn,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err))
|
||||
return model.Token{}, err
|
||||
}
|
||||
|
||||
return model.Token{
|
||||
AccountID: msg.AccountID,
|
||||
RefreshToken: tokens.RefreshToken,
|
||||
AccessToken: tokens.AccessToken,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *Methods) CheckFieldRule(ctx context.Context, token string, msg models.KafkaMessage) error {
|
||||
var (
|
||||
leadIDs, companyIDs, customerIDs, contactIDs []int32
|
||||
leadQuestions, companyQuestions, customerQuestions, contactQuestions []model.Question
|
||||
questionsTypeMap = make(map[model.EntityType][]model.Question)
|
||||
newFields []model.Field
|
||||
lead, company, customer, contact model.FieldRule
|
||||
currentFieldsRule = msg.Rule.Fieldsrule
|
||||
err error
|
||||
)
|
||||
|
||||
user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, msg.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
currentFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting user fields by amo account id", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
quiz, err := m.repo.QuizRepo.GetQuizById(ctx, msg.AccountID, uint64(msg.Rule.QuizID))
|
||||
if err != nil {
|
||||
m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var quizConfig model.QuizContact
|
||||
err = json.Unmarshal([]byte(quiz.Config), &quizConfig)
|
||||
if err != nil {
|
||||
m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
leadIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Lead.Questionid)
|
||||
customerIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Customer.Questionid)
|
||||
companyIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Company.Questionid)
|
||||
contactIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Contact.Questionid)
|
||||
|
||||
getQuestions := func(questionIDs []int32, questions *[]model.Question) {
|
||||
if len(questionIDs) > 0 {
|
||||
*questions, err = m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting questions", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
getQuestions(leadIDs, &leadQuestions)
|
||||
getQuestions(customerIDs, &customerQuestions)
|
||||
getQuestions(companyIDs, &companyQuestions)
|
||||
getQuestions(contactIDs, &contactQuestions)
|
||||
|
||||
questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...)
|
||||
questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...)
|
||||
questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...)
|
||||
questionsTypeMap[model.ContactsType] = append(questionsTypeMap[model.ContactsType], contactQuestions...)
|
||||
|
||||
toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields)
|
||||
contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields)
|
||||
|
||||
for entity, fields := range toCreated {
|
||||
if len(fields) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
createdFields, err := m.amoClient.AddFields(fields, entity, token, user.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...)
|
||||
}
|
||||
|
||||
if len(contactFieldsToCreate) > 0 {
|
||||
createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token, user.Subdomain)
|
||||
if err != nil {
|
||||
m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err))
|
||||
}
|
||||
|
||||
contructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType)
|
||||
|
||||
newFields = append(newFields, contructedFields...)
|
||||
|
||||
for _, field := range contructedFields {
|
||||
if _, ok := forAdding[field.Name]; ok {
|
||||
forAdding[field.Name] = int(field.Amoid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(newFields) > 0 {
|
||||
err = m.repo.AmoRepo.CheckFields(ctx, newFields, msg.AccountID)
|
||||
if err != nil {
|
||||
m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
constructFieldRules := func(fieldRuleArrCurrent map[int]int, questions []model.Question, fieldRule *model.FieldRule, currentEntity model.EntityType) {
|
||||
ruleMap := make(map[int]int)
|
||||
for questionID, fieldID := range fieldRuleArrCurrent {
|
||||
if fieldID != 0 {
|
||||
// если fieldID уже заполнен добавляем его как есть
|
||||
ruleMap[questionID] = fieldID
|
||||
continue
|
||||
}
|
||||
for _, question := range questions {
|
||||
if dataQues, ok := toUpdate[questionID]; ok {
|
||||
if dataQues.Entity == currentEntity {
|
||||
ruleMap[questionID] = dataQues.FieldID
|
||||
break
|
||||
}
|
||||
}
|
||||
if questionID == int(question.Id) {
|
||||
// тут также делаем чтобы сверить филд с вопросом
|
||||
title := strings.ToLower(strings.ReplaceAll(question.Title, " ", ""))
|
||||
if title == "" {
|
||||
question.Title = fmt.Sprintf("Вопрос №%d", question.Page)
|
||||
}
|
||||
title = strings.ToLower(strings.ReplaceAll(question.Title, " ", ""))
|
||||
for _, field := range newFields {
|
||||
fieldName := strings.ToLower(strings.ReplaceAll(field.Name, " ", ""))
|
||||
if title == fieldName && field.Entity == currentEntity {
|
||||
ruleMap[questionID] = int(field.Amoid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
fieldRule.Questionid = ruleMap
|
||||
}
|
||||
|
||||
constructFieldRules(currentFieldsRule.Lead.Questionid, leadQuestions, &lead, model.LeadsType)
|
||||
constructFieldRules(currentFieldsRule.Customer.Questionid, customerQuestions, &customer, model.CustomersType)
|
||||
constructFieldRules(currentFieldsRule.Company.Questionid, companyQuestions, &company, model.CompaniesType)
|
||||
constructFieldRules(currentFieldsRule.Contact.Questionid, contactQuestions, &contact, model.ContactsType)
|
||||
|
||||
err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{
|
||||
Lead: lead,
|
||||
Customer: customer,
|
||||
Company: company,
|
||||
Contact: model.ContactRules{ContactRuleMap: forAdding, Questionid: contact.Questionid},
|
||||
}, msg.AccountID, msg.Rule.QuizID)
|
||||
|
||||
if err != nil {
|
||||
m.logger.Error("error updating fields rule in db", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Methods) UserReLogin(ctx context.Context, msg models.KafkaMessage) error {
|
||||
forGetTokens := models.CreateWebHookReq{
|
||||
GrantType: "authorization_code",
|
||||
Code: msg.AuthCode,
|
||||
}
|
||||
|
||||
tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, msg.RefererURL)
|
||||
if err != nil {
|
||||
m.logger.Error("error getting tokens in method user re-login:", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
toUpdate := model.BitrixAccount{
|
||||
AccountID: msg.AccountID,
|
||||
BitrixID: msg.MemberID,
|
||||
Subdomain: msg.RefererURL,
|
||||
}
|
||||
|
||||
err = m.repo.BitrixRepo.UpdateCurrentAccount(ctx, toUpdate)
|
||||
if err != nil {
|
||||
m.logger.Error("error update account in db in method user re-login", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.repo.BitrixRepo.WebhookUpdate(ctx, model.Token{
|
||||
RefreshToken: tokens.RefreshToken,
|
||||
AccessToken: tokens.AccessToken,
|
||||
AccountID: msg.AccountID,
|
||||
Expiration: time.Now().Unix() + tokens.ExpiresIn,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error("error update tokens in db in method user re-login", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -2,11 +2,9 @@ package bitrixClient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/limiter"
|
||||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -23,7 +21,13 @@ func TestGetListFields(t *testing.T) {
|
||||
RateLimiter: lim,
|
||||
})
|
||||
|
||||
arr := []model.FieldsType{model.FieldTypeLead, model.FieldTypeCompany, model.FieldTypeContact, model.FieldTypeDeal}
|
||||
//result, err := b.GetUserList("262df9660000071b00717f9200000001000007c9148fd5a4211fc98142ea9bc41fc8d3", "b24-ld76ub.bitrix24.ru")
|
||||
//if err != nil {
|
||||
// t.Fatal(err)
|
||||
//}
|
||||
//fmt.Println(result)
|
||||
|
||||
//arr := []model.FieldsType{model.FieldTypeLead, model.FieldTypeCompany, model.FieldTypeContact, model.FieldTypeDeal}
|
||||
//
|
||||
//for i, tipe := range arr {
|
||||
// req := models.AddFields{
|
||||
@ -115,7 +119,7 @@ func TestGetListFields(t *testing.T) {
|
||||
//}
|
||||
//
|
||||
//for _, tipe := range model.CategoryArr {
|
||||
// result, err := b.GetListPipelines(tipe, "0d7af5660000071b00717f9200000001000007f22d556448dc46077a5fd2eaf9b024ed", "b24-ld76ub.bitrix24.ru")
|
||||
// result, err := b.GetListPipelines(tipe, "9d5bf9660000071b00717f9200000001000007b8da5b64a2142c5a0abcfb3e65f89b0c", "b24-ld76ub.bitrix24.ru")
|
||||
// if err != nil {
|
||||
// fmt.Println(err)
|
||||
// }
|
||||
@ -124,25 +128,25 @@ func TestGetListFields(t *testing.T) {
|
||||
// fmt.Println(string(r))
|
||||
//}
|
||||
//
|
||||
for _, tipe := range arr {
|
||||
result, err := b.GetListFields(tipe, "07cff6660000071b00717f92000000010000079e5af88b052dbcfe9e9d98cac38710ad", "b24-ld76ub.bitrix24.ru")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
r, _ := json.Marshal(result)
|
||||
fmt.Println(string(r))
|
||||
fmt.Println(tipe)
|
||||
}
|
||||
//arr2 := []model.TypeStepsEntityID{model.StatusStepsEntityID, model.DealTypeStepsEntityID, model.DealStageStepsEntityID, model.SourceStepsEntityID, model.ContactTypeStepsEntityID, model.CompanyTypeStepsEntityID, model.EmployeesStepsEntityID, model.IndustryStepsEntityID, model.SmartInvoiceStageStepsEntityID, model.QuoteStatusStepsEntityID, model.HonorificStepsEntityID, model.CallListStepsEntityID, model.SmartDocumentStageStepsEntityID}
|
||||
//for _, stepType := range arr2 {
|
||||
//result, err := b.GetListSteps("0d7af5660000071b00717f9200000001000007f22d556448dc46077a5fd2eaf9b024ed", "b24-ld76ub.bitrix24.ru")
|
||||
//for _, tipe := range arr {
|
||||
// result, err := b.GetListFields(tipe, "07cff6660000071b00717f92000000010000079e5af88b052dbcfe9e9d98cac38710ad", "b24-ld76ub.bitrix24.ru")
|
||||
// if err != nil {
|
||||
// fmt.Println(err)
|
||||
// }
|
||||
//for _, i := range result.Result {
|
||||
// fmt.Println(i.Name, i.CategoryID, i.EntityID)
|
||||
//
|
||||
// r, _ := json.Marshal(result)
|
||||
// fmt.Println(string(r))
|
||||
// fmt.Println(tipe)
|
||||
//}
|
||||
//arr2 := []model.TypeStepsEntityID{model.StatusStepsEntityID, model.DealTypeStepsEntityID, model.DealStageStepsEntityID, model.SourceStepsEntityID, model.ContactTypeStepsEntityID, model.CompanyTypeStepsEntityID, model.EmployeesStepsEntityID, model.IndustryStepsEntityID, model.SmartInvoiceStageStepsEntityID, model.QuoteStatusStepsEntityID, model.HonorificStepsEntityID, model.CallListStepsEntityID, model.SmartDocumentStageStepsEntityID}
|
||||
//for _, stepType := range arr2 {
|
||||
result, err := b.GetListSteps("9d5bf9660000071b00717f9200000001000007b8da5b64a2142c5a0abcfb3e65f89b0c", "b24-ld76ub.bitrix24.ru")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
for _, i := range result.Result {
|
||||
fmt.Println(i.ID)
|
||||
}
|
||||
//}
|
||||
//
|
||||
//"CATEGORY_ID":"1"
|
||||
|
Loading…
Reference in New Issue
Block a user