added notification logic if gigachat tokens run out
This commit is contained in:
parent
88f7f29259
commit
b3125d41be
@ -2,13 +2,11 @@ package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.pena/PenaSide/hlog"
|
||||
"gitea.pena/SQuiz/common/dal"
|
||||
"gitea.pena/SQuiz/worker/internal/answerwc"
|
||||
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
|
||||
"gitea.pena/SQuiz/worker/internal/gigachatwc"
|
||||
"gitea.pena/SQuiz/worker/internal/initialize"
|
||||
"gitea.pena/SQuiz/worker/internal/privilegewc"
|
||||
@ -16,7 +14,6 @@ import (
|
||||
"gitea.pena/SQuiz/worker/internal/workers/shortstat"
|
||||
"gitea.pena/SQuiz/worker/internal/workers/timeout"
|
||||
"gitea.pena/SQuiz/worker/pkg/closer"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
@ -85,7 +82,11 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error {
|
||||
return err
|
||||
}
|
||||
|
||||
clients := initialize.NewClients(cfg, zapLogger)
|
||||
clients, err := initialize.NewClients(ctx, cfg, zapLogger, redisClient)
|
||||
if err != nil {
|
||||
zapLogger.Error("failed init clients", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// tgSender, err := senders.NewTgSender(options.TgToken)
|
||||
// if err != nil {
|
||||
@ -140,26 +141,16 @@ func New(ctx context.Context, cfg initialize.Config, build Build) error {
|
||||
go toClientWorker.Start(ctx)
|
||||
go toRespWorker.Start(ctx)
|
||||
|
||||
gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{
|
||||
Logger: zapLogger,
|
||||
Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}),
|
||||
BaseURL: cfg.GigaChatApiBaseURL,
|
||||
AuthKey: cfg.GigaChatApiAuthKey,
|
||||
RedisClient: redisClient,
|
||||
})
|
||||
if err != nil {
|
||||
zapLogger.Error("failed init giga chat client", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
// метод для обновления токенов гигачата
|
||||
go gigaChatClient.TokenResearch(ctx)
|
||||
go clients.GigaChatClient.TokenResearch(ctx)
|
||||
go clients.GigaChatClient.MonitorTokenBalance(ctx)
|
||||
|
||||
fmt.Println("INIT GGC WORKER", cfg.KafkaTopicGigaChat)
|
||||
gigaChatWorker, err := gigachatwc.NewGigaChatTaskScheduler(gigachatwc.Deps{
|
||||
KafkaBrokers: cfg.KafkaBrokers,
|
||||
KafkaTopic: cfg.KafkaTopicGigaChat,
|
||||
KafkaGroup: cfg.KafkaGroupGigaChat,
|
||||
GigaChatClient: gigaChatClient,
|
||||
GigaChatClient: clients.GigaChatClient,
|
||||
Logger: zapLogger,
|
||||
Dal: pgdal,
|
||||
})
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.pena/SQuiz/common/model"
|
||||
"gitea.pena/SQuiz/worker/internal/senders"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/google/uuid"
|
||||
@ -18,6 +19,8 @@ type Deps struct {
|
||||
BaseURL string
|
||||
AuthKey string
|
||||
RedisClient *redis.Client
|
||||
TgSender *senders.TgSender
|
||||
TgChatID int64
|
||||
}
|
||||
|
||||
type GigaChatClient struct {
|
||||
@ -26,6 +29,8 @@ type GigaChatClient struct {
|
||||
baseURL string
|
||||
authKey string
|
||||
redisClient *redis.Client
|
||||
tgSender *senders.TgSender
|
||||
tgChatID int64
|
||||
}
|
||||
|
||||
func NewGigaChatClient(ctx context.Context, deps Deps) (*GigaChatClient, error) {
|
||||
@ -35,6 +40,9 @@ func NewGigaChatClient(ctx context.Context, deps Deps) (*GigaChatClient, error)
|
||||
baseURL: deps.BaseURL,
|
||||
authKey: deps.AuthKey,
|
||||
redisClient: deps.RedisClient,
|
||||
|
||||
tgSender: deps.TgSender,
|
||||
tgChatID: deps.TgChatID,
|
||||
}
|
||||
|
||||
if err := client.updateToken(ctx); err != nil {
|
||||
@ -110,7 +118,7 @@ func (r *GigaChatClient) TokenResearch(ctx context.Context) {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
ttl, err := r.redisClient.TTL(ctx, "gigachat_token").Result()
|
||||
fmt.Println("GGCHATtoken", ttl, err, ttl<2*time.Minute)
|
||||
fmt.Println("GGCHATtoken", ttl, err, ttl < 2*time.Minute)
|
||||
if err != nil || ttl < 2*time.Minute {
|
||||
if err := r.updateToken(ctx); err != nil {
|
||||
r.logger.Error("failed to update GigaChat token", zap.Error(err))
|
||||
@ -148,7 +156,7 @@ func (r *GigaChatClient) updateToken(ctx context.Context) error {
|
||||
}
|
||||
|
||||
ttl := time.Until(time.Unix(int64(respData.ExpiresAt/1000), 0))
|
||||
fmt.Println("GGCTOKENEXP", respData.ExpiresAt, ttl, ttl<2*time.Minute, time.Now())
|
||||
fmt.Println("GGCTOKENEXP", respData.ExpiresAt, ttl, ttl < 2*time.Minute, time.Now())
|
||||
err = r.redisClient.Set(ctx, "gigachat_token", respData.AccessToken, ttl).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save token to redis: %w", err)
|
||||
@ -156,3 +164,72 @@ func (r *GigaChatClient) updateToken(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *GigaChatClient) getBalance(ctx context.Context) (int, error) {
|
||||
var respData struct {
|
||||
Balance []struct {
|
||||
Usage string `json:"usage"`
|
||||
Value int `json:"value"`
|
||||
} `json:"balance"`
|
||||
}
|
||||
|
||||
token, err := r.redisClient.Get(ctx, "gigachat_token").Result()
|
||||
if err != nil {
|
||||
r.logger.Error("failed to get token from redis", zap.Error(err))
|
||||
return 0, err
|
||||
}
|
||||
|
||||
resp, err := r.client.R().
|
||||
SetHeader("Authorization", "Bearer "+token).
|
||||
SetResult(&respData).
|
||||
Get(r.baseURL + "/balance")
|
||||
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to fetch balance: %w", err)
|
||||
}
|
||||
if resp.IsError() {
|
||||
return 0, fmt.Errorf("balance request failed: %s", resp.Status())
|
||||
}
|
||||
|
||||
// прверяем то что используем для переформулирования
|
||||
for _, item := range respData.Balance {
|
||||
if item.Usage == "GigaChat-Max" {
|
||||
return item.Value, nil
|
||||
}
|
||||
}
|
||||
|
||||
return 0, errors.New("no used models found")
|
||||
}
|
||||
|
||||
func (r *GigaChatClient) MonitorTokenBalance(ctx context.Context) {
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
alert := false // чтоб не спамить каждые 5 минут
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
balance, err := r.getBalance(ctx)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to get GigaChat token", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
if balance < 500_000 && !alert {
|
||||
msg := fmt.Sprintf("Остаток токенов в GigaChat упал ниже 500000.\nТекущий баланс: %d токенов.", balance)
|
||||
if err := r.tgSender.SendMessage(r.tgChatID, msg); err != nil {
|
||||
r.logger.Error("failed to send Telegram alert", zap.Error(err))
|
||||
} else {
|
||||
alert = true
|
||||
}
|
||||
}
|
||||
|
||||
if balance >= 500_000 && alert {
|
||||
alert = false
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,17 +1,43 @@
|
||||
package initialize
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"gitea.pena/PenaSide/customer/pkg/customer_clients"
|
||||
"gitea.pena/SQuiz/common/clients"
|
||||
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
|
||||
"gitea.pena/SQuiz/worker/internal/senders"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Clients struct {
|
||||
MailClient *clients.SmtpClient
|
||||
CustomerClient *customer_clients.CustomersClient
|
||||
GigaChatClient *gigachat.GigaChatClient
|
||||
}
|
||||
|
||||
func NewClients(cfg Config, logger *zap.Logger) *Clients {
|
||||
func NewClients(ctx context.Context, cfg Config, logger *zap.Logger, redisClient *redis.Client) (*Clients, error) {
|
||||
notifyTgClient, err := senders.NewTgSender(cfg.NotifyTelegramToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{
|
||||
Logger: logger,
|
||||
Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}),
|
||||
BaseURL: cfg.GigaChatApiBaseURL,
|
||||
AuthKey: cfg.GigaChatApiAuthKey,
|
||||
RedisClient: redisClient,
|
||||
|
||||
TgSender: notifyTgClient,
|
||||
TgChatID: cfg.NotifyChannelID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Clients{
|
||||
MailClient: clients.NewSmtpClient(clients.Deps{
|
||||
SmtpSender: cfg.Sender,
|
||||
@ -22,5 +48,6 @@ func NewClients(cfg Config, logger *zap.Logger) *Clients {
|
||||
Logger: logger,
|
||||
CustomerServiceHost: cfg.CustomerMicroserviceRPCURL,
|
||||
}),
|
||||
}
|
||||
GigaChatClient: gigaChatClient,
|
||||
}, nil
|
||||
}
|
||||
|
@ -29,6 +29,9 @@ type Config struct {
|
||||
KafkaGroupGigaChat string `env:"KAFKA_GROUP_GIGA_CHAT" default:"gigachat"`
|
||||
GigaChatApiBaseURL string `env:"GIGA_CHAT_API_BASE_URL"`
|
||||
GigaChatApiAuthKey string `env:"GIGA_CHAT_API_AUTH_KEY"`
|
||||
|
||||
NotifyTelegramToken string `env:"NOTIFY_TELEGRAM_TOKEN"`
|
||||
NotifyChannelID int64 `env:"NOTIFY_CHANNEL_ID"`
|
||||
}
|
||||
|
||||
func LoadConfig() (*Config, error) {
|
||||
|
@ -49,3 +49,11 @@ func (tg *TgSender) SendLead(data LeadData) error {
|
||||
func (tg *TgSender) Name() string {
|
||||
return "telegram"
|
||||
}
|
||||
|
||||
func (tg *TgSender) SendMessage(chatID int64, msg string) error {
|
||||
_, err := tg.bot.Send(telebot.ChatID(chatID), msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"gitea.pena/SQuiz/common/model"
|
||||
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
|
||||
"gitea.pena/SQuiz/worker/internal/senders"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"go.uber.org/zap"
|
||||
@ -23,18 +24,27 @@ func TestGigachat(t *testing.T) {
|
||||
DB: 2,
|
||||
})
|
||||
|
||||
tgSender, err := senders.NewTgSender("6712573453:AAFqTOsgwe_j48ZQ1GzWKQDT5Nwr-SAWjz8")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{
|
||||
Logger: logger,
|
||||
Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}),
|
||||
BaseURL: "https://gigachat.devices.sberbank.ru/api/v1",
|
||||
AuthKey: "ZGM3MDY0ZjAtODM4Yi00ZTQ4LTgzMTgtZDA0ZDA3NmIwYzJjOjRkZWI4Y2NhLTc1YzUtNDg5ZS04YzY4LTVkNTdmMWU1YjU5Nw==",
|
||||
AuthKey: "Y2MzZWUxZDMtZGE5MC00ZTFjLWI5YzItM2ViMTZmMDM0YTkwOmY1NTlkOGM3LWUyNmQtNGUwMC1hODE0LTJlYjQ5NDA5ODdjMQ==",
|
||||
RedisClient: redisClient,
|
||||
|
||||
TgSender: tgSender,
|
||||
TgChatID: -1002217604546,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
go gigaChatClient.TokenResearch(ctx)
|
||||
go gigaChatClient.MonitorTokenBalance(ctx)
|
||||
|
||||
result, err := gigaChatClient.SendMsg(ctx, model.GigaChatAudience{
|
||||
Sex: 1,
|
||||
|
Loading…
Reference in New Issue
Block a user