future with sync map

This commit is contained in:
Pavel 2024-05-03 18:25:50 +03:00
parent 712c44458e
commit eaf8ec63db
3 changed files with 99 additions and 133 deletions

2
go.mod

@ -12,7 +12,7 @@ require (
github.com/twmb/franz-go v1.16.1
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.33.0
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503082031-f6c3470bd872
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af
)

4
go.sum

@ -151,7 +151,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503082031-f6c3470bd872 h1:yOSXTJkpx0MCxKaKnQxGtB2fNLob3FHxFE9SO7/35Q0=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503082031-f6c3470bd872/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7 h1:ejEQamCmMllFGT3HlLftchBDq5lzWdvnBTlMqxCFiZo=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240503151157-e610703cbfc7/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I=

@ -8,6 +8,7 @@ import (
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"sync"
"time"
)
@ -89,6 +90,7 @@ func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error
listUser[token.AccountID] = append(listUser[token.AccountID], userData.Embedded.Users...)
}
var usersForUpdateAndCreate []model.User
for accountID, users := range listUser {
mainAccount, err := m.repo.AmoRepo.GetCurrentAccount(ctx, accountID)
if err != nil {
@ -110,7 +112,7 @@ func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error
}
}
err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, model.User{
usersForUpdateAndCreate = append(usersForUpdateAndCreate, model.User{
AmoID: user.ID,
Name: user.FullName,
Group: int32(user.Rights.GroupID),
@ -118,13 +120,13 @@ func (m *Methods) CheckUsers(ctx context.Context, allTokens []model.Token) error
Email: user.Email,
Amouserid: mainAccount.Amouserid,
})
if err != nil {
m.logger.Error("error update users list data in db", zap.Error(err))
return err
}
}
}
err := m.repo.AmoRepo.CheckAndUpdateUsers(ctx, usersForUpdateAndCreate)
if err != nil {
m.logger.Error("error update users list data in db", zap.Error(err))
return err
}
return nil
@ -165,79 +167,61 @@ func (m *Methods) CheckPipelinesAndSteps(ctx context.Context, tokens []model.Tok
func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
var leadsTags []models.Tag
var contactsTags []models.Tag
var companiesTags []models.Tag
var customersTags []models.Tag
var wg sync.WaitGroup
wg.Add(4)
var tagsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 250
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 250
for {
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := m.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of tags", zap.Error(err))
break
}
for {
req := models.GetListTagsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
tags, err := m.amoClient.GetListTags(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of tags", zap.Error(err))
return
}
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
if tags == nil || len(tags.Embedded.Tags) == 0 {
break
}
switch entityType {
case model.LeadsType:
leadsTags = append(leadsTags, tags.Embedded.Tags...)
case model.ContactsType:
contactsTags = append(contactsTags, tags.Embedded.Tags...)
case model.CompaniesType:
companiesTags = append(companiesTags, tags.Embedded.Tags...)
case model.CustomersType:
customersTags = append(customersTags, tags.Embedded.Tags...)
}
tagsMap.Store(entityType, tags.Embedded.Tags)
page++
}
page++
}
}(entityType)
}
wg.Wait()
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsType:
if len(leadsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(leadsTags, entityType), token.AccountID)
if tags, ok := tagsMap.Load(entityType); ok {
if len(tags.([]models.Tag)) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(tags.([]models.Tag), entityType), token.AccountID)
if err != nil {
m.logger.Error("error update leads tags in db", zap.Error(err))
return err
}
}
case model.ContactsType:
if len(contactsTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(contactsTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update contacts tags in db", zap.Error(err))
return err
}
}
case model.CompaniesType:
if len(companiesTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(companiesTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update companies tags in db", zap.Error(err))
return err
}
}
case model.CustomersType:
if len(customersTags) > 0 {
err := m.repo.AmoRepo.CheckTags(ctx, tools.ToTag(customersTags, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update customer tags in db", zap.Error(err))
return err
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads tags in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts tags in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies tags in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer tags in db", zap.Error(err))
return err
}
}
}
}
@ -248,79 +232,61 @@ func (m *Methods) CheckTags(ctx context.Context, tokens []model.Token) error {
func (m *Methods) CheckFields(ctx context.Context, tokens []model.Token) error {
for _, token := range tokens {
var leadsFields []models.CustomField
var contactsFields []models.CustomField
var companiesFields []models.CustomField
var customersFields []models.CustomField
var wg sync.WaitGroup
wg.Add(4)
var fieldsMap sync.Map
entityTypes := []model.EntityType{model.LeadsType, model.ContactsType, model.CompaniesType, model.CustomersType}
for _, entityType := range entityTypes {
page := 1
limit := 50
go func(entityType model.EntityType) {
defer wg.Done()
page := 1
limit := 50
for {
req := models.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
fields, err := m.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of fields", zap.Error(err))
break
}
for {
req := models.GetListFieldsReq{
Page: page,
Limit: limit,
EntityType: entityType,
}
fields, err := m.amoClient.GetListFields(req, token.AccessToken)
if err != nil {
m.logger.Error("error getting list of fields", zap.Error(err))
return
}
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
if fields == nil || len(fields.Embedded.CustomFields) == 0 {
break
}
switch entityType {
case model.LeadsType:
leadsFields = append(leadsFields, fields.Embedded.CustomFields...)
case model.ContactsType:
contactsFields = append(contactsFields, fields.Embedded.CustomFields...)
case model.CompaniesType:
companiesFields = append(companiesFields, fields.Embedded.CustomFields...)
case model.CustomersType:
customersFields = append(customersFields, fields.Embedded.CustomFields...)
}
fieldsMap.Store(entityType, fields.Embedded.CustomFields)
page++
}
page++
}
}(entityType)
}
wg.Wait()
for _, entityType := range entityTypes {
switch entityType {
case model.LeadsType:
if len(leadsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(leadsFields, entityType), token.AccountID)
if fields, ok := fieldsMap.Load(entityType); ok {
if len(fields.([]models.CustomField)) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(fields.([]models.CustomField), entityType), token.AccountID)
if err != nil {
m.logger.Error("error update leads fields in db", zap.Error(err))
return err
}
}
case model.ContactsType:
if len(contactsFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(contactsFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update contacts fields in db", zap.Error(err))
return err
}
}
case model.CompaniesType:
if len(companiesFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(companiesFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update companies fields", zap.Error(err))
return err
}
}
case model.CustomersType:
if len(customersFields) > 0 {
err := m.repo.AmoRepo.CheckFields(ctx, tools.ToField(customersFields, entityType), token.AccountID)
if err != nil {
m.logger.Error("error update customer fields", zap.Error(err))
return err
switch entityType {
case model.LeadsType:
m.logger.Error("error updating leads fields in db", zap.Error(err))
return err
case model.ContactsType:
m.logger.Error("error updating contacts fields in db", zap.Error(err))
return err
case model.CompaniesType:
m.logger.Error("error updating companies fields in db", zap.Error(err))
return err
case model.CustomersType:
m.logger.Error("error updating customer fields in db", zap.Error(err))
return err
}
}
}
}