add redis for cashing fields data

This commit is contained in:
Pavel 2024-05-06 17:40:27 +03:00
parent 5ba2d6d5dd
commit 43db9572f2
7 changed files with 128 additions and 20 deletions

3
go.mod

@ -18,8 +18,11 @@ require (
require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/golang-migrate/migrate/v4 v4.17.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect

6
go.sum

@ -6,9 +6,13 @@ github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0=
github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dhui/dktest v0.4.0 h1:z05UmuXZHO/bgj/ds2bGMBu8FI4WA+Ag/m3ghL+om7M=
github.com/dhui/dktest v0.4.0/go.mod h1:v/Dbz1LgCBOi2Uki2nUqLBGa83hWBGFMu5MrgMDCc78=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
@ -21,6 +25,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gofiber/fiber/v2 v2.52.4 h1:P+T+4iK7VaqUsq2PALYEfBBo6bJZ4q3FP8cZ84EggTM=
github.com/gofiber/fiber/v2 v2.52.4/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=

@ -9,6 +9,7 @@ import (
"amocrm/internal/tools"
"amocrm/internal/workers/data_updater"
"amocrm/internal/workers/limiter"
"amocrm/internal/workers/post_deals_worker"
"amocrm/internal/workers/post_fields_worker"
"amocrm/internal/workers/queueUpdater"
"amocrm/internal/workers_methods"
@ -37,9 +38,15 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
shutdownGroup := closer.NewCloserGroup()
redisClient, err := initialize.Redis(ctx, config)
if err != nil {
logger.Error("error init redis client", zap.Error(err))
return err
}
kafka, err := initialize.KafkaConsumerInit(ctx, config)
if err != nil {
logger.Error("error init kafka consumer")
logger.Error("error init kafka consumer", zap.Error(err))
return err
}
@ -103,14 +110,23 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
Methods: workerMethods,
})
dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{
AmoRepo: amoRepo,
AmoClient: amoClient,
RedisClient: redisClient,
Logger: logger,
})
fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
AmoRepo: amoRepo,
AmoClient: amoClient,
Logger: logger,
AmoRepo: amoRepo,
AmoClient: amoClient,
RedisClient: redisClient,
Logger: logger,
})
go dataUpdater.Start(ctx)
go queUpdater.Start(ctx)
go dealsPoster.Start(ctx)
go fieldsPoster.Start(ctx)
server := http.NewServer(http.ServerConfig{
@ -133,6 +149,7 @@ func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) erro
shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop))
shutdownGroup.Add(closer.CloserFunc(dataUpdater.Stop))
shutdownGroup.Add(closer.CloserFunc(queUpdater.Stop))
shutdownGroup.Add(closer.CloserFunc(dealsPoster.Stop))
shutdownGroup.Add(closer.CloserFunc(fieldsPoster.Stop))
<-ctx.Done()

@ -14,6 +14,9 @@ type Config struct {
KafkaBrokers string `env:"KAFKA_BROKERS" envDefault:"localhost:9092"`
KafkaTopic string `env:"KAFKA_TOPIC" envDefault:"test-topic"`
KafkaGroup string `env:"KAFKA_GROUP" envDefault:"amoCRM"`
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
RedisPassword string `env:"REDIS_PASS" envDefault:"admin"`
RedisDB int `env:"REDIS_DB" envDefault:"2"`
// урл в соц аус сервисе для генерации ссылки для авторизации в амо
PenaSocialAuthURL string `env:"PENA_SOCIAL_AUTH_URL" envDefault:"http://localhost:8000/amocrm/auth"`
// урл на который будет возвращен пользователь после авторизации это webhook/create get

@ -0,0 +1,21 @@
package initialize
import (
"context"
"github.com/go-redis/redis/v8"
)
func Redis(ctx context.Context, cfg Config) (*redis.Client, error) {
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr,
Password: cfg.RedisPassword,
DB: cfg.RedisDB,
})
status := rdb.Ping(ctx)
if err := status.Err(); err != nil {
return nil, err
}
return rdb, nil
}

@ -1,4 +1,4 @@
package post_fields_worker
package post_deals_worker
import (
"amocrm/internal/models"
@ -6,6 +6,7 @@ import (
"amocrm/pkg/amoClient"
"context"
"fmt"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/repository/amo"
@ -15,26 +16,28 @@ import (
)
type Deps struct {
AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo
Logger *zap.Logger
AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo
RedisClient *redis.Client
Logger *zap.Logger
}
type PostFields struct {
amoRepo *dal.AmoDal
amoClient *amoClient.Amo
logger *zap.Logger
type PostDeals struct {
amoRepo *dal.AmoDal
amoClient *amoClient.Amo
redisClient *redis.Client
logger *zap.Logger
}
func NewPostFieldsWC(deps Deps) *PostFields {
return &PostFields{
func NewPostDealsWC(deps Deps) *PostDeals {
return &PostDeals{
amoRepo: deps.AmoRepo,
amoClient: deps.AmoClient,
logger: deps.Logger,
}
}
func (wc *PostFields) Start(ctx context.Context) {
func (wc *PostDeals) Start(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
@ -49,7 +52,7 @@ func (wc *PostFields) Start(ctx context.Context) {
}
}
func (wc *PostFields) startFetching(ctx context.Context) {
func (wc *PostDeals) startFetching(ctx context.Context) {
results, err := wc.amoRepo.AmoRepo.GettingAmoUsersTrueResults(ctx)
if err != nil {
wc.logger.Error("error fetching users answers true results, for sending data to amo", zap.Error(err))
@ -64,7 +67,7 @@ func (wc *PostFields) startFetching(ctx context.Context) {
wc.logger.Error("error getting all user answers by result session", zap.Error(err))
return
}
//todo За один запрос можно передать не более 50 сделок.
// За один запрос можно передать не более 50 сделок.
deal := models.DealReq{
Name: fmt.Sprintf("deal quiz number %d", result.QuizID),
StatusID: result.StepID,
@ -108,7 +111,7 @@ func (wc *PostFields) startFetching(ctx context.Context) {
}
}
func (wc *PostFields) sendingDealsReq(ctx context.Context, mapDealReq map[string][]models.DealReq) error {
func (wc *PostDeals) sendingDealsReq(ctx context.Context, mapDealReq map[string][]models.DealReq) error {
for accessToken, deal := range mapDealReq {
resp, err := wc.amoClient.CreatingDeal(deal, accessToken)
if err != nil {
@ -123,7 +126,7 @@ func (wc *PostFields) sendingDealsReq(ctx context.Context, mapDealReq map[string
return nil
}
func (wc *PostFields) saveDealToDB(ctx context.Context, resp []models.DealResp, accessToken string, errResp error) error {
func (wc *PostDeals) saveDealToDB(ctx context.Context, resp []models.DealResp, accessToken string, errResp error) error {
status := "success"
if errResp != nil {
status = errResp.Error()
@ -145,6 +148,6 @@ func (wc *PostFields) saveDealToDB(ctx context.Context, resp []models.DealResp,
return nil
}
func (wc *PostFields) Stop(ctx context.Context) error {
func (wc *PostDeals) Stop(ctx context.Context) error {
return nil
}

@ -0,0 +1,55 @@
package post_fields_worker
import (
"amocrm/pkg/amoClient"
"context"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"time"
)
type Deps struct {
AmoRepo *dal.AmoDal
AmoClient *amoClient.Amo
RedisClient *redis.Client
Logger *zap.Logger
}
type PostFields struct {
amoRepo *dal.AmoDal
amoClient *amoClient.Amo
redisClient *redis.Client
logger *zap.Logger
}
func NewPostFieldsWC(deps Deps) *PostFields {
return &PostFields{
amoRepo: deps.AmoRepo,
amoClient: deps.AmoClient,
logger: deps.Logger,
}
}
func (wc *PostFields) Start(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.startFetching(ctx)
case <-ctx.Done():
return
}
}
}
func (wc *PostFields) startFetching(ctx context.Context) {
}
func (wc *PostFields) Stop(ctx context.Context) error {
return nil
}