worker/internal/gigachatwc/scheduler.go
pasha1coil 130304d8ae
All checks were successful
Deploy / CreateImage (push) Successful in 2m44s
Deploy / ValidateConfig (push) Successful in 29s
Deploy / MigrateDatabase (push) Successful in 48s
Deploy / DeployService (push) Successful in 27s
added logic for update privilege gigachat count
2025-06-09 15:36:46 +03:00

169 lines
4.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package gigachatwc
import (
"context"
"encoding/json"
"errors"
"fmt"
"gitea.pena/SQuiz/common/dal"
"gitea.pena/SQuiz/common/model"
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
"gitea.pena/SQuiz/worker/internal/wctools"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"time"
)
type GigaChatTaskScheduler struct {
kafkaClient *kgo.Client
gigaChatClient *gigachat.GigaChatClient
logger *zap.Logger
dal *dal.DAL
}
type Deps struct {
KafkaBrokers string
KafkaTopic string
KafkaGroup string
GigaChatClient *gigachat.GigaChatClient
Logger *zap.Logger
Dal *dal.DAL
}
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
fmt.Println("GGC NEW", deps.KafkaBrokers, deps.KafkaTopic)
client, err := kgo.NewClient(
kgo.SeedBrokers(deps.KafkaBrokers),
kgo.ConsumeTopics(deps.KafkaTopic),
kgo.ConsumerGroup("gigachat1"),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
)
if err != nil {
fmt.Println("GCCER", err)
return nil, err
}
return &GigaChatTaskScheduler{
kafkaClient: client,
gigaChatClient: deps.GigaChatClient,
logger: deps.Logger,
dal: deps.Dal,
}, nil
}
type MessageGigaChat struct {
ID int64 `json:"id"`
QuizID int64 `json:"quiz_id"`
Sex int32 `json:"sex"` // 0 - female, 1 - male, 2 - not sex
Age string `json:"age"`
AccountID string `json:"account_id"`
}
type UpdJsonQuestionData struct {
Title string `json:"title"`
Description string `json:"description"`
}
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("FETCHED GGC")
r.fetchMessages(ctx)
case <-ctx.Done():
fmt.Println("GCC DONE")
return
}
}
}
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
fetches := r.kafkaClient.PollFetches(ctx)
fmt.Println("GGC CONS", fetches)
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
var msg MessageGigaChat
if err := json.Unmarshal(record.Value, &msg); err != nil {
r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err))
continue
}
fmt.Println("GGC MES", msg)
r.handleMessage(ctx, msg)
}
}
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "", 0)
if err != nil {
r.logger.Error("failed to get question list", zap.Error(err))
return
}
audience := model.GigaChatAudience{
ID: msg.ID,
QuizID: msg.QuizID,
Sex: msg.Sex,
Age: msg.Age,
}
for _, question := range listQuestions {
var (
updJson string
resp UpdJsonQuestionData
)
// вот дилемма сколько попыток нам нужно? может ли гигачат отлючить нам некоторые ограничения
// можем ли мы скипать вопросы с которыми у нас произошла ошибка
for i := 0; i < 10; i++ {
updJson, err = r.gigaChatClient.SendMsg(ctx, audience, question)
fmt.Println("GGC ANS", updJson, err)
if err == nil {
if err = json.Unmarshal([]byte(updJson), &resp); err == nil {
question.Title = resp.Title
question.Description = resp.Description
break
}
}
if errors.Is(err, model.EmptyResponseErrorGigaChat) {
r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", i+1), zap.Uint64("question_id", question.Id))
continue
}
r.logger.Error("failed to send message to GigaChat", zap.Error(err))
break
}
question.Auditory = msg.ID
_, err = r.dal.QuestionRepo.CreateQuestion(ctx, &question)
if err != nil {
r.logger.Error("failed to create updated question", zap.Error(err))
continue
}
_, privileges, err := r.dal.AccountRepo.GetAccAndPrivilegeByEmail(ctx, msg.AccountID)
if err != nil {
r.logger.Error("failed to get acc/privilege", zap.Error(err))
}
privilege := wctools.HasQuizGigaChatPrivilege(privileges)
if privilege != nil {
privilege.Amount--
err = r.dal.AccountRepo.UpdatePrivilegeAmount(ctx, privilege.ID, privilege.Amount)
if err != nil {
r.logger.Error("failed to update privilege", zap.Error(err))
}
continue
}
}
}