add logic with redis for update and restorung data

This commit is contained in:
Pavel 2024-05-06 23:30:59 +03:00
parent 747cee3da9
commit 0967dd996d
9 changed files with 336 additions and 107 deletions

4
go.mod

@ -4,6 +4,7 @@ go 1.21.6
require ( require (
github.com/caarlos0/env/v8 v8.0.0 github.com/caarlos0/env/v8 v8.0.0
github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/fiber/v2 v2.52.4 github.com/gofiber/fiber/v2 v2.52.4
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.9 github.com/lib/pq v1.10.9
@ -12,7 +13,7 @@ require (
github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go v1.16.1
go.uber.org/zap v1.27.0 go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7 penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af
) )
@ -22,7 +23,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/golang-migrate/migrate/v4 v4.17.0 // indirect github.com/golang-migrate/migrate/v4 v4.17.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect

16
go.sum

@ -25,6 +25,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM= github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM=
@ -88,6 +90,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
@ -152,12 +160,16 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6 h1:oV+/HNX+JPoQ3/GUx08hio7d45WpY0AMGrFs7j70QlA=
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM= penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240223054633-6cb3d5ce45b6/go.mod h1:lTmpjry+8evVkXWbEC+WMOELcFkRD1lFMc7J09mOndM=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7 h1:SzwUo27l1O6mMuvCGzq/6LPyv7lZQWUAHyKliYClMh4= penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3 h1:s2hbSHeeFlnGlKILgSGAoFnP+ZRIgXkhIgMonDtigQw=
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506110833-3afa7086d8e7/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64= penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240506200710-edeece4bfbf3/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo= penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af h1:jQ7HaXSutDX5iepU7VRImxhikK7lV/lBKkiloOZ4Emo=
penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I= penahub.gitlab.yandexcloud.net/backend/quiz/core.git v0.0.0-20240219174804-d78fd38511af/go.mod h1:5S5YwjSXWmnEKjBjG6MtyGtFmljjukDRS8CwHk/CF/I=

@ -4,6 +4,7 @@ import (
"amocrm/internal/brokers" "amocrm/internal/brokers"
"amocrm/internal/controllers" "amocrm/internal/controllers"
"amocrm/internal/initialize" "amocrm/internal/initialize"
"amocrm/internal/repository"
"amocrm/internal/server/http" "amocrm/internal/server/http"
"amocrm/internal/service" "amocrm/internal/service"
"amocrm/internal/tools" "amocrm/internal/tools"
@ -79,6 +80,11 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
RateLimiter: rateLimiter, RateLimiter: rateLimiter,
}) })
redisRepo := repository.NewRepository(repository.Deps{
RedisClient: redisClient,
Logger: logger,
})
svc := service.NewService(service.Deps{ svc := service.NewService(service.Deps{
Repository: amoRepo, Repository: amoRepo,
Logger: logger, Logger: logger,
@ -111,17 +117,17 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
}) })
dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{ dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{
AmoRepo: amoRepo, AmoRepo: amoRepo,
AmoClient: amoClient, AmoClient: amoClient,
RedisClient: redisClient, RedisRepo: redisRepo,
Logger: logger, Logger: logger,
}) })
fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{ fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
AmoRepo: amoRepo, AmoRepo: amoRepo,
AmoClient: amoClient, AmoClient: amoClient,
RedisClient: redisClient, RedisRepo: redisRepo,
Logger: logger, Logger: logger,
}) })
go dataUpdater.Start(ctx) go dataUpdater.Start(ctx)

@ -69,3 +69,19 @@ type DealResp struct {
Merged bool `json:"merged"` // Флаг, который показывает, найден дубль подходящий под условия поиска дублей и произведено объединение или нет Merged bool `json:"merged"` // Флаг, который показывает, найден дубль подходящий под условия поиска дублей и произведено объединение или нет
RequestID []string `json:"request_id"` // Массив строк с пользовательскими идентификаторами, которые были переданы с каждой сущностью RequestID []string `json:"request_id"` // Массив строк с пользовательскими идентификаторами, которые были переданы с каждой сущностью
} }
type UpdateDealReq struct {
DealID int32 `json:"id"` // ID сделки
CustomFieldsValues []FieldsValues `json:"custom_fields_values"` // Массив полей которые заполняются значениями
}
type UpdateDealResp struct {
Embedded EmbeddedUpdateDeal `json:"_embedded"`
}
type EmbeddedUpdateDeal struct {
Leads []struct {
ID int32 `json:"id"`
UpdatedAt int64 `json:"updated_at"`
}
}

@ -5,3 +5,14 @@ type SaveDeal struct {
DealID int32 DealID int32
AccessToken string AccessToken string
} }
type MappingDealsData struct {
AnswerID int64
DealID int32
LeadFields []FieldsValues
}
type ForRestoringData struct {
SaveDeal SaveDeal
LeadFields []FieldsValues
}

@ -0,0 +1,126 @@
package repository
import (
"amocrm/internal/models"
"context"
"encoding/json"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"strconv"
"sync"
)
type Repository struct {
redisClient *redis.Client
logger *zap.Logger
}
type Deps struct {
RedisClient *redis.Client
Logger *zap.Logger
}
func NewRepository(deps Deps) *Repository {
return &Repository{
redisClient: deps.RedisClient,
logger: deps.Logger,
}
}
func (r *Repository) CachingDealToRedis(ctx context.Context, deps models.SaveDeal) error {
key := "deal:" + strconv.FormatInt(deps.AnswerID, 10) + ":" + strconv.Itoa(int(deps.DealID))
valueJson, err := json.Marshal(deps)
if err != nil {
return err
}
err = r.redisClient.Set(ctx, key, valueJson, 0).Err()
if err != nil {
return err
}
return nil
}
func (r *Repository) CachingLeadFieldsToRedis(ctx context.Context, answerID int64, leadFields []models.FieldsValues) error {
key := strconv.FormatInt(answerID, 10)
leadFieldsJson, err := json.Marshal(leadFields)
if err != nil {
return err
}
err = r.redisClient.Set(ctx, key, leadFieldsJson, 0).Err()
if err != nil {
return err
}
return nil
}
func (r *Repository) FetchingDeals(ctx context.Context) (map[string][]models.MappingDealsData, map[int32]models.ForRestoringData, error) {
keys, err := r.redisClient.Keys(ctx, "deal:*").Result()
if err != nil {
r.logger.Error("error fetching keys from Redis", zap.Error(err))
return nil, nil, err
}
var (
mu sync.Mutex
dealsDataForUpdate = make(map[string][]models.MappingDealsData)
forRestoringMap = make(map[int32]models.ForRestoringData)
wg sync.WaitGroup
)
wg.Add(len(keys))
for _, key := range keys {
go func(key string) {
defer wg.Done()
saveDealJSON, err := r.redisClient.GetDel(ctx, key).Result()
if err != nil {
r.logger.Error("error getting saveDeal JSON from Redis", zap.Error(err))
return
}
var saveDeal models.SaveDeal
err = json.Unmarshal([]byte(saveDealJSON), &saveDeal)
if err != nil {
r.logger.Error("error unmarshal saveDeal JSON", zap.Error(err))
return
}
answerIDStr := strconv.FormatInt(saveDeal.AnswerID, 10)
leadFieldsJSON, err := r.redisClient.GetDel(ctx, answerIDStr).Result()
if err != nil {
r.logger.Error("error getting leadFields JSON from Redis", zap.Error(err))
return
}
var leadFields []models.FieldsValues
err = json.Unmarshal([]byte(leadFieldsJSON), &leadFields)
if err != nil {
r.logger.Error("error unmarshal leadFields JSON", zap.Error(err))
return
}
mu.Lock()
defer mu.Unlock()
dealsDataForUpdate[saveDeal.AccessToken] = append(dealsDataForUpdate[saveDeal.AccessToken], models.MappingDealsData{
AnswerID: saveDeal.AnswerID,
DealID: saveDeal.DealID,
LeadFields: leadFields,
})
forRestoringMap[saveDeal.DealID] = models.ForRestoringData{
SaveDeal: saveDeal,
LeadFields: leadFields,
}
}(key)
}
wg.Wait()
return dealsDataForUpdate, forRestoringMap, nil
}

@ -2,12 +2,11 @@ package post_deals_worker
import ( import (
"amocrm/internal/models" "amocrm/internal/models"
"amocrm/internal/repository"
"amocrm/internal/tools" "amocrm/internal/tools"
"amocrm/pkg/amoClient" "amocrm/pkg/amoClient"
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/go-redis/redis/v8"
"go.uber.org/zap" "go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo"
@ -17,25 +16,25 @@ import (
) )
type Deps struct { type Deps struct {
AmoRepo *dal.AmoDal AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo AmoClient *amoClient.Amo
RedisClient *redis.Client RedisRepo *repository.Repository
Logger *zap.Logger Logger *zap.Logger
} }
type PostDeals struct { type PostDeals struct {
amoRepo *dal.AmoDal amoRepo *dal.AmoDal
amoClient *amoClient.Amo amoClient *amoClient.Amo
redisClient *redis.Client redisRepo *repository.Repository
logger *zap.Logger logger *zap.Logger
} }
func NewPostDealsWC(deps Deps) *PostDeals { func NewPostDealsWC(deps Deps) *PostDeals {
return &PostDeals{ return &PostDeals{
amoRepo: deps.AmoRepo, amoRepo: deps.AmoRepo,
amoClient: deps.AmoClient, amoClient: deps.AmoClient,
redisClient: deps.RedisClient, redisRepo: deps.RedisRepo,
logger: deps.Logger, logger: deps.Logger,
} }
} }
@ -90,7 +89,7 @@ func (wc *PostDeals) startFetching(ctx context.Context) {
leadFields, contactData, companyData := tools.ConstructField(allAnswers, result) leadFields, contactData, companyData := tools.ConstructField(allAnswers, result)
err = wc.cachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields) err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, result.AnswerID, leadFields)
if err != nil { if err != nil {
wc.logger.Error("error saving leads fields in redis", zap.Error(err)) wc.logger.Error("error saving leads fields in redis", zap.Error(err))
return return
@ -147,13 +146,18 @@ func (wc *PostDeals) saveDealToDB(ctx context.Context, resp []models.DealResp, a
return err return err
} }
err = wc.amoRepo.AmoRepo.SaveDealStatus(ctx, amo.SaveDealDeps{DealID: dealResp.DealID, AnswerID: answerID, AccessToken: accessToken, Status: status}) err = wc.amoRepo.AmoRepo.SaveDealAmoStatus(ctx, amo.SaveDealAmoDeps{DealID: dealResp.DealID, AnswerID: answerID, AccessToken: accessToken, Status: status})
if err != nil { if err != nil {
wc.logger.Error("error saving deal status to database", zap.Error(err)) wc.logger.Error("error saving deal status to database", zap.Error(err))
return err return err
} }
err = wc.cachingDealToRedis(ctx, answerID, dealResp.DealID, accessToken) err = wc.redisRepo.CachingDealToRedis(ctx, models.SaveDeal{
AnswerID: answerID,
DealID: dealResp.DealID,
AccessToken: accessToken,
})
if err != nil { if err != nil {
wc.logger.Error("error saving deal to redis", zap.Error(err)) wc.logger.Error("error saving deal to redis", zap.Error(err))
return err return err
@ -162,42 +166,6 @@ func (wc *PostDeals) saveDealToDB(ctx context.Context, resp []models.DealResp, a
return nil return nil
} }
func (wc *PostDeals) cachingDealToRedis(ctx context.Context, answerID int64, dealID int32, accessToken string) error {
key := "deal:" + strconv.FormatInt(answerID, 10) + ":" + strconv.Itoa(int(dealID))
value := models.SaveDeal{
AnswerID: answerID,
DealID: dealID,
AccessToken: accessToken,
}
valueJson, err := json.Marshal(value)
if err != nil {
return err
}
err = wc.redisClient.Set(ctx, key, valueJson, 0).Err()
if err != nil {
return err
}
return nil
}
func (wc *PostDeals) cachingLeadFieldsToRedis(ctx context.Context, answerID int64, leadFields []models.FieldsValues) error {
key := strconv.FormatInt(answerID, 10)
leadFieldsJson, err := json.Marshal(leadFields)
if err != nil {
return err
}
err = wc.redisClient.Set(ctx, key, leadFieldsJson, 0).Err()
if err != nil {
return err
}
return nil
}
func (wc *PostDeals) Stop(ctx context.Context) error { func (wc *PostDeals) Stop(ctx context.Context) error {
return nil return nil
} }

@ -2,36 +2,35 @@ package post_fields_worker
import ( import (
"amocrm/internal/models" "amocrm/internal/models"
"amocrm/internal/repository"
"amocrm/pkg/amoClient" "amocrm/pkg/amoClient"
"context" "context"
"encoding/json"
"github.com/go-redis/redis/v8"
"go.uber.org/zap" "go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"strconv" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo"
"time" "time"
) )
type Deps struct { type Deps struct {
AmoRepo *dal.AmoDal AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo AmoClient *amoClient.Amo
RedisClient *redis.Client RedisRepo *repository.Repository
Logger *zap.Logger Logger *zap.Logger
} }
type PostFields struct { type PostFields struct {
amoRepo *dal.AmoDal amoRepo *dal.AmoDal
amoClient *amoClient.Amo amoClient *amoClient.Amo
redisClient *redis.Client redisRepo *repository.Repository
logger *zap.Logger logger *zap.Logger
} }
func NewPostFieldsWC(deps Deps) *PostFields { func NewPostFieldsWC(deps Deps) *PostFields {
return &PostFields{ return &PostFields{
amoRepo: deps.AmoRepo, amoRepo: deps.AmoRepo,
amoClient: deps.AmoClient, amoClient: deps.AmoClient,
redisClient: deps.RedisClient, redisRepo: deps.RedisRepo,
logger: deps.Logger, logger: deps.Logger,
} }
} }
@ -42,7 +41,7 @@ func (wc *PostFields) Start(ctx context.Context) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
wc.startFetching(ctx) wc.processTask(ctx)
case <-ctx.Done(): case <-ctx.Done():
return return
@ -50,43 +49,92 @@ func (wc *PostFields) Start(ctx context.Context) {
} }
} }
func (wc *PostFields) startFetching(ctx context.Context) { func (wc *PostFields) processTask(ctx context.Context) {
keys, err := wc.redisClient.Keys(ctx, "deal:*").Result() dealsDataForUpdate, forRestoringMap, err := wc.redisRepo.FetchingDeals(ctx)
if err != nil { if err != nil {
wc.logger.Error("error fetching keys from Redis", zap.Error(err)) wc.logger.Error("error fetching deals for update in redis", zap.Error(err))
return return
} }
for token, dealsData := range dealsDataForUpdate {
for _, key := range keys { errorCheckerMap, err := wc.sendForUpdate(ctx, token, dealsData)
saveDealJSON, err := wc.redisClient.Get(ctx, key).Result()
if err != nil { if err != nil {
wc.logger.Error("error getting saveDeal JSON from Redis", zap.Error(err)) wc.logger.Error("error updating deals fields in db", zap.Error(err))
continue
} }
var saveDeal models.SaveDeal for dealID, _ := range errorCheckerMap {
err = json.Unmarshal([]byte(saveDealJSON), &saveDeal) restoringData := forRestoringMap[dealID]
if err != nil { err = wc.redisRepo.CachingDealToRedis(ctx, restoringData.SaveDeal)
wc.logger.Error("error unmarshal saveDeal JSON", zap.Error(err)) if err != nil {
continue wc.logger.Error("error restoring deal in redis", zap.Error(err))
} return
answerIDStr := strconv.FormatInt(saveDeal.AnswerID, 10) }
leadFieldsJSON, err := wc.redisClient.Get(ctx, answerIDStr).Result() err = wc.redisRepo.CachingLeadFieldsToRedis(ctx, restoringData.SaveDeal.AnswerID, restoringData.LeadFields)
if err != nil { if err != nil {
wc.logger.Error("error getting leadFields JSON from Redis", zap.Error(err)) wc.logger.Error("error restoring deal fields in redis", zap.Error(err))
continue return
} }
var leadFields []models.FieldsValues
err = json.Unmarshal([]byte(leadFieldsJSON), &leadFields)
if err != nil {
wc.logger.Error("error unmarshal leadFields JSON", zap.Error(err))
continue
} }
} }
} }
func (wc *PostFields) sendForUpdate(ctx context.Context, token string, dealsData []models.MappingDealsData) (map[int32]struct{}, error) {
errorCheckerMap := make(map[int32]struct{})
var reqToUpdate []models.UpdateDealReq
for _, data := range dealsData {
req := models.UpdateDealReq{
DealID: data.DealID,
CustomFieldsValues: data.LeadFields,
}
reqToUpdate = append(reqToUpdate, req)
}
resp, err := wc.amoClient.UpdatingDeal(reqToUpdate, token)
if err != nil {
wc.logger.Error("error sendig request for update deal fields", zap.Error(err))
for _, data := range reqToUpdate {
errorCheckerMap[data.DealID] = struct{}{}
}
}
err = wc.updateDealStatus(ctx, DealStatus{
Resp: resp,
AccessToken: token,
ErrResp: err,
})
if err != nil {
wc.logger.Error("error update deal Status after updating fields", zap.Error(err))
return errorCheckerMap, err
}
return errorCheckerMap, nil
}
type DealStatus struct {
Resp *models.UpdateDealResp
AccessToken string
ErrResp error
}
func (wc *PostFields) updateDealStatus(ctx context.Context, deps DealStatus) error {
status := "success"
if deps.ErrResp != nil {
status = deps.ErrResp.Error()
}
for _, lead := range deps.Resp.Embedded.Leads {
dealID := lead.ID
err := wc.amoRepo.AmoRepo.UpdatingDealAmoStatus(ctx, amo.SaveDealAmoDeps{DealID: dealID, AccessToken: deps.AccessToken, Status: status})
if err != nil {
wc.logger.Error("error saving deal status update to database", zap.Error(err))
return err
}
}
return nil
}
func (wc *PostFields) Stop(ctx context.Context) error { func (wc *PostFields) Stop(ctx context.Context) error {
return nil return nil
} }

@ -468,3 +468,45 @@ func (a *Amo) CreatingDeal(req []models.DealReq, accessToken string) ([]models.D
time.Sleep(a.rateLimiter.Interval) time.Sleep(a.rateLimiter.Interval)
} }
} }
func (a *Amo) UpdatingDeal(req []models.UpdateDealReq, accessToken string) (*models.UpdateDealResp, error) {
for {
if a.rateLimiter.Check() {
uri := fmt.Sprintf("%s/api/v4/leads", a.baseApiURL)
bodyBytes, err := json.Marshal(req)
if err != nil {
a.logger.Error("error marshal req in Updating Deal:", zap.Error(err))
return nil, err
}
agent := a.fiberClient.Patch(uri)
agent.Set("Content-Type", "application/json").Body(bodyBytes)
agent.Set("Authorization", "Bearer "+accessToken)
statusCode, resBody, errs := agent.Bytes()
if len(errs) > 0 {
for _, err = range errs {
a.logger.Error("error sending request in Updating Deal for updating deals", zap.Error(err))
}
return nil, fmt.Errorf("request failed: %v", errs[0])
}
if statusCode != fiber.StatusOK {
errorMessage := fmt.Sprintf("received an incorrect response from Updating Deal: %s", string(resBody))
a.logger.Error(errorMessage, zap.Int("status", statusCode))
return nil, fmt.Errorf(errorMessage)
}
var resp models.UpdateDealResp
err = json.Unmarshal(resBody, &resp)
if err != nil {
a.logger.Error("error unmarshal response body in Updating Deal:", zap.Error(err))
return nil, err
}
return &resp, nil
}
time.Sleep(a.rateLimiter.Interval)
}
}