From 619a8747138db0c990d07ffa9c33a9e3c7feab0c Mon Sep 17 00:00:00 2001 From: pasha1coil Date: Fri, 16 May 2025 11:33:04 +0300 Subject: [PATCH] fix after merge, now gigachat worker is init and start --- answerwc/to_client.go | 0 app/app.go | 0 clients/mailclient/client.go | 0 internal/app/app.go | 51 +++++++++++++++---- .../clients}/gigachat/client.go | 0 .../gigachatwc}/scheduler.go | 2 +- internal/initialize/config.go | 5 ++ main.go | 0 privilegewc/check.go | 0 tests/gigachat_test.go | 2 +- 10 files changed, 49 insertions(+), 11 deletions(-) delete mode 100644 answerwc/to_client.go delete mode 100644 app/app.go delete mode 100644 clients/mailclient/client.go rename {clients => internal/clients}/gigachat/client.go (100%) rename {gigachatwc => internal/gigachatwc}/scheduler.go (98%) delete mode 100644 main.go delete mode 100644 privilegewc/check.go diff --git a/answerwc/to_client.go b/answerwc/to_client.go deleted file mode 100644 index e69de29..0000000 diff --git a/app/app.go b/app/app.go deleted file mode 100644 index e69de29..0000000 diff --git a/clients/mailclient/client.go b/clients/mailclient/client.go deleted file mode 100644 index e69de29..0000000 diff --git a/internal/app/app.go b/internal/app/app.go index 1427fa0..04c0069 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,16 +2,20 @@ package app import ( "context" + "crypto/tls" "errors" "gitea.pena/PenaSide/hlog" "gitea.pena/SQuiz/common/dal" - answerwc2 "gitea.pena/SQuiz/worker/internal/answerwc" + "gitea.pena/SQuiz/worker/internal/answerwc" + "gitea.pena/SQuiz/worker/internal/clients/gigachat" + "gitea.pena/SQuiz/worker/internal/gigachatwc" "gitea.pena/SQuiz/worker/internal/initialize" - privilegewc2 "gitea.pena/SQuiz/worker/internal/privilegewc" - senders2 "gitea.pena/SQuiz/worker/internal/senders" + "gitea.pena/SQuiz/worker/internal/privilegewc" + "gitea.pena/SQuiz/worker/internal/senders" "gitea.pena/SQuiz/worker/internal/workers/shortstat" "gitea.pena/SQuiz/worker/internal/workers/timeout" "gitea.pena/SQuiz/worker/pkg/closer" + "github.com/go-resty/resty/v2" "go.uber.org/zap" "time" ) @@ -87,8 +91,8 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { // fmt.Println(err) // return nil, err // } - mailSender := senders2.NewMailLeadSender(clients.MailClient) - leadSenders := []senders2.LeadSender{mailSender /* , tgSender */} + mailSender := senders.NewMailLeadSender(clients.MailClient) + leadSenders := []senders.LeadSender{mailSender /* , tgSender */} pgdal, err := dal.New(ctx, cfg.PostgresURL, minioClient) if err != nil { @@ -96,7 +100,7 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { return err } - kafkaWorker, err := privilegewc2.NewKafkaConsumerWorker(privilegewc2.Config{ + kafkaWorker, err := privilegewc.NewKafkaConsumerWorker(privilegewc.Config{ KafkaBroker: cfg.KafkaBrokers, KafkaTopic: cfg.KafkaTopicTariff, ServiceKey: cfg.ServiceName, @@ -108,7 +112,7 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { return err } - checkWorker := privilegewc2.NewCheckWorker(privilegewc2.Deps{ + checkWorker := privilegewc.NewCheckWorker(privilegewc.Deps{ PrivilegeIDsDays: []string{"quizUnlimTime", "squizHideBadge"}, PrivilegeIDsCount: []string{"quizCnt", "quizManual"}, TickerInterval: time.Minute, @@ -119,14 +123,14 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { go kafkaWorker.Start(ctx) go checkWorker.Start(ctx) - toClientWorker := answerwc2.NewSendToClient(answerwc2.DepsSendToClient{ + toClientWorker := answerwc.NewSendToClient(answerwc.DepsSendToClient{ Redis: redisClient, Dal: pgdal, LeadSenders: leadSenders, CustomerService: clients.CustomerClient, }, errChan) - toRespWorker := answerwc2.NewRespWorker(answerwc2.DepsRespWorker{ + toRespWorker := answerwc.NewRespWorker(answerwc.DepsRespWorker{ Redis: redisClient, Dal: pgdal, MailClient: mailSender, @@ -135,6 +139,35 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { go toClientWorker.Start(ctx) go toRespWorker.Start(ctx) + gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{ + Logger: zapLogger, + Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}), + BaseURL: cfg.GigaChatApiBaseURL, + AuthKey: cfg.GigaChatApiAuthKey, + RedisClient: redisClient, + }) + if err != nil { + zapLogger.Error("failed init giga chat client", zap.Error(err)) + return err + } + // метод для обновления токенов гигачата + go gigaChatClient.TokenResearch(ctx) + + gigaChatWorker, err := gigachatwc.NewGigaChatTaskScheduler(gigachatwc.Deps{ + KafkaBrokers: cfg.KafkaBrokers, + KafkaTopic: cfg.KafkaTopicGigaChat, + KafkaGroup: cfg.KafkaGroupGigaChat, + GigaChatClient: gigaChatClient, + Logger: zapLogger, + Dal: pgdal, + }) + if err != nil { + zapLogger.Error("failed init giga chat task worker", zap.Error(err)) + return err + } + + go gigaChatWorker.Start(ctx) + tow := timeout.New(pgdal, time.Minute) statW := shortstat.New(pgdal, 5*time.Minute) tow.ExposeErr(ctx, &workerErr) diff --git a/clients/gigachat/client.go b/internal/clients/gigachat/client.go similarity index 100% rename from clients/gigachat/client.go rename to internal/clients/gigachat/client.go diff --git a/gigachatwc/scheduler.go b/internal/gigachatwc/scheduler.go similarity index 98% rename from gigachatwc/scheduler.go rename to internal/gigachatwc/scheduler.go index 7ceaa3b..6a0c8e2 100644 --- a/gigachatwc/scheduler.go +++ b/internal/gigachatwc/scheduler.go @@ -6,7 +6,7 @@ import ( "errors" "gitea.pena/SQuiz/common/dal" "gitea.pena/SQuiz/common/model" - "gitea.pena/SQuiz/worker/clients/gigachat" + "gitea.pena/SQuiz/worker/internal/clients/gigachat" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "time" diff --git a/internal/initialize/config.go b/internal/initialize/config.go index 94d5c2e..3d92d2b 100644 --- a/internal/initialize/config.go +++ b/internal/initialize/config.go @@ -24,6 +24,11 @@ type Config struct { ApiUrl string `env:"API_URL" envDefault:"https://api.smtp.bz/v1/smtp/send"` CustomerMicroserviceRPCURL string `env:"CUSTOMER_MICROSERVICE_RPC_URL" envDefault:"localhost:9001"` TelegramToken string `env:"TELEGRAM_TOKEN"` + + KafkaTopicGigaChat string `env:"KAFKA_TOPIC_GIGA_CHAT"` + KafkaGroupGigaChat string `env:"KAFKA_GROUP_GIGA_CHAT" default:"gigachat"` + GigaChatApiBaseURL string `env:"GIGA_CHAT_API_BASE_URL"` + GigaChatApiAuthKey string `env:"GIGA_CHAT_API_AUTH_KEY"` } func LoadConfig() (*Config, error) { diff --git a/main.go b/main.go deleted file mode 100644 index e69de29..0000000 diff --git a/privilegewc/check.go b/privilegewc/check.go deleted file mode 100644 index e69de29..0000000 diff --git a/tests/gigachat_test.go b/tests/gigachat_test.go index 5dab529..7d43c9d 100644 --- a/tests/gigachat_test.go +++ b/tests/gigachat_test.go @@ -5,7 +5,7 @@ import ( "crypto/tls" "fmt" "gitea.pena/SQuiz/common/model" - "gitea.pena/SQuiz/worker/clients/gigachat" + "gitea.pena/SQuiz/worker/internal/clients/gigachat" "github.com/go-redis/redis/v8" "github.com/go-resty/resty/v2" "go.uber.org/zap"