diff --git a/internal/app/app.go b/internal/app/app.go index 8848c35..ba0b360 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,13 +2,11 @@ package app import ( "context" - "crypto/tls" "errors" "fmt" "gitea.pena/PenaSide/hlog" "gitea.pena/SQuiz/common/dal" "gitea.pena/SQuiz/worker/internal/answerwc" - "gitea.pena/SQuiz/worker/internal/clients/gigachat" "gitea.pena/SQuiz/worker/internal/gigachatwc" "gitea.pena/SQuiz/worker/internal/initialize" "gitea.pena/SQuiz/worker/internal/privilegewc" @@ -16,7 +14,6 @@ import ( "gitea.pena/SQuiz/worker/internal/workers/shortstat" "gitea.pena/SQuiz/worker/internal/workers/timeout" "gitea.pena/SQuiz/worker/pkg/closer" - "github.com/go-resty/resty/v2" "go.uber.org/zap" "time" ) @@ -85,7 +82,11 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { return err } - clients := initialize.NewClients(cfg, zapLogger) + clients, err := initialize.NewClients(ctx, cfg, zapLogger, redisClient) + if err != nil { + zapLogger.Error("failed init clients", zap.Error(err)) + return err + } // tgSender, err := senders.NewTgSender(options.TgToken) // if err != nil { @@ -140,26 +141,16 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error { go toClientWorker.Start(ctx) go toRespWorker.Start(ctx) - gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{ - Logger: zapLogger, - Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}), - BaseURL: cfg.GigaChatApiBaseURL, - AuthKey: cfg.GigaChatApiAuthKey, - RedisClient: redisClient, - }) - if err != nil { - zapLogger.Error("failed init giga chat client", zap.Error(err)) - return err - } // метод для обновления токенов гигачата - go gigaChatClient.TokenResearch(ctx) + go clients.GigaChatClient.TokenResearch(ctx) + go clients.GigaChatClient.MonitorTokenBalance(ctx) fmt.Println("INIT GGC WORKER", cfg.KafkaTopicGigaChat) gigaChatWorker, err := gigachatwc.NewGigaChatTaskScheduler(gigachatwc.Deps{ KafkaBrokers: cfg.KafkaBrokers, KafkaTopic: cfg.KafkaTopicGigaChat, KafkaGroup: cfg.KafkaGroupGigaChat, - GigaChatClient: gigaChatClient, + GigaChatClient: clients.GigaChatClient, Logger: zapLogger, Dal: pgdal, }) diff --git a/internal/clients/gigachat/client.go b/internal/clients/gigachat/client.go index 2b55810..8a36280 100644 --- a/internal/clients/gigachat/client.go +++ b/internal/clients/gigachat/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "gitea.pena/SQuiz/common/model" + "gitea.pena/SQuiz/worker/internal/senders" "github.com/go-redis/redis/v8" "github.com/go-resty/resty/v2" "github.com/google/uuid" @@ -18,6 +19,8 @@ type Deps struct { BaseURL string AuthKey string RedisClient *redis.Client + TgSender *senders.TgSender + TgChatID int64 } type GigaChatClient struct { @@ -26,6 +29,8 @@ type GigaChatClient struct { baseURL string authKey string redisClient *redis.Client + tgSender *senders.TgSender + tgChatID int64 } func NewGigaChatClient(ctx context.Context, deps Deps) (*GigaChatClient, error) { @@ -35,6 +40,9 @@ func NewGigaChatClient(ctx context.Context, deps Deps) (*GigaChatClient, error) baseURL: deps.BaseURL, authKey: deps.AuthKey, redisClient: deps.RedisClient, + + tgSender: deps.TgSender, + tgChatID: deps.TgChatID, } if err := client.updateToken(ctx); err != nil { @@ -110,7 +118,7 @@ func (r *GigaChatClient) TokenResearch(ctx context.Context) { select { case <-ticker.C: ttl, err := r.redisClient.TTL(ctx, "gigachat_token").Result() - fmt.Println("GGCHATtoken", ttl, err, ttl<2*time.Minute) + fmt.Println("GGCHATtoken", ttl, err, ttl < 2*time.Minute) if err != nil || ttl < 2*time.Minute { if err := r.updateToken(ctx); err != nil { r.logger.Error("failed to update GigaChat token", zap.Error(err)) @@ -148,7 +156,7 @@ func (r *GigaChatClient) updateToken(ctx context.Context) error { } ttl := time.Until(time.Unix(int64(respData.ExpiresAt/1000), 0)) - fmt.Println("GGCTOKENEXP", respData.ExpiresAt, ttl, ttl<2*time.Minute, time.Now()) + fmt.Println("GGCTOKENEXP", respData.ExpiresAt, ttl, ttl < 2*time.Minute, time.Now()) err = r.redisClient.Set(ctx, "gigachat_token", respData.AccessToken, ttl).Err() if err != nil { return fmt.Errorf("failed to save token to redis: %w", err) @@ -156,3 +164,72 @@ func (r *GigaChatClient) updateToken(ctx context.Context) error { return nil } + +func (r *GigaChatClient) getBalance(ctx context.Context) (int, error) { + var respData struct { + Balance []struct { + Usage string `json:"usage"` + Value int `json:"value"` + } `json:"balance"` + } + + token, err := r.redisClient.Get(ctx, "gigachat_token").Result() + if err != nil { + r.logger.Error("failed to get token from redis", zap.Error(err)) + return 0, err + } + + resp, err := r.client.R(). + SetHeader("Authorization", "Bearer "+token). + SetResult(&respData). + Get(r.baseURL + "/balance") + + if err != nil { + return 0, fmt.Errorf("failed to fetch balance: %w", err) + } + if resp.IsError() { + return 0, fmt.Errorf("balance request failed: %s", resp.Status()) + } + + // прверяем то что используем для переформулирования + for _, item := range respData.Balance { + if item.Usage == "GigaChat-Max" { + return item.Value, nil + } + } + + return 0, errors.New("no used models found") +} + +func (r *GigaChatClient) MonitorTokenBalance(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + alert := false // чтоб не спамить каждые 5 минут + + for { + select { + case <-ticker.C: + balance, err := r.getBalance(ctx) + if err != nil { + r.logger.Error("failed to get GigaChat token", zap.Error(err)) + continue + } + + if balance < 500_000 && !alert { + msg := fmt.Sprintf("Остаток токенов в GigaChat упал ниже 500000.\nТекущий баланс: %d токенов.", balance) + if err := r.tgSender.SendMessage(r.tgChatID, msg); err != nil { + r.logger.Error("failed to send Telegram alert", zap.Error(err)) + } else { + alert = true + } + } + + if balance >= 500_000 && alert { + alert = false + } + case <-ctx.Done(): + return + } + } +} diff --git a/internal/initialize/clients.go b/internal/initialize/clients.go index 47bbf31..ee1c351 100644 --- a/internal/initialize/clients.go +++ b/internal/initialize/clients.go @@ -1,17 +1,43 @@ package initialize import ( + "context" + "crypto/tls" "gitea.pena/PenaSide/customer/pkg/customer_clients" "gitea.pena/SQuiz/common/clients" + "gitea.pena/SQuiz/worker/internal/clients/gigachat" + "gitea.pena/SQuiz/worker/internal/senders" + "github.com/go-redis/redis/v8" + "github.com/go-resty/resty/v2" "go.uber.org/zap" ) type Clients struct { MailClient *clients.SmtpClient CustomerClient *customer_clients.CustomersClient + GigaChatClient *gigachat.GigaChatClient } -func NewClients(cfg Config, logger *zap.Logger) *Clients { +func NewClients(ctx context.Context, cfg Config, logger *zap.Logger, redisClient *redis.Client) (*Clients, error) { + notifyTgClient, err := senders.NewTgSender(cfg.NotifyTelegramToken) + if err != nil { + return nil, err + } + + gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{ + Logger: logger, + Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}), + BaseURL: cfg.GigaChatApiBaseURL, + AuthKey: cfg.GigaChatApiAuthKey, + RedisClient: redisClient, + + TgSender: notifyTgClient, + TgChatID: cfg.NotifyChannelID, + }) + if err != nil { + return nil, err + } + return &Clients{ MailClient: clients.NewSmtpClient(clients.Deps{ SmtpSender: cfg.Sender, @@ -22,5 +48,6 @@ func NewClients(cfg Config, logger *zap.Logger) *Clients { Logger: logger, CustomerServiceHost: cfg.CustomerMicroserviceRPCURL, }), - } + GigaChatClient: gigaChatClient, + }, nil } diff --git a/internal/initialize/config.go b/internal/initialize/config.go index 3d92d2b..e3078e8 100644 --- a/internal/initialize/config.go +++ b/internal/initialize/config.go @@ -29,6 +29,9 @@ type Config struct { KafkaGroupGigaChat string `env:"KAFKA_GROUP_GIGA_CHAT" default:"gigachat"` GigaChatApiBaseURL string `env:"GIGA_CHAT_API_BASE_URL"` GigaChatApiAuthKey string `env:"GIGA_CHAT_API_AUTH_KEY"` + + NotifyTelegramToken string `env:"NOTIFY_TELEGRAM_TOKEN"` + NotifyChannelID int64 `env:"NOTIFY_CHANNEL_ID"` } func LoadConfig() (*Config, error) { diff --git a/internal/senders/tg_sender.go b/internal/senders/tg_sender.go index a2c5a5c..1b54253 100644 --- a/internal/senders/tg_sender.go +++ b/internal/senders/tg_sender.go @@ -49,3 +49,11 @@ func (tg *TgSender) SendLead(data LeadData) error { func (tg *TgSender) Name() string { return "telegram" } + +func (tg *TgSender) SendMessage(chatID int64, msg string) error { + _, err := tg.bot.Send(telebot.ChatID(chatID), msg) + if err != nil { + return err + } + return nil +} diff --git a/tests/gigachat_test.go b/tests/gigachat_test.go index f0f4cca..0ede42c 100644 --- a/tests/gigachat_test.go +++ b/tests/gigachat_test.go @@ -6,6 +6,7 @@ import ( "fmt" "gitea.pena/SQuiz/common/model" "gitea.pena/SQuiz/worker/internal/clients/gigachat" + "gitea.pena/SQuiz/worker/internal/senders" "github.com/go-redis/redis/v8" "github.com/go-resty/resty/v2" "go.uber.org/zap" @@ -23,18 +24,27 @@ func TestGigachat(t *testing.T) { DB: 2, }) + tgSender, err := senders.NewTgSender("6712573453:AAFqTOsgwe_j48ZQ1GzWKQDT5Nwr-SAWjz8") + if err != nil { + panic(err) + } + gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{ Logger: logger, Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}), BaseURL: "https://gigachat.devices.sberbank.ru/api/v1", - AuthKey: "ZGM3MDY0ZjAtODM4Yi00ZTQ4LTgzMTgtZDA0ZDA3NmIwYzJjOjRkZWI4Y2NhLTc1YzUtNDg5ZS04YzY4LTVkNTdmMWU1YjU5Nw==", + AuthKey: "Y2MzZWUxZDMtZGE5MC00ZTFjLWI5YzItM2ViMTZmMDM0YTkwOmY1NTlkOGM3LWUyNmQtNGUwMC1hODE0LTJlYjQ5NDA5ODdjMQ==", RedisClient: redisClient, + + TgSender: tgSender, + TgChatID: -1002217604546, }) if err != nil { panic(err) } go gigaChatClient.TokenResearch(ctx) + go gigaChatClient.MonitorTokenBalance(ctx) result, err := gigaChatClient.SendMsg(ctx, model.GigaChatAudience{ Sex: 1,