diff --git a/go.mod b/go.mod index 1896567..5301043 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/lib/pq v1.10.9 github.com/twmb/franz-go v1.17.1 go.uber.org/zap v1.27.0 - penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b + penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3 ) diff --git a/go.sum b/go.sum index 8a20d13..86aa225 100644 --- a/go.sum +++ b/go.sum @@ -143,5 +143,17 @@ penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240202120244-c4ef penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240202120244-c4ef330cfe5d/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM= penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b h1:tv9gmeZwVcWGx02SUse3H6w+Y19OQi/FGUcY/QZsL+I= penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240927143522-110e25e2853b/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929094431-3270d3f5b0e1 h1:X8qwZ9YG993BTSOuon9Fufytxafh0te7LvVCayhYkyk= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929094431-3270d3f5b0e1/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929101800-f4e3f5575f7f h1:BdS8YTl4M9urIKyQsRXMhL0NBB01sQAlu8kGpXGHlpU= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929101800-f4e3f5575f7f/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929110625-ad961d40c364 h1:MUApYm474qNcCwn08CEqN0l8I0T4PheK/q/BPeeCNxE= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929110625-ad961d40c364/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929134459-1059762a0b6b h1:mA5qKW2xQ9+UgDToT+ggr+NRoUtHI9sgOpyqZ5DzhHY= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929134459-1059762a0b6b/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929144614-fe9b788024f5 h1:JGnvdffETkkD6lgzMacZyT8Kda2/c5UPF6Jv7BCOwOM= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929144614-fe9b788024f5/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c h1:yiKPtHsXZ4zIrkXUrZQ9rQm9EwpA8B15cBjEWWMTYI0= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240929145101-b0c03331bc2c/go.mod h1:uOuosXduBzd2WbLH6TDZO7ME7ZextulA662oZ6OsoB0= penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3 h1:sf6e2mp582L3i/FMDd2q6QuWm1njRXzYpIX0SipsvM4= penahub.gitlab.yandexcloud.net/devops/linters/golang.git v0.0.0-20240829220549-d35409b619a3/go.mod h1:i7M72RIpkSjcQtHID6KKj9RT/EYZ1rxS6tIPKWa/BSY= diff --git a/internal/models/getUserList.go b/internal/models/getUserList.go index 0d28901..65caf6d 100644 --- a/internal/models/getUserList.go +++ b/internal/models/getUserList.go @@ -6,27 +6,16 @@ type ResponseGetListUsers struct { } type User struct { - ID string `json:"ID"` - Name string `json:"NAME"` - LastName string `json:"LAST_NAME"` - SecondName string `json:"SECOND_NAME"` - Title string `json:"TITLE"` - Email string `json:"EMAIL"` - UFDepartment []int `json:"UF_DEPARTMENT"` - WorkPosition string `json:"WORK_POSITION"` + ID string `json:"ID"` + Name string `json:"NAME"` + LastName string `json:"LAST_NAME"` + SecondName string `json:"SECOND_NAME"` + Title string `json:"TITLE"` + Email string `json:"EMAIL"` + UFDepartment []int32 `json:"UF_DEPARTMENT"` + WorkPosition string `json:"WORK_POSITION"` } type ResponseGetCurrentUser struct { - Result CurrentUser `json:"result"` -} - -type CurrentUser struct { - ID string `json:"ID"` - Name string `json:"NAME"` - LastName string `json:"LAST_NAME"` - SecondName string `json:"SECOND_NAME"` - Title string `json:"TITLE"` - Email string `json:"EMAIL"` - UFDepartment []int `json:"UF_DEPARTMENT"` - WorkPosition string `json:"WORK_POSITION"` + Result User `json:"result"` } diff --git a/internal/models/kafkaMess.go b/internal/models/kafkaMess.go index e896a07..0615b11 100644 --- a/internal/models/kafkaMess.go +++ b/internal/models/kafkaMess.go @@ -4,16 +4,8 @@ type KafkaMessage struct { AccountID string AuthCode string RefererURL string + MemberID string Type MessageType - Rule KafkaRule -} - -type KafkaRule struct { - //QuizID int32 - //PerformerID int32 // айдишник ответственного за сделку - //PipelineID int32 // айдишник воронки - //StepID int32 // айдишник этапа - //Fieldsrule model.Fieldsrule } type MessageType string @@ -27,4 +19,5 @@ const ( AllDataUpdate MessageType = "allDataUpdate" RuleCheck MessageType = "ruleCheck" UserReLogin MessageType = "userReLogin" + StepsUpdate MessageType = "steps" ) diff --git a/internal/service/fields.go b/internal/service/fields.go index 32fe346..4203073 100644 --- a/internal/service/fields.go +++ b/internal/service/fields.go @@ -5,6 +5,7 @@ import ( "database/sql" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/tools" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/pj_errors" ) diff --git a/internal/service/steps.go b/internal/service/steps.go index 02e7efd..20d2be2 100644 --- a/internal/service/steps.go +++ b/internal/service/steps.go @@ -24,7 +24,7 @@ func (s *Service) GetStepsWithPagination(ctx context.Context, req *model.Paginat func (s *Service) UpdateListSteps(ctx context.Context, accountID string) error { message := models.KafkaMessage{ AccountID: accountID, - Type: models.PipelinesUpdate, + Type: models.StepsUpdate, } err := s.producer.ToKafkaUpdate(ctx, message) diff --git a/internal/service/webhook.go b/internal/service/webhook.go index 402701a..cdf76ce 100644 --- a/internal/service/webhook.go +++ b/internal/service/webhook.go @@ -29,6 +29,7 @@ func (s *Service) WebhookCreate(ctx context.Context, req ParamsWebhookCreate) er AccountID: req.AccountID, AuthCode: req.Code, RefererURL: req.Domain, + MemberID: req.MemberID, Type: models.UserCreate, } diff --git a/internal/tools/construct.go b/internal/tools/construct.go new file mode 100644 index 0000000..9ec4558 --- /dev/null +++ b/internal/tools/construct.go @@ -0,0 +1,56 @@ +package tools + +import ( + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" + "strconv" +) + +func ToPipeline(bitrixPipelines []models.Category, bitrixID string) []model.PipelineBitrix { + var pipelines []model.PipelineBitrix + for _, p := range bitrixPipelines { + pipelines = append(pipelines, model.PipelineBitrix{ + BitrixID: p.ID, + Name: p.Name, + EntityTypeId: p.EntityTypeId, + AccountID: bitrixID, + }) + } + return pipelines +} + +func ToStep(bitrixPipelines []models.Steps, bitrixID string) ([]model.StepBitrix, error) { + var pipelines []model.StepBitrix + for _, p := range bitrixPipelines { + pipelineID, err := strconv.ParseInt(p.ID, 10, 64) + if err != nil { + return nil, err + } + pipelines = append(pipelines, model.StepBitrix{ + BitrixID: p.ID, + AccountID: bitrixID, + EntityID: p.EntityID, + StatusID: p.StatusID, + Name: p.Name, + NameInit: p.NameInit, + Color: p.Color, + PipelineID: int32(pipelineID), + }) + } + return pipelines, nil +} + +func ToField(bitrixFields []models.Fields, bitrixID string) []model.BitrixField { + var fields []model.BitrixField + for _, f := range bitrixFields { + fields = append(fields, model.BitrixField{ + AccountID: bitrixID, + BitrixID: f.ID, + EntityID: f.EntityID, + FieldName: f.FieldName, + EditFromLabel: f.EditFormLabel, + FieldType: f.UserTypeID, + }) + } + return fields +} diff --git a/internal/tools/validate.go b/internal/tools/validate.go new file mode 100644 index 0000000..491cf92 --- /dev/null +++ b/internal/tools/validate.go @@ -0,0 +1,42 @@ +package tools + +import ( + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" +) + +func ValidateUtmFields(response *model.UserListBitrixFieldsResp) *model.UserListBitrixFieldsResp { + checkUTM := map[string]struct{}{ + "utm_content": {}, + "utm_medium": {}, + "utm_campaign": {}, + "utm_source": {}, + "utm_term": {}, + "utm_referrer": {}, + "roistat": {}, + "referrer": {}, + "openstat_service": {}, + "openstat_campaign": {}, + "openstat_ad": {}, + "openstat_source": {}, + "from": {}, + "gclientid": {}, + "_ym_uid": {}, + "_ym_counter": {}, + "gclid": {}, + "yclid": {}, + "fbclid": {}, + } + + data := &model.UserListBitrixFieldsResp{ + Count: response.Count, + Items: []model.BitrixField{}, + } + + for _, r := range response.Items { + if _, ok := checkUTM[r.EditFromLabel]; !ok { + data.Items = append(data.Items, r) + } + } + + return data +} diff --git a/internal/workers/queueUpdater/queue_updater.go b/internal/workers/queueUpdater/queue_updater.go new file mode 100644 index 0000000..e1f4cef --- /dev/null +++ b/internal/workers/queueUpdater/queue_updater.go @@ -0,0 +1,260 @@ +package queueUpdater + +import ( + "context" + "encoding/json" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers_methods" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" + "time" +) + +type QueueUpdater struct { + logger *zap.Logger + kafkaClient *kgo.Client + methods *workers_methods.Methods +} + +type Deps struct { + Logger *zap.Logger + KafkaClient *kgo.Client + Methods *workers_methods.Methods +} + +func NewQueueUpdater(deps Deps) *QueueUpdater { + return &QueueUpdater{ + logger: deps.Logger, + kafkaClient: deps.KafkaClient, + methods: deps.Methods, + } +} + +func (wc *QueueUpdater) Start(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + wc.consumeMessages(ctx) + + case <-ctx.Done(): + return + } + } +} + +func (wc *QueueUpdater) consumeMessages(ctx context.Context) { + fetches := wc.kafkaClient.PollFetches(ctx) + iter := fetches.RecordIter() + for !iter.Done() { + record := iter.Next() + var message models.KafkaMessage + + err := json.Unmarshal(record.Value, &message) + if err != nil { + wc.logger.Error("error unmarshal kafka message:", zap.Error(err)) + continue + } + + err = wc.processMessages(ctx, message) + if err != nil { + wc.logger.Error("error processing kafka message:", zap.Error(err)) + } + } +} + +func (wc *QueueUpdater) processMessages(ctx context.Context, message models.KafkaMessage) error { + switch message.Type { + case models.UsersUpdate: + token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + if err != nil { + wc.logger.Error("error getting user token from db", zap.Error(err)) + return err + } + + if token != nil { + err = wc.methods.CheckUsers(ctx, []model.Token{*token}) + if err != nil { + wc.logger.Error("error update user information in queue worker", zap.Error(err)) + return err + } + } + + case models.PipelinesUpdate: + token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + if err != nil { + wc.logger.Error("error getting user token from db", zap.Error(err)) + return err + } + + if token != nil { + err = wc.methods.CheckPipelines(ctx, []model.Token{*token}) + if err != nil { + wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err)) + return err + } + } + case models.StepsUpdate: + token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + if err != nil { + wc.logger.Error("error getting user token from db", zap.Error(err)) + return err + } + + if token != nil { + err = wc.methods.CheckSteps(ctx, []model.Token{*token}) + if err != nil { + wc.logger.Error("error update user pipelines and steps information in queue worker", zap.Error(err)) + return err + } + } + + case models.FieldsUpdate: + token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + if err != nil { + wc.logger.Error("error getting user token from db", zap.Error(err)) + return err + } + + if token != nil { + err = wc.methods.CheckFields(ctx, []model.Token{*token}) + if err != nil { + wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) + return err + } + } + + //case models.TagsUpdate: + // token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + // if err != nil { + // wc.logger.Error("error getting user token from db", zap.Error(err)) + // return err + // } + // + // if token != nil { + // err = wc.methods.CheckTags(ctx, []model.Token{*token}) + // if err != nil { + // wc.logger.Error("error update user tags information in queue worker", zap.Error(err)) + // return err + // } + // } + + case models.UserCreate: + token, err := wc.methods.CreateUserFromWebHook(ctx, message) + if err != nil { + wc.logger.Error("error creating user from webhook request", zap.Error(err)) + return err + } + + err = wc.methods.CheckUsers(ctx, []model.Token{token}) + if err != nil { + wc.logger.Error("error update user information in queue worker", zap.Error(err)) + return err + } + + err = wc.methods.CheckPipelines(ctx, []model.Token{token}) + if err != nil { + wc.logger.Error("error update user pipelines information in queue worker", zap.Error(err)) + return err + } + + err = wc.methods.CheckSteps(ctx, []model.Token{token}) + if err != nil { + wc.logger.Error("error update user steps information in queue worker", zap.Error(err)) + return err + } + + err = wc.methods.CheckFields(ctx, []model.Token{token}) + if err != nil { + wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) + return err + } + + //err = wc.methods.CheckTags(ctx, []model.Token{*token}) + //if err != nil { + // wc.logger.Error("error update user tags information in queue worker", zap.Error(err)) + // return err + //} + case models.AllDataUpdate: + // сначала получаем список токенов + newTokens, err := wc.methods.UpdateTokens(ctx) + if err != nil { + wc.logger.Error("error updating tokens and getting new tokens", zap.Error(err)) + return err + } + + if len(newTokens) > 0 { + // обновляем информацию о пользователях + err = wc.methods.CheckUsers(ctx, newTokens) + if err != nil { + wc.logger.Error("error update users information", zap.Error(err)) + return err + } + + // обновляем информацию о pipelines + err = wc.methods.CheckPipelines(ctx, newTokens) + if err != nil { + wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err)) + return err + } + + // обновляем информацию о steps + err = wc.methods.CheckSteps(ctx, newTokens) + if err != nil { + wc.logger.Error("error updating users pipelines and users pipelines-steps", zap.Error(err)) + return err + } + + // обновляем информацию о tags + //err = wc.methods.CheckTags(ctx, newTokens) + //if err != nil { + // wc.logger.Error("error updating users tags", zap.Error(err)) + // return err + //} + + // обновляем информацию о fields + err = wc.methods.CheckFields(ctx, newTokens) + if err != nil { + wc.logger.Error("error updating users fields", zap.Error(err)) + return err + } + } + //case models.RuleCheck: + // token, err := wc.methods.GetTokenByID(ctx, message.AccountID) + // if err != nil { + // wc.logger.Error("error getting user token from db", zap.Error(err)) + // return err + // } + // if token != nil { + // err = wc.methods.CheckFields(ctx, []model.Token{*token}) + // if err != nil { + // wc.logger.Error("error update user fields information in queue worker", zap.Error(err)) + // return err + // } + // + // err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message) + // if err != nil { + // wc.logger.Error("error check field rules for fields rules", zap.Error(err)) + // return err + // } + // } + case models.UserReLogin: + err := wc.methods.UserReLogin(ctx, message) + if err != nil { + wc.logger.Error("error update user information in re-login method", zap.Error(err)) + return err + } + + default: + wc.logger.Error("incorrect message type", zap.Any("Type:", message)) + return nil + } + return nil +} + +func (wc *QueueUpdater) Stop(_ context.Context) error { + return nil +} diff --git a/internal/workers_methods/methods.go b/internal/workers_methods/methods.go new file mode 100644 index 0000000..5eaaf17 --- /dev/null +++ b/internal/workers_methods/methods.go @@ -0,0 +1,760 @@ +package workers_methods + +import ( + "context" + "encoding/json" + "fmt" + "go.uber.org/zap" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/models" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/tools" + "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/bitrixClient" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" + "strings" + "sync" + "time" +) + +type Methods struct { + repo *dal.BitrixDal + bitrixClient *bitrixClient.Bitrix + logger *zap.Logger +} + +type Deps struct { + Repo *dal.BitrixDal + BitrixClient *bitrixClient.Bitrix + Logger *zap.Logger +} + +func NewWorkersMethods(deps Deps) *Methods { + return &Methods{ + repo: deps.Repo, + bitrixClient: deps.BitrixClient, + logger: deps.Logger, + } +} + +func (m *Methods) UpdateTokens(ctx context.Context) ([]model.Token, error) { + allTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx) + if err != nil { + m.logger.Error("error getting all tokens from db in UpdateTokens", zap.Error(err)) + return nil, err + } + + for _, oldToken := range allTokens { + user, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, oldToken.AccountID) + if err != nil { + m.logger.Error("error getting account by id in UpdateTokens", zap.Error(err)) + return nil, err + } + req := models.UpdateWebHookReq{ + GrantType: "refresh_token", + RefreshToken: oldToken.RefreshToken, + } + + resp, err := m.bitrixClient.CreateWebHook(&req, user.Subdomain) + if err != nil { + m.logger.Error("error create webhook in UpdateTokens", zap.Error(err)) + continue + } + + newToken := model.Token{ + AccountID: oldToken.AccountID, + RefreshToken: resp.RefreshToken, + AccessToken: resp.AccessToken, + Expiration: time.Now().Unix() + resp.ExpiresIn, + CreatedAt: time.Now().Unix(), + } + + err = m.repo.BitrixRepo.WebhookUpdate(ctx, newToken) + if err != nil { + m.logger.Error("error update token in db", zap.Error(err)) + return nil, err + } + } + + newTokens, err := m.repo.BitrixRepo.GetAllTokens(ctx) + if err != nil { + m.logger.Error("error getting all new updated tokens from db in UpdateTokens", zap.Error(err)) + return nil, err + } + + return newTokens, nil +} + +func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error { + listUser := make(map[string][]models.User) + for _, token := range allTokens { + currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID) + if err != nil { + m.logger.Error("error getting current company", zap.Error(err)) + return err + } + userData, err := m.bitrixClient.GetUserList(token.AccessToken, currentCompany.Subdomain) + if err != nil { + m.logger.Error("error fetching list users", zap.Error(err)) + break + } + + listUser[token.AccountID] = append(listUser[token.AccountID], userData.Result...) + } + + for accountID, users := range listUser { + currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, accountID) + if err != nil { + m.logger.Error("error getting current company", zap.Error(err)) + return err + } + + currentUserUsers, err := m.repo.BitrixRepo.GetUserUsersByID(ctx, currentCompany.BitrixID) + if err != nil { + m.logger.Error("error getting user users by bitrix user id", zap.Error(err)) + return err + } + + for _, user := range users { + found := false + for _, currentUser := range currentUserUsers { + if user.ID == currentUser.BitrixIDUserID { + found = true + err := m.repo.BitrixRepo.UpdateBitrixAccountUser(ctx, model.BitrixAccountUser{ + AccountID: currentUser.AccountID, + BitrixIDUserID: currentUser.BitrixIDUserID, + Name: user.Name, + LastName: user.LastName, + SecondName: user.SecondName, + Title: user.Title, + Email: user.Email, + UFDepartment: user.UFDepartment, + WorkPosition: user.WorkPosition, + }) + if err != nil { + m.logger.Error("failed update user bitrix account in db", zap.Error(err)) + return err + } + } + } + if !found { + err := m.repo.BitrixRepo.AddBitrixAccountUser(ctx, model.BitrixAccountUser{ + AccountID: currentCompany.BitrixID, + BitrixIDUserID: user.ID, + Name: user.Name, + LastName: user.LastName, + SecondName: user.SecondName, + Title: user.Title, + Email: user.Email, + UFDepartment: user.UFDepartment, + WorkPosition: user.WorkPosition, + }) + if err != nil { + m.logger.Error("failed insert user bitrix account in db", zap.Error(err)) + return err + } + } + } + + var deletedUserIDs []int64 + for _, currentUserUser := range currentUserUsers { + found := false + for _, user := range users { + if currentUserUser.BitrixIDUserID == user.ID { + found = true + break + } + } + + if !found { + deletedUserIDs = append(deletedUserIDs, currentUserUser.ID) + } + } + + if len(deletedUserIDs) > 0 { + err := m.repo.BitrixRepo.DeleteUsers(ctx, deletedUserIDs) + if err != nil { + m.logger.Error("error deleting users in db", zap.Error(err)) + return err + } + } + } + return nil +} + +func (m *Methods) CheckPipelines(ctx context.Context, tokens []model.Token) error { + for _, token := range tokens { + currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID) + if err != nil { + m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err)) + return err + } + currentCompanyPipelines, err := m.repo.BitrixRepo.GetUserPipelinesByID(ctx, currentCompany.BitrixID) + if err != nil { + m.logger.Error("error getting company pipelines by bitrix id", zap.Error(err)) + return err + } + + var listPipelines []models.Category + for _, categoryType := range model.CategoryArr { + pipelinesResp, err := m.bitrixClient.GetListPipelines(categoryType, token.AccessToken, currentCompany.Subdomain) + if err != nil { + m.logger.Error("error fetching list pipelines from bitrix", zap.Error(err)) + continue + } + listPipelines = append(listPipelines, pipelinesResp.Result.Categories...) + } + + if len(listPipelines) > 0 { + receivedPipelines := tools.ToPipeline(listPipelines, currentCompany.BitrixID) + err = m.repo.BitrixRepo.CheckPipelines(ctx, receivedPipelines) + if err != nil { + m.logger.Error("error checking pipelines", zap.Error(err)) + } + + var deletedPipelineIDs []int64 + for _, currentUserPipeline := range currentCompanyPipelines { + found := false + for _, receivedPipeline := range receivedPipelines { + if currentUserPipeline.BitrixID == receivedPipeline.BitrixID && currentUserPipeline.AccountID == currentCompany.BitrixID { + found = true + break + } + } + if !found { + deletedPipelineIDs = append(deletedPipelineIDs, currentUserPipeline.ID) + } + } + + if len(deletedPipelineIDs) > 0 { + err := m.repo.BitrixRepo.DeletePipelines(ctx, deletedPipelineIDs) + if err != nil { + m.logger.Error("error deleting pipelines in db", zap.Error(err)) + return err + } + } + } + } + + return nil +} + +func (m *Methods) CheckSteps(ctx context.Context, tokens []model.Token) error { + for _, token := range tokens { + currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID) + if err != nil { + m.logger.Error("error getting bitrix company by account quiz id", zap.Error(err)) + return err + } + + currentCompanySteps, err := m.repo.BitrixRepo.GetUserStepsByID(ctx, currentCompany.BitrixID) + if err != nil { + m.logger.Error("error getting company steps by bitrix id", zap.Error(err)) + return err + } + + var listSteps []models.Steps + stepsResp, err := m.bitrixClient.GetListSteps(token.AccessToken, currentCompany.Subdomain) + if err != nil { + m.logger.Error("error fetching list steps from bitrix", zap.Error(err)) + continue + } + listSteps = append(listSteps, stepsResp.Result...) + + if len(listSteps) > 0 { + receivedSteps, err := tools.ToStep(listSteps, currentCompany.BitrixID) + if err != nil { + m.logger.Error("error converting steps to bitrix", zap.Error(err)) + return err + } + err = m.repo.BitrixRepo.CheckSteps(ctx, receivedSteps) + if err != nil { + m.logger.Error("error checking steps", zap.Error(err)) + } + + var deletedStepIDs []int64 + for _, currentUserStep := range currentCompanySteps { + found := false + for _, receivedStep := range receivedSteps { + if currentUserStep.BitrixID == receivedStep.BitrixID && currentUserStep.AccountID == currentCompany.BitrixID { + found = true + break + } + } + if !found { + deletedStepIDs = append(deletedStepIDs, currentUserStep.ID) + } + } + + if len(deletedStepIDs) > 0 { + err := m.repo.BitrixRepo.DeleteSteps(ctx, deletedStepIDs) + if err != nil { + m.logger.Error("error deleting steps in db", zap.Error(err)) + return err + } + } + } + } + + return nil +} + +//func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error { +// for _, token := range tokens { +// user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID) +// if err != nil { +// m.logger.Error("error getting amoUserInfo by account quiz id", zap.Error(err)) +// return err +// } +// +// currentUserTags, err := m.repo.AmoRepo.GetUserTagsByID(ctx, user.AmoID) +// if err != nil { +// m.logger.Error("error getting user tags by amo id", zap.Error(err)) +// return err +// } +// +// var wg sync.WaitGroup +// wg.Add(4) +// +// var tagsMap sync.Map +// entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType} +// for _, entityType := range entityTypes { +// go func(entityType model.EntityType) { +// defer wg.Done() +// page := 1 +// limit := 250 +// +// for { +// req := models.GetListTagsReq{ +// Page: page, +// Limit: limit, +// EntityType: entityType, +// } +// tags, err := m.amoClient.GetListTags(req, token.AccessToken, user.Subdomain) +// if err != nil { +// m.logger.Error("error getting list of tags", zap.Error(err)) +// return +// } +// +// if tags == nil || len(tags.Embedded.Tags) == 0 { +// break +// } +// +// tagsMap.Store(entityType, tags.Embedded.Tags) +// +// page++ +// } +// }(entityType) +// } +// +// wg.Wait() +// +// var deletedTagIDs []int64 +// for _, currentUserTag := range currentUserTags { +// found := false +// for _, entityType := range entityTypes { +// if tags, ok := tagsMap.Load(entityType); ok { +// if len(tags.([]models.Tag)) > 0 { +// receivedTags := tools.ToTag(tags.([]models.Tag), entityType) +// for _, tag := range receivedTags { +// if currentUserTag.Amoid == tag.Amoid && currentUserTag.Accountid == user.AmoID && currentUserTag.Entity == entityType { +// found = true +// break +// } +// } +// } +// } +// if found { +// break +// } +// } +// +// if !found { +// deletedTagIDs = append(deletedTagIDs, currentUserTag.ID) +// } +// } +// +// if len(deletedTagIDs) > 0 { +// err = m.repo.AmoRepo.DeleteTags(ctx, deletedTagIDs) +// if err != nil { +// m.logger.Error("error deleting tags in db", zap.Error(err)) +// return err +// } +// } +// +// for _, entityType := range entityTypes { +// if tags, ok := tagsMap.Load(entityType); ok { +// if len(tags.([]models.Tag)) > 0 { +// err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID) +// if err != nil { +// switch entityType { +// case model.LeadsType: +// m.logger.Error("error updating leads tags in db", zap.Error(err)) +// return err +// case model.ContactsType: +// m.logger.Error("error updating contacts tags in db", zap.Error(err)) +// return err +// case model.CompaniesType: +// m.logger.Error("error updating companies tags in db", zap.Error(err)) +// return err +// case model.CustomersType: +// m.logger.Error("error updating customer tags in db", zap.Error(err)) +// return err +// } +// } +// } +// } +// } +// } +// return nil +//} + +func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error { + for _, token := range tokens { + currentCompany, err := m.repo.BitrixRepo.GetCurrentAccount(ctx, token.AccountID) + if err != nil { + m.logger.Error("error getting company by account quiz id", zap.Error(err)) + return err + } + + currentUserFields, err := m.repo.BitrixRepo.GetUserFieldsByID(ctx, currentCompany.BitrixID) + if err != nil { + m.logger.Error("error getting user fields by bitrix id", zap.Error(err)) + return err + } + + var wg sync.WaitGroup + wg.Add(4) + + var fieldsMap sync.Map + entityTypes := []model.FieldsType{model.FieldTypeCompany, model.FieldTypeLead, model.FieldTypeContact, model.FieldTypeDeal} + for _, entityType := range entityTypes { + go func(entityType model.FieldsType) { + defer wg.Done() + for { + fields, err := m.bitrixClient.GetListFields(entityType, token.AccessToken, currentCompany.Subdomain) + if err != nil { + m.logger.Error("error getting list of fields", zap.Error(err)) + return + } + + if fields == nil || len(fields.Result) == 0 { + break + } + + fieldsMap.Store(entityType, fields.Result) + break + } + }(entityType) + } + + wg.Wait() + + var deletedFieldIDs []int64 + for _, currentUserField := range currentUserFields { + found := false + for _, entityType := range entityTypes { + if fields, ok := fieldsMap.Load(entityType); ok { + if len(fields.([]models.Fields)) > 0 { + receivedFields := tools.ToField(fields.([]models.Fields), currentCompany.BitrixID) + for _, field := range receivedFields { + if currentUserField.BitrixID == field.BitrixID && currentUserField.AccountID == currentCompany.BitrixID && currentUserField.EntityID == entityType { + found = true + break + } + } + } + } + if found { + break + } + } + + if !found { + deletedFieldIDs = append(deletedFieldIDs, currentUserField.ID) + } + } + + if len(deletedFieldIDs) > 0 { + err = m.repo.BitrixRepo.DeleteFields(ctx, deletedFieldIDs) + if err != nil { + m.logger.Error("error deleting fields in db", zap.Error(err)) + return err + } + } + + for _, entityType := range entityTypes { + if fields, ok := fieldsMap.Load(entityType); ok { + if len(fields.([]models.Fields)) > 0 { + err := m.repo.BitrixRepo.CheckFields(ctx, tools.ToField(fields.([]models.Fields), currentCompany.BitrixID), token.AccountID) + if err != nil { + switch entityType { + case model.FieldTypeLead: + m.logger.Error("error updating leads fields in db", zap.Error(err)) + return err + case model.FieldTypeContact: + m.logger.Error("error updating contacts fields in db", zap.Error(err)) + return err + case model.FieldTypeCompany: + m.logger.Error("error updating companies fields in db", zap.Error(err)) + return err + case model.FieldTypeDeal: + m.logger.Error("error updating deal fields in db", zap.Error(err)) + return err + } + } + } + } + } + } + + return nil +} + +func (m *Methods) GetTokenByID(ctx context.Context, accountID string) (*model.Token, error) { + token, err := m.repo.BitrixRepo.GetTokenByID(ctx, accountID) + if err != nil { + m.logger.Error("error getting token by account id from db", zap.Error(err)) + return nil, err + } + + return token, nil +} + +func (m *Methods) CreateUserFromWebHook(ctx context.Context, msg models.KafkaMessage) (model.Token, error) { + // получаем аксес и рефреш токены по коду авторизации, id битрикса ==member id // todo надо в этом убедиться + forGetTokens := models.CreateWebHookReq{ + GrantType: "authorization_code", + Code: msg.AuthCode, + } + + tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, msg.RefererURL) + if err != nil { + m.logger.Error("error getting webhook in CreateUserFromWebHook:", zap.Error(err)) + return model.Token{}, err + } + + toCreate := model.BitrixAccount{ + AccountID: msg.AccountID, + BitrixID: msg.MemberID, + Subdomain: msg.RefererURL, + } + + err = m.repo.BitrixRepo.CreateAccount(ctx, toCreate) + if err != nil { + m.logger.Error("error create account in db in CreateUserFromWebHook", zap.Error(err)) + return model.Token{}, err + } + + err = m.repo.BitrixRepo.WebhookCreate(ctx, model.Token{ + RefreshToken: tokens.RefreshToken, + AccessToken: tokens.AccessToken, + AccountID: msg.AccountID, + AuthCode: msg.AuthCode, + Expiration: time.Now().Unix() + tokens.ExpiresIn, + CreatedAt: time.Now().Unix(), + }) + if err != nil { + m.logger.Error("error adding tokens to db in CreateUserFromWebHook", zap.Error(err)) + return model.Token{}, err + } + + return model.Token{ + AccountID: msg.AccountID, + RefreshToken: tokens.RefreshToken, + AccessToken: tokens.AccessToken, + }, nil +} + +func (m *Methods) CheckFieldRule(ctx context.Context, token string, msg models.KafkaMessage) error { + var ( + leadIDs, companyIDs, customerIDs, contactIDs []int32 + leadQuestions, companyQuestions, customerQuestions, contactQuestions []model.Question + questionsTypeMap = make(map[model.EntityType][]model.Question) + newFields []model.Field + lead, company, customer, contact model.FieldRule + currentFieldsRule = msg.Rule.Fieldsrule + err error + ) + + user, err := m.repo.AmoRepo.GetCurrentAccount(ctx, msg.AccountID) + if err != nil { + m.logger.Error("error getting user data by account id in check utms wc method", zap.Error(err)) + return err + } + + currentFields, err := m.repo.AmoRepo.GetUserFieldsByID(ctx, user.AmoID) + if err != nil { + m.logger.Error("error getting user fields by amo account id", zap.Error(err)) + return err + } + + quiz, err := m.repo.QuizRepo.GetQuizById(ctx, msg.AccountID, uint64(msg.Rule.QuizID)) + if err != nil { + m.logger.Error("error getting quiz by quizID and accountID", zap.Error(err)) + return err + } + + var quizConfig model.QuizContact + err = json.Unmarshal([]byte(quiz.Config), &quizConfig) + if err != nil { + m.logger.Error("error serialization quizConfig to model QuizContact", zap.Error(err)) + return err + } + + leadIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Lead.Questionid) + customerIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Customer.Questionid) + companyIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Company.Questionid) + contactIDs = tools.ToQuestionIDs(msg.Rule.Fieldsrule.Contact.Questionid) + + getQuestions := func(questionIDs []int32, questions *[]model.Question) { + if len(questionIDs) > 0 { + *questions, err = m.repo.QuestionRepo.GetQuestionListByIDs(ctx, questionIDs) + if err != nil { + m.logger.Error("error getting questions", zap.Error(err)) + return + } + } + } + + getQuestions(leadIDs, &leadQuestions) + getQuestions(customerIDs, &customerQuestions) + getQuestions(companyIDs, &companyQuestions) + getQuestions(contactIDs, &contactQuestions) + + questionsTypeMap[model.LeadsType] = append(questionsTypeMap[model.LeadsType], leadQuestions...) + questionsTypeMap[model.CustomersType] = append(questionsTypeMap[model.CustomersType], customerQuestions...) + questionsTypeMap[model.CompaniesType] = append(questionsTypeMap[model.CompaniesType], companyQuestions...) + questionsTypeMap[model.ContactsType] = append(questionsTypeMap[model.ContactsType], contactQuestions...) + + toCreated, toUpdate := tools.ToCreatedUpdateQuestionRules(questionsTypeMap, currentFields) + contactFieldsToCreate, forAdding := tools.ForContactRules(quizConfig, currentFields) + + for entity, fields := range toCreated { + if len(fields) == 0 { + continue + } + + createdFields, err := m.amoClient.AddFields(fields, entity, token, user.Subdomain) + if err != nil { + m.logger.Error("error adding fields to amo", zap.Any("type", entity), zap.Error(err)) + continue + } + newFields = append(newFields, tools.ToField(createdFields.Embedded.CustomFields, entity)...) + } + + if len(contactFieldsToCreate) > 0 { + createdFields, err := m.amoClient.AddFields(contactFieldsToCreate, model.ContactsType, token, user.Subdomain) + if err != nil { + m.logger.Error("error adding fields to amo", zap.Any("type", model.ContactsType), zap.Error(err)) + } + + contructedFields := tools.ToField(createdFields.Embedded.CustomFields, model.ContactsType) + + newFields = append(newFields, contructedFields...) + + for _, field := range contructedFields { + if _, ok := forAdding[field.Name]; ok { + forAdding[field.Name] = int(field.Amoid) + } + } + } + + if len(newFields) > 0 { + err = m.repo.AmoRepo.CheckFields(ctx, newFields, msg.AccountID) + if err != nil { + m.logger.Error("error updating fields rule in db Check Fields", zap.Error(err)) + return err + } + } + + constructFieldRules := func(fieldRuleArrCurrent map[int]int, questions []model.Question, fieldRule *model.FieldRule, currentEntity model.EntityType) { + ruleMap := make(map[int]int) + for questionID, fieldID := range fieldRuleArrCurrent { + if fieldID != 0 { + // если fieldID уже заполнен добавляем его как есть + ruleMap[questionID] = fieldID + continue + } + for _, question := range questions { + if dataQues, ok := toUpdate[questionID]; ok { + if dataQues.Entity == currentEntity { + ruleMap[questionID] = dataQues.FieldID + break + } + } + if questionID == int(question.Id) { + // тут также делаем чтобы сверить филд с вопросом + title := strings.ToLower(strings.ReplaceAll(question.Title, " ", "")) + if title == "" { + question.Title = fmt.Sprintf("Вопрос №%d", question.Page) + } + title = strings.ToLower(strings.ReplaceAll(question.Title, " ", "")) + for _, field := range newFields { + fieldName := strings.ToLower(strings.ReplaceAll(field.Name, " ", "")) + if title == fieldName && field.Entity == currentEntity { + ruleMap[questionID] = int(field.Amoid) + } + } + } + } + } + fieldRule.Questionid = ruleMap + } + + constructFieldRules(currentFieldsRule.Lead.Questionid, leadQuestions, &lead, model.LeadsType) + constructFieldRules(currentFieldsRule.Customer.Questionid, customerQuestions, &customer, model.CustomersType) + constructFieldRules(currentFieldsRule.Company.Questionid, companyQuestions, &company, model.CompaniesType) + constructFieldRules(currentFieldsRule.Contact.Questionid, contactQuestions, &contact, model.ContactsType) + + err = m.repo.AmoRepo.UpdateFieldRules(ctx, model.Fieldsrule{ + Lead: lead, + Customer: customer, + Company: company, + Contact: model.ContactRules{ContactRuleMap: forAdding, Questionid: contact.Questionid}, + }, msg.AccountID, msg.Rule.QuizID) + + if err != nil { + m.logger.Error("error updating fields rule in db", zap.Error(err)) + return err + } + + return nil +} + +func (m *Methods) UserReLogin(ctx context.Context, msg models.KafkaMessage) error { + forGetTokens := models.CreateWebHookReq{ + GrantType: "authorization_code", + Code: msg.AuthCode, + } + + tokens, err := m.bitrixClient.CreateWebHook(&forGetTokens, msg.RefererURL) + if err != nil { + m.logger.Error("error getting tokens in method user re-login:", zap.Error(err)) + return err + } + + toUpdate := model.BitrixAccount{ + AccountID: msg.AccountID, + BitrixID: msg.MemberID, + Subdomain: msg.RefererURL, + } + + err = m.repo.BitrixRepo.UpdateCurrentAccount(ctx, toUpdate) + if err != nil { + m.logger.Error("error update account in db in method user re-login", zap.Error(err)) + return err + } + + err = m.repo.BitrixRepo.WebhookUpdate(ctx, model.Token{ + RefreshToken: tokens.RefreshToken, + AccessToken: tokens.AccessToken, + AccountID: msg.AccountID, + Expiration: time.Now().Unix() + tokens.ExpiresIn, + CreatedAt: time.Now().Unix(), + }) + if err != nil { + m.logger.Error("error update tokens in db in method user re-login", zap.Error(err)) + return err + } + + return nil +} diff --git a/pkg/bitrixClient/bitrix_test.go b/pkg/bitrixClient/bitrix_test.go index d6c3bb2..baa9b65 100644 --- a/pkg/bitrixClient/bitrix_test.go +++ b/pkg/bitrixClient/bitrix_test.go @@ -2,11 +2,9 @@ package bitrixClient import ( "context" - "encoding/json" "fmt" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/limiter" - "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" "testing" "time" ) @@ -23,7 +21,13 @@ func TestGetListFields(t *testing.T) { RateLimiter: lim, }) - arr := []model.FieldsType{model.FieldTypeLead, model.FieldTypeCompany, model.FieldTypeContact, model.FieldTypeDeal} + //result, err := b.GetUserList("262df9660000071b00717f9200000001000007c9148fd5a4211fc98142ea9bc41fc8d3", "b24-ld76ub.bitrix24.ru") + //if err != nil { + // t.Fatal(err) + //} + //fmt.Println(result) + + //arr := []model.FieldsType{model.FieldTypeLead, model.FieldTypeCompany, model.FieldTypeContact, model.FieldTypeDeal} // //for i, tipe := range arr { // req := models.AddFields{ @@ -115,7 +119,7 @@ func TestGetListFields(t *testing.T) { //} // //for _, tipe := range model.CategoryArr { - // result, err := b.GetListPipelines(tipe, "0d7af5660000071b00717f9200000001000007f22d556448dc46077a5fd2eaf9b024ed", "b24-ld76ub.bitrix24.ru") + // result, err := b.GetListPipelines(tipe, "9d5bf9660000071b00717f9200000001000007b8da5b64a2142c5a0abcfb3e65f89b0c", "b24-ld76ub.bitrix24.ru") // if err != nil { // fmt.Println(err) // } @@ -124,25 +128,25 @@ func TestGetListFields(t *testing.T) { // fmt.Println(string(r)) //} // - for _, tipe := range arr { - result, err := b.GetListFields(tipe, "07cff6660000071b00717f92000000010000079e5af88b052dbcfe9e9d98cac38710ad", "b24-ld76ub.bitrix24.ru") - if err != nil { - fmt.Println(err) - } - - r, _ := json.Marshal(result) - fmt.Println(string(r)) - fmt.Println(tipe) - } + //for _, tipe := range arr { + // result, err := b.GetListFields(tipe, "07cff6660000071b00717f92000000010000079e5af88b052dbcfe9e9d98cac38710ad", "b24-ld76ub.bitrix24.ru") + // if err != nil { + // fmt.Println(err) + // } + // + // r, _ := json.Marshal(result) + // fmt.Println(string(r)) + // fmt.Println(tipe) + //} //arr2 := []model.TypeStepsEntityID{model.StatusStepsEntityID, model.DealTypeStepsEntityID, model.DealStageStepsEntityID, model.SourceStepsEntityID, model.ContactTypeStepsEntityID, model.CompanyTypeStepsEntityID, model.EmployeesStepsEntityID, model.IndustryStepsEntityID, model.SmartInvoiceStageStepsEntityID, model.QuoteStatusStepsEntityID, model.HonorificStepsEntityID, model.CallListStepsEntityID, model.SmartDocumentStageStepsEntityID} //for _, stepType := range arr2 { - //result, err := b.GetListSteps("0d7af5660000071b00717f9200000001000007f22d556448dc46077a5fd2eaf9b024ed", "b24-ld76ub.bitrix24.ru") - //if err != nil { - // fmt.Println(err) - //} - //for _, i := range result.Result { - // fmt.Println(i.Name, i.CategoryID, i.EntityID) - //} + result, err := b.GetListSteps("9d5bf9660000071b00717f9200000001000007b8da5b64a2142c5a0abcfb3e65f89b0c", "b24-ld76ub.bitrix24.ru") + if err != nil { + fmt.Println(err) + } + for _, i := range result.Result { + fmt.Println(i.ID) + } //} // //"CATEGORY_ID":"1"