change kafka message, add todo and change amo types

This commit is contained in:
Pavel 2024-04-29 17:09:40 +03:00
parent 1a3be6594a
commit eb70c134bf
8 changed files with 87 additions and 70 deletions

2
go.mod

@ -11,7 +11,7 @@ require (
github.com/twmb/franz-go v1.16.1
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.33.0
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429122102-dea1519ce237
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429140348-c5336475b2b9
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af
)

24
go.sum

@ -151,27 +151,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240423120424-9cbe8b6b275b h1:mVDoUMJON/WZSikdZ/i68Pt/A4K9irFhiI4diFRWL4A=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240423120424-9cbe8b6b275b/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240423150639-8f7c74046bec h1:o/bnugiwEuc+kskCmKhkBX7tlpCwFgKTFXR2h4vpF8g=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240423150639-8f7c74046bec/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428130935-07bf63c8dfe5 h1:utb1fUT7/nW6btd1saxUF3t9FlI/KnQa3K6XN8JE+WI=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428130935-07bf63c8dfe5/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428142604-10012b3fc8dd h1:Lyf00e8jERXTh3UL2b5lXTcdv8VHage5ByNWVIDuB5o=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428142604-10012b3fc8dd/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428145500-e5a9eb4fe300 h1:Pu65fvuA/BIL/q07KyX60RpS+zYukY0l5lNNeX6qdLY=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240428145500-e5a9eb4fe300/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429074720-d06dd24c3056 h1:Mv1HgYuL2aFm6u+8ax/DkE+WlQEqoW7zGxXo8T7CoaU=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429074720-d06dd24c3056/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429084500-13aacb936b85 h1:G8eNpgntqhgWjQvJbzsu7tENo/b51dkUonDz3g55cgg=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429084500-13aacb936b85/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429093212-6021decc203c h1:5iyvkNW9EnHlsyroFlcC67Jfg1Tz2Rh2C7cC+gXWIjc=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429093212-6021decc203c/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429103033-d5f9e390b44d h1:zKwnWM+1k04IEwvcFix3t7UFOl1lZXuNPwJHGNqhmCY=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429103033-d5f9e390b44d/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429105759-94595c15b582 h1:sOYLEkZrXzPnuomtnap62eDwPzomkDthKUvSisNtbNM=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429105759-94595c15b582/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429122102-dea1519ce237 h1:A+57F5s7elmKvvkn3JUkZdN3l1qgONQfwYU9Hxgbh08=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429122102-dea1519ce237/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429140348-c5336475b2b9 h1:2Nchy6DOWIdgqK+FAmhjJqNCUYTwDa35+MUA0eiyEsg=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240429140348-c5336475b2b9/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I=

@ -1,10 +1,21 @@
package models
import "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
type KafkaMessage struct {
AccountID string
AuthCode *string
Type MessageType
UTMs []int32
Rule KafkaRule
}
type KafkaRule struct {
QuizID int32
PerformerID int32 // айдишник ответственного за сделку
PipelineID int32 // айдишник воронки
StepID int32 // айдишник этапа
Utms []int32 // список UTM для этого опроса
Fieldsrule model.Fieldsrule
}
type MessageType string
@ -16,5 +27,5 @@ const (
TagsUpdate MessageType = "tags"
UserCreate MessageType = "userCreate"
AllDataUpdate MessageType = "allDataUpdate"
CheckUTMs MessageType = "checkUTMs"
RuleCheck MessageType = "ruleCheck"
)

@ -10,8 +10,15 @@ import (
func (s *Service) ChangeQuizSettings(ctx context.Context, request *model.RulesReq, accountID string, quizID int) (*model.Rule, error) {
messageForUTM := models.KafkaMessage{
AccountID: accountID,
Type: models.CheckUTMs,
UTMs: request.Utms,
Type: models.RuleCheck,
Rule: models.KafkaRule{
QuizID: int32(quizID),
PerformerID: request.PerformerID,
PipelineID: request.PipelineID,
StepID: request.StepID,
Utms: request.Utms,
Fieldsrule: request.Fieldsrule,
},
}
err := s.producer.ToKafkaUpdate(ctx, messageForUTM)
@ -22,8 +29,6 @@ func (s *Service) ChangeQuizSettings(ctx context.Context, request *model.RulesRe
}
}
//todo fieldsrule
rule, err := s.repository.AmoRepo.ChangeQuizSettings(ctx, request, accountID, quizID)
if err != nil {
s.logger.Error("error change quiz settings", zap.Error(err))
@ -36,8 +41,15 @@ func (s *Service) ChangeQuizSettings(ctx context.Context, request *model.RulesRe
func (s *Service) SetQuizSettings(ctx context.Context, request *model.RulesReq, accountID string, quizID int) (*model.Rule, error) {
messageForUTM := models.KafkaMessage{
AccountID: accountID,
Type: models.CheckUTMs,
UTMs: request.Utms,
Type: models.RuleCheck,
Rule: models.KafkaRule{
QuizID: int32(quizID),
PerformerID: request.PerformerID,
PipelineID: request.PipelineID,
StepID: request.StepID,
Utms: request.Utms,
Fieldsrule: request.Fieldsrule,
},
}
err := s.producer.ToKafkaUpdate(ctx, messageForUTM)

@ -192,7 +192,7 @@ func (wc *QueueUpdater) processMessages(ctx context.Context, message models.Kafk
wc.logger.Error("error updating users fields", zap.Error(err))
return err
}
case models.CheckUTMs:
case models.RuleCheck:
token, err := wc.methods.GetTokenByID(ctx, message.AccountID)
if err != nil {
wc.logger.Error("error getting user token from db", zap.Error(err))
@ -205,11 +205,17 @@ func (wc *QueueUpdater) processMessages(ctx context.Context, message models.Kafk
return err
}
err = wc.methods.CheckUTMs(ctx, token.AccessToken, message.UTMs)
err = wc.methods.CheckUTMs(ctx, token.AccessToken, message.Rule.Utms)
if err != nil {
wc.logger.Error("error check user utms and add fields or update in amo or db", zap.Error(err))
return err
}
err = wc.methods.CheckFieldRule(ctx, token.AccessToken, message.AccountID, message.Rule)
if err != nil {
wc.logger.Error("error check field rules for fields rules", zap.Error(err))
return err
}
}
default:

@ -170,7 +170,7 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
var companiesTags []models.Tag
var customersTags []models.Tag
entityTypes := []model.EntityType{model.LeadsTags, model.ContactsTags, model.CompaniesTags, model.CustomersTags}
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 250
@ -192,13 +192,13 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
}
switch entityType {
case model.LeadsTags:
case model.LeadsType:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
case model.ContactsTags:
case model.ContactsType:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
case model.CompaniesTags:
case model.CompaniesType:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
case model.CustomersTags:
case model.CustomersType:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
@ -208,7 +208,7 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsTags:
case model.LeadsType:
if len(leadsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType), token.AccountID)
if err != nil {
@ -216,7 +216,7 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.ContactsTags:
case model.ContactsType:
if len(contactsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType), token.AccountID)
if err != nil {
@ -224,7 +224,7 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.CompaniesTags:
case model.CompaniesType:
if len(companiesTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType), token.AccountID)
if err != nil {
@ -232,7 +232,7 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.CustomersTags:
case model.CustomersType:
if len(customersTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType), token.AccountID)
if err != nil {
@ -253,7 +253,7 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
var companiesFields []models.CustomField
var customersFields []models.CustomField
entityTypes := []model.EntityType{model.LeadsTags, model.ContactsTags, model.CompaniesTags, model.CustomersTags}
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 50
@ -275,13 +275,13 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
}
switch entityType {
case model.LeadsTags:
case model.LeadsType:
leadsFields = append(leadsFields, fields.Embedded.CustomFields...)
case model.ContactsTags:
case model.ContactsType:
contactsFields = append(contactsFields, fields.Embedded.CustomFields...)
case model.CompaniesTags:
case model.CompaniesType:
companiesFields = append(companiesFields, fields.Embedded.CustomFields...)
case model.CustomersTags:
case model.CustomersType:
customersFields = append(customersFields, fields.Embedded.CustomFields...)
}
@ -291,7 +291,7 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsTags:
case model.LeadsType:
if len(leadsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType), token.AccountID)
if err != nil {
@ -299,7 +299,7 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.ContactsTags:
case model.ContactsType:
if len(contactsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType), token.AccountID)
if err != nil {
@ -307,7 +307,7 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.CompaniesTags:
case model.CompaniesType:
if len(companiesFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID)
if err != nil {
@ -315,7 +315,7 @@ func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
return err
}
}
case model.CustomersTags:
case model.CustomersType:
if len(customersFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(customersFields, entityType), token.AccountID)
if err != nil {
@ -412,7 +412,7 @@ func (m *Methods) CheckUTMs(ctx context.Context, token string, ids []int32) erro
}
if len(toCreated) > 0 {
createdFields, err := m.amoClient.AddLeadsFields(toCreated, token)
createdFields, err := m.amoClient.AddFields(toCreated, model.LeadsType, token)
if err != nil {
m.logger.Error("error created amo fields", zap.Error(err))
return err
@ -428,3 +428,10 @@ func (m *Methods) CheckUTMs(ctx context.Context, token string, ids []int32) erro
return nil
}
func (m *Methods) CheckFieldRule(ctx context.Context, token string, accountId string, req models.KafkaRule) error {
// todo
// после того как после utm филды обновились, можно проверить также Fieldsrule, добавить в амо поля и обновить в правилах его
return nil
}

@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"time"
"github.com/gofiber/fiber/v2"
@ -387,10 +388,10 @@ func (a *Amo) GetUserByID(id int32, accessToken string) (*models.OneUserInfo, er
}
}
func (a *Amo) AddLeadsFields(req []models.AddLeadsFields, accessToken string) (*models.ResponseGetListFields, error) {
func (a *Amo) AddFields(req []models.AddLeadsFields, entity model.EntityType, accessToken string) (*models.ResponseGetListFields, error) {
for {
if a.rateLimiter.Check() {
uri := fmt.Sprintf("%s/api/v4/leads/custom_fields", a.baseApiURL)
uri := fmt.Sprintf("%s/api/v4/%s/custom_fields", a.baseApiURL, entity)
bodyBytes, err := json.Marshal(req)
if err != nil {
a.logger.Error("error marshal req in AddLeadsFields:", zap.Error(err))

@ -315,19 +315,19 @@ func checkTags(ctx context.Context, repo *dal.AmoDal) error {
}
for i := 0; i < 9; i++ {
accID := strconv.Itoa(i)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testLeadsTags.Embedded.Tags, model.LeadsTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testLeadsTags.Embedded.Tags, model.LeadsType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCompaniesTags.Embedded.Tags, model.CompaniesTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCompaniesTags.Embedded.Tags, model.CompaniesType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCustomersTags.Embedded.Tags, model.CustomersTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCustomersTags.Embedded.Tags, model.CustomersType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testContactsTags.Embedded.Tags, model.ContactsTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testContactsTags.Embedded.Tags, model.ContactsType), accID)
if err != nil {
fmt.Println(err)
}
@ -335,19 +335,19 @@ func checkTags(ctx context.Context, repo *dal.AmoDal) error {
for i := 0; i < 9; i++ {
accID := strconv.Itoa(i)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testLeadsTags2.Embedded.Tags, model.LeadsTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testLeadsTags2.Embedded.Tags, model.LeadsType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCompaniesTags2.Embedded.Tags, model.CompaniesTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCompaniesTags2.Embedded.Tags, model.CompaniesType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCustomersTags2.Embedded.Tags, model.CustomersTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testCustomersTags2.Embedded.Tags, model.CustomersType), accID)
if err != nil {
fmt.Println(err)
}
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testContactsTags2.Embedded.Tags, model.ContactsTags), accID)
err = repo.AmoRepo.CheckTags(ctx, tools.ToTag(testContactsTags2.Embedded.Tags, model.ContactsType), accID)
if err != nil {
fmt.Println(err)
}
@ -428,19 +428,19 @@ func checkFields(ctx context.Context, repo *dal.AmoDal) error {
for i := 0; i < 9; i++ {
accID := strconv.Itoa(i)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testLeadsFields.Embedded.CustomFields, model.LeadsTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testLeadsFields.Embedded.CustomFields, model.LeadsType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCompaniesFields.Embedded.CustomFields, model.CompaniesTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCompaniesFields.Embedded.CustomFields, model.CompaniesType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCustomersFields.Embedded.CustomFields, model.CustomersTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCustomersFields.Embedded.CustomFields, model.CustomersType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testContactsFields.Embedded.CustomFields, model.ContactsTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testContactsFields.Embedded.CustomFields, model.ContactsType), accID)
if err != nil {
return err
}
@ -448,19 +448,19 @@ func checkFields(ctx context.Context, repo *dal.AmoDal) error {
for i := 0; i < 9; i++ {
accID := strconv.Itoa(i)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testLeadsFields2.Embedded.CustomFields, model.LeadsTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testLeadsFields2.Embedded.CustomFields, model.LeadsType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCompaniesFields2.Embedded.CustomFields, model.CompaniesTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCompaniesFields2.Embedded.CustomFields, model.CompaniesType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCustomersFields2.Embedded.CustomFields, model.CustomersTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testCustomersFields2.Embedded.CustomFields, model.CustomersType), accID)
if err != nil {
return err
}
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testContactsFields2.Embedded.CustomFields, model.ContactsTags), accID)
err = repo.AmoRepo.CheckFields(ctx, tools.ToField(testContactsFields2.Embedded.CustomFields, model.ContactsType), accID)
if err != nil {
return err
}