From 0967dd996d41109f80d8ba83756025cc91b8c2f1 Mon Sep 17 00:00:00 2001 From: Pavel Date: Mon, 6 May 2024 23:30:59 +0300 Subject: [PATCH] add logic with redis for update and restorung data --- go.mod | 4 +- go.sum | 16 ++- internal/app/app.go | 22 +-- internal/models/createDeal.go | 16 +++ internal/models/forRedis.go | 11 ++ internal/repository/redis_repo.go | 126 +++++++++++++++++ .../workers/post_deals_worker/deals_worker.go | 74 +++------- .../post_fields_worker/fields_worker.go | 132 ++++++++++++------ pkg/amoClient/amo.go | 42 ++++++ 9 files changed, 336 insertions(+), 107 deletions(-) create mode 100644 internal/repository/redis_repo.go diff --git a/go.mod b/go.mod index 2b9cb66..b75bafc 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21.6 require ( github.com/caarlos0/env/v8 v8.0.0 + github.com/go-redis/redis/v8 v8.11.5 github.com/gofiber/fiber/v2 v2.52.4 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.9 @@ -12,7 +13,7 @@ require ( github.com/twmb/franz-go v1.16.1 go.uber.org/zap v1.27.0 google.golang.org/protobuf v1.33.0 - penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7 + penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3 penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af ) @@ -22,7 +23,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang-migrate/migrate/v4 v4.17.0 // indirect github.com/golang/protobuf v1.5.3 // indirect diff --git a/go.sum b/go.sum index 91a428b..6337414 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM= @@ -88,6 +90,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= @@ -152,12 +160,16 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM= -penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7 h1:SzwUo27l1O6mMuvCGzq/6LPyv7lZQWUAHyKliYClMh4= -penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3 h1:s2hbSHeeFlnGlKILgSGAoFnP+ZRIgXkhIgMonDtigQw= +penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64= penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo= penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I= diff --git a/internal/app/app.go b/internal/app/app.go index 0c6b475..89cb815 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -4,6 +4,7 @@ import ( "amocrm/internal/brokers" "amocrm/internal/controllers" "amocrm/internal/initialize" + "amocrm/internal/repository" "amocrm/internal/server/http" "amocrm/internal/service" "amocrm/internal/tools" @@ -79,6 +80,11 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro RateLimiter: rateLimiter, }) + redisRepo := repository.NewRepository(repository.Deps{ + RedisClient: redisClient, + Logger: logger, + }) + svc := service.NewService(service.Deps{ Repository: amoRepo, Logger: logger, @@ -111,17 +117,17 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro }) dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{ - AmoRepo: amoRepo, - AmoClient: amoClient, - RedisClient: redisClient, - Logger: logger, + AmoRepo: amoRepo, + AmoClient: amoClient, + RedisRepo: redisRepo, + Logger: logger, }) fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{ - AmoRepo: amoRepo, - AmoClient: amoClient, - RedisClient: redisClient, - Logger: logger, + AmoRepo: amoRepo, + AmoClient: amoClient, + RedisRepo: redisRepo, + Logger: logger, }) go dataUpdater.Start(ctx) diff --git a/internal/models/createDeal.go b/internal/models/createDeal.go index 127b809..57bff5a 100644 --- a/internal/models/createDeal.go +++ b/internal/models/createDeal.go @@ -69,3 +69,19 @@ type DealResp struct { Merged bool `json:"merged"` // Флаг, который показывает, найден дубль подходящий под условия поиска дублей и произведено объединение или нет RequestID []string `json:"request_id"` // Массив строк с пользовательскими идентификаторами, которые были переданы с каждой сущностью } + +type UpdateDealReq struct { + DealID int32 `json:"id"` // ID сделки + CustomFieldsValues []FieldsValues `json:"custom_fields_values"` // Массив полей которые заполняются значениями +} + +type UpdateDealResp struct { + Embedded EmbeddedUpdateDeal `json:"_embedded"` +} + +type EmbeddedUpdateDeal struct { + Leads []struct { + ID int32 `json:"id"` + UpdatedAt int64 `json:"updated_at"` + } +} diff --git a/internal/models/forRedis.go b/internal/models/forRedis.go index 41b30c5..c72e316 100644 --- a/internal/models/forRedis.go +++ b/internal/models/forRedis.go @@ -5,3 +5,14 @@ type SaveDeal struct { DealID int32 AccessToken string } + +type MappingDealsData struct { + AnswerID int64 + DealID int32 + LeadFields []FieldsValues +} + +type ForRestoringData struct { + SaveDeal SaveDeal + LeadFields []FieldsValues +} diff --git a/internal/repository/redis_repo.go b/internal/repository/redis_repo.go new file mode 100644 index 0000000..12317b3 --- /dev/null +++ b/internal/repository/redis_repo.go @@ -0,0 +1,126 @@ +package repository + +import ( + "amocrm/internal/models" + "context" + "encoding/json" + "github.com/go-redis/redis/v8" + "go.uber.org/zap" + "strconv" + "sync" +) + +type Repository struct { + redisClient *redis.Client + logger *zap.Logger +} + +type Deps struct { + RedisClient *redis.Client + Logger *zap.Logger +} + +func NewRepository(deps Deps) *Repository { + return &Repository{ + redisClient: deps.RedisClient, + logger: deps.Logger, + } +} + +func (r *Repository) CachingDealToRedis(ctx context.Context, deps models.SaveDeal) error { + key := "deal:" + strconv.FormatInt(deps.AnswerID, 10) + ":" + strconv.Itoa(int(deps.DealID)) + valueJson, err := json.Marshal(deps) + if err != nil { + return err + } + + err = r.redisClient.Set(ctx, key, valueJson, 0).Err() + if err != nil { + return err + } + + return nil +} + +func (r *Repository) CachingLeadFieldsToRedis(ctx context.Context, answerID int64, leadFields []models.FieldsValues) error { + key := strconv.FormatInt(answerID, 10) + leadFieldsJson, err := json.Marshal(leadFields) + if err != nil { + return err + } + + err = r.redisClient.Set(ctx, key, leadFieldsJson, 0).Err() + if err != nil { + return err + } + + return nil +} + +func (r *Repository) FetchingDeals(ctx context.Context) (map[string][]models.MappingDealsData, map[int32]models.ForRestoringData, error) { + keys, err := r.redisClient.Keys(ctx, "deal:*").Result() + if err != nil { + r.logger.Error("error fetching keys from Redis", zap.Error(err)) + return nil, nil, err + } + + var ( + mu sync.Mutex + dealsDataForUpdate = make(map[string][]models.MappingDealsData) + forRestoringMap = make(map[int32]models.ForRestoringData) + wg sync.WaitGroup + ) + + wg.Add(len(keys)) + + for _, key := range keys { + go func(key string) { + defer wg.Done() + + saveDealJSON, err := r.redisClient.GetDel(ctx, key).Result() + if err != nil { + r.logger.Error("error getting saveDeal JSON from Redis", zap.Error(err)) + return + } + + var saveDeal models.SaveDeal + err = json.Unmarshal([]byte(saveDealJSON), &saveDeal) + if err != nil { + r.logger.Error("error unmarshal saveDeal JSON", zap.Error(err)) + return + } + answerIDStr := strconv.FormatInt(saveDeal.AnswerID, 10) + + leadFieldsJSON, err := r.redisClient.GetDel(ctx, answerIDStr).Result() + if err != nil { + r.logger.Error("error getting leadFields JSON from Redis", zap.Error(err)) + return + } + + var leadFields []models.FieldsValues + err = json.Unmarshal([]byte(leadFieldsJSON), &leadFields) + if err != nil { + r.logger.Error("error unmarshal leadFields JSON", zap.Error(err)) + return + } + + mu.Lock() + defer mu.Unlock() + + dealsDataForUpdate[saveDeal.AccessToken] = append(dealsDataForUpdate[saveDeal.AccessToken], models.MappingDealsData{ + AnswerID: saveDeal.AnswerID, + DealID: saveDeal.DealID, + LeadFields: leadFields, + }) + + forRestoringMap[saveDeal.DealID] = models.ForRestoringData{ + SaveDeal: saveDeal, + LeadFields: leadFields, + } + }(key) + } + + wg.Wait() + + return dealsDataForUpdate, forRestoringMap, nil +} diff --git a/internal/workers/post_deals_worker/deals_worker.go b/internal/workers/post_deals_worker/deals_worker.go index 3e9c093..d78b656 100644 --- a/internal/workers/post_deals_worker/deals_worker.go +++ b/internal/workers/post_deals_worker/deals_worker.go @@ -2,12 +2,11 @@ package post_deals_worker import ( "amocrm/internal/models" + "amocrm/internal/repository" "amocrm/internal/tools" "amocrm/pkg/amoClient" "context" - "encoding/json" "fmt" - "github.com/go-redis/redis/v8" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo" @@ -17,25 +16,25 @@ import ( ) type Deps struct { - AmoRepo *dal.AmoDal - AmoClient *amoClient.Amo - RedisClient *redis.Client - Logger *zap.Logger + AmoRepo *dal.AmoDal + AmoClient *amoClient.Amo + RedisRepo *repository.Repository + Logger *zap.Logger } type PostDeals struct { - amoRepo *dal.AmoDal - amoClient *amoClient.Amo - redisClient *redis.Client - logger *zap.Logger + amoRepo *dal.AmoDal + amoClient *amoClient.Amo + redisRepo *repository.Repository + logger *zap.Logger } func NewPostDealsWC(deps Deps) *PostDeals { return &PostDeals{ - amoRepo: deps.AmoRepo, - amoClient: deps.AmoClient, - redisClient: deps.RedisClient, - logger: deps.Logger, + amoRepo: deps.AmoRepo, + amoClient: deps.AmoClient, + redisRepo: deps.RedisRepo, + logger: deps.Logger, } } @@ -90,7 +89,7 @@ func (wc *PostDeals) startFetching(ctx context.Context) { leadFields, contactData, companyData := tools.ConstructField(allAnswers, result) - err = wc.cachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields) + err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields) if err != nil { wc.logger.Error("error saving leads fields in redis", zap.Error(err)) return @@ -147,13 +146,18 @@ func (wc *PostDeals) saveDealToDB(ctx context.Context, resp []models.DealResp, a return err } - err = wc.amoRepo.AmoRepo.SaveDealStatus(ctx, amo.SaveDealDeps{DealID: dealResp.DealID, AnswerID: answerID, AccessToken: accessToken, Status: status}) + err = wc.amoRepo.AmoRepo.SaveDealAmoStatus(ctx, amo.SaveDealAmoDeps{DealID: dealResp.DealID, AnswerID: answerID, AccessToken: accessToken, Status: status}) if err != nil { wc.logger.Error("error saving deal status to database", zap.Error(err)) return err } - err = wc.cachingDealToRedis(ctx, answerID, dealResp.DealID, accessToken) + err = wc.redisRepo.CachingDealToRedis(ctx, models.SaveDeal{ + AnswerID: answerID, + DealID: dealResp.DealID, + AccessToken: accessToken, + }) + if err != nil { wc.logger.Error("error saving deal to redis", zap.Error(err)) return err @@ -162,42 +166,6 @@ func (wc *PostDeals) saveDealToDB(ctx context.Context, resp []models.DealResp, a return nil } -func (wc *PostDeals) cachingDealToRedis(ctx context.Context, answerID int64, dealID int32, accessToken string) error { - key := "deal:" + strconv.FormatInt(answerID, 10) + ":" + strconv.Itoa(int(dealID)) - value := models.SaveDeal{ - AnswerID: answerID, - DealID: dealID, - AccessToken: accessToken, - } - - valueJson, err := json.Marshal(value) - if err != nil { - return err - } - - err = wc.redisClient.Set(ctx, key, valueJson, 0).Err() - if err != nil { - return err - } - - return nil -} - -func (wc *PostDeals) cachingLeadFieldsToRedis(ctx context.Context, answerID int64, leadFields []models.FieldsValues) error { - key := strconv.FormatInt(answerID, 10) - leadFieldsJson, err := json.Marshal(leadFields) - if err != nil { - return err - } - - err = wc.redisClient.Set(ctx, key, leadFieldsJson, 0).Err() - if err != nil { - return err - } - - return nil -} - func (wc *PostDeals) Stop(ctx context.Context) error { return nil } diff --git a/internal/workers/post_fields_worker/fields_worker.go b/internal/workers/post_fields_worker/fields_worker.go index be44c82..5ea937a 100644 --- a/internal/workers/post_fields_worker/fields_worker.go +++ b/internal/workers/post_fields_worker/fields_worker.go @@ -2,36 +2,35 @@ package post_fields_worker import ( "amocrm/internal/models" + "amocrm/internal/repository" "amocrm/pkg/amoClient" "context" - "encoding/json" - "github.com/go-redis/redis/v8" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" - "strconv" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo" "time" ) type Deps struct { - AmoRepo *dal.AmoDal - AmoClient *amoClient.Amo - RedisClient *redis.Client - Logger *zap.Logger + AmoRepo *dal.AmoDal + AmoClient *amoClient.Amo + RedisRepo *repository.Repository + Logger *zap.Logger } type PostFields struct { - amoRepo *dal.AmoDal - amoClient *amoClient.Amo - redisClient *redis.Client - logger *zap.Logger + amoRepo *dal.AmoDal + amoClient *amoClient.Amo + redisRepo *repository.Repository + logger *zap.Logger } func NewPostFieldsWC(deps Deps) *PostFields { return &PostFields{ - amoRepo: deps.AmoRepo, - amoClient: deps.AmoClient, - redisClient: deps.RedisClient, - logger: deps.Logger, + amoRepo: deps.AmoRepo, + amoClient: deps.AmoClient, + redisRepo: deps.RedisRepo, + logger: deps.Logger, } } @@ -42,7 +41,7 @@ func (wc *PostFields) Start(ctx context.Context) { for { select { case <-ticker.C: - wc.startFetching(ctx) + wc.processTask(ctx) case <-ctx.Done(): return @@ -50,43 +49,92 @@ func (wc *PostFields) Start(ctx context.Context) { } } -func (wc *PostFields) startFetching(ctx context.Context) { - keys, err := wc.redisClient.Keys(ctx, "deal:*").Result() +func (wc *PostFields) processTask(ctx context.Context) { + dealsDataForUpdate, forRestoringMap, err := wc.redisRepo.FetchingDeals(ctx) if err != nil { - wc.logger.Error("error fetching keys from Redis", zap.Error(err)) + wc.logger.Error("error fetching deals for update in redis", zap.Error(err)) return } - - for _, key := range keys { - saveDealJSON, err := wc.redisClient.Get(ctx, key).Result() + for token, dealsData := range dealsDataForUpdate { + errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData) if err != nil { - wc.logger.Error("error getting saveDeal JSON from Redis", zap.Error(err)) - continue + wc.logger.Error("error updating deals fields in db", zap.Error(err)) } - var saveDeal models.SaveDeal - err = json.Unmarshal([]byte(saveDealJSON), &saveDeal) - if err != nil { - wc.logger.Error("error unmarshal saveDeal JSON", zap.Error(err)) - continue - } - answerIDStr := strconv.FormatInt(saveDeal.AnswerID, 10) + for dealID, _ := range errorCheckerMap { + restoringData := forRestoringMap[dealID] + err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal) + if err != nil { + wc.logger.Error("error restoring deal in redis", zap.Error(err)) + return + } - leadFieldsJSON, err := wc.redisClient.Get(ctx, answerIDStr).Result() - if err != nil { - wc.logger.Error("error getting leadFields JSON from Redis", zap.Error(err)) - continue - } - - var leadFields []models.FieldsValues - err = json.Unmarshal([]byte(leadFieldsJSON), &leadFields) - if err != nil { - wc.logger.Error("error unmarshal leadFields JSON", zap.Error(err)) - continue + err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields) + if err != nil { + wc.logger.Error("error restoring deal fields in redis", zap.Error(err)) + return + } } } } +func (wc *PostFields) sendForUpdate(ctx context.Context, token string, dealsData []models.MappingDealsData) (map[int32]struct{}, error) { + errorCheckerMap := make(map[int32]struct{}) + var reqToUpdate []models.UpdateDealReq + for _, data := range dealsData { + req := models.UpdateDealReq{ + DealID: data.DealID, + CustomFieldsValues: data.LeadFields, + } + reqToUpdate = append(reqToUpdate, req) + } + + resp, err := wc.amoClient.UpdatingDeal(reqToUpdate, token) + if err != nil { + wc.logger.Error("error sendig request for update deal fields", zap.Error(err)) + for _, data := range reqToUpdate { + errorCheckerMap[data.DealID] = struct{}{} + } + + } + + err = wc.updateDealStatus(ctx, DealStatus{ + Resp: resp, + AccessToken: token, + ErrResp: err, + }) + + if err != nil { + wc.logger.Error("error update deal Status after updating fields", zap.Error(err)) + return errorCheckerMap, err + } + + return errorCheckerMap, nil +} + +type DealStatus struct { + Resp *models.UpdateDealResp + AccessToken string + ErrResp error +} + +func (wc *PostFields) updateDealStatus(ctx context.Context, deps DealStatus) error { + status := "success" + if deps.ErrResp != nil { + status = deps.ErrResp.Error() + } + for _, lead := range deps.Resp.Embedded.Leads { + dealID := lead.ID + err := wc.amoRepo.AmoRepo.UpdatingDealAmoStatus(ctx, amo.SaveDealAmoDeps{DealID: dealID, AccessToken: deps.AccessToken, Status: status}) + if err != nil { + wc.logger.Error("error saving deal status update to database", zap.Error(err)) + return err + } + } + + return nil +} + func (wc *PostFields) Stop(ctx context.Context) error { return nil } diff --git a/pkg/amoClient/amo.go b/pkg/amoClient/amo.go index a24f01b..e49198a 100644 --- a/pkg/amoClient/amo.go +++ b/pkg/amoClient/amo.go @@ -468,3 +468,45 @@ func (a *Amo) CreatingDeal(req []models.DealReq, accessToken string) ([]models.D time.Sleep(a.rateLimiter.Interval) } } + +func (a *Amo) UpdatingDeal(req []models.UpdateDealReq, accessToken string) (*models.UpdateDealResp, error) { + for { + if a.rateLimiter.Check() { + uri := fmt.Sprintf("%s/api/v4/leads", a.baseApiURL) + bodyBytes, err := json.Marshal(req) + if err != nil { + a.logger.Error("error marshal req in Updating Deal:", zap.Error(err)) + return nil, err + } + + agent := a.fiberClient.Patch(uri) + agent.Set("Content-Type", "application/json").Body(bodyBytes) + agent.Set("Authorization", "Bearer "+accessToken) + + statusCode, resBody, errs := agent.Bytes() + if len(errs) > 0 { + for _, err = range errs { + a.logger.Error("error sending request in Updating Deal for updating deals", zap.Error(err)) + } + return nil, fmt.Errorf("request failed: %v", errs[0]) + } + + if statusCode != fiber.StatusOK { + errorMessage := fmt.Sprintf("received an incorrect response from Updating Deal: %s", string(resBody)) + a.logger.Error(errorMessage, zap.Int("status", statusCode)) + return nil, fmt.Errorf(errorMessage) + } + + var resp models.UpdateDealResp + err = json.Unmarshal(resBody, &resp) + if err != nil { + a.logger.Error("error unmarshal response body in Updating Deal:", zap.Error(err)) + return nil, err + } + + return &resp, nil + } + time.Sleep(a.rateLimiter.Interval) + } + +}