worker/internal/gigachatwc/scheduler.go
skeris 00d607cd18
Some checks failed
Deploy / DeployService (push) Has been skipped
Deploy / CreateImage (push) Failing after 34s
debug: gigachat won't create questions
2025-06-03 01:54:48 +03:00

146 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package gigachatwc
import (
"context"
"encoding/json"
"errors"
"gitea.pena/SQuiz/common/dal"
"gitea.pena/SQuiz/common/model"
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"time"
"fmt"
)
type GigaChatTaskScheduler struct {
kafkaClient *kgo.Client
gigaChatClient *gigachat.GigaChatClient
logger *zap.Logger
dal *dal.DAL
}
type Deps struct {
KafkaBrokers string
KafkaTopic string
KafkaGroup string
GigaChatClient *gigachat.GigaChatClient
Logger *zap.Logger
Dal *dal.DAL
}
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers(deps.KafkaBrokers),
kgo.ConsumerGroup(deps.KafkaGroup),
kgo.ConsumeTopics(deps.KafkaTopic),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
)
if err != nil {
return nil, err
}
return &GigaChatTaskScheduler{
kafkaClient: client,
gigaChatClient: deps.GigaChatClient,
logger: deps.Logger,
dal: deps.Dal,
}, nil
}
type MessageGigaChat struct {
ID int64 `json:"id"`
QuizID int64 `json:"quiz_id"`
Sex bool `json:"sex"`
Age string `json:"age"`
}
type UpdJsonQuestionData struct {
Title string `json:"title"`
Description string `json:"description"`
}
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
ticker := time.NewTicker(3 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Println("FETCHED GGC")
r.fetchMessages(ctx)
case <-ctx.Done():
return
}
}
}
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
fetches := r.kafkaClient.PollFetches(ctx)
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
var msg MessageGigaChat
if err := json.Unmarshal(record.Value, &msg); err != nil {
r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err))
continue
}
fmt.Println("GGC MES", msg)
r.handleMessage(ctx, msg)
}
}
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "", 0)
if err != nil {
r.logger.Error("failed to get question list", zap.Error(err))
return
}
audience := model.GigaChatAudience{
ID: msg.ID,
QuizID: msg.QuizID,
Sex: msg.Sex,
Age: msg.Age,
}
for _, question := range listQuestions {
var (
updJson string
resp UpdJsonQuestionData
)
// вот дилемма сколько попыток нам нужно? может ли гигачат отлючить нам некоторые ограничения
// можем ли мы скипать вопросы с которыми у нас произошла ошибка
for i := 0; i < 10; i++ {
updJson, err = r.gigaChatClient.SendMsg(ctx, audience, question)
fmt.Println("GGC ANS", updJson, err)
if err == nil {
if err = json.Unmarshal([]byte(updJson), &resp); err == nil {
question.Title = resp.Title
question.Description = resp.Description
break
}
}
if errors.Is(err, model.EmptyResponseErrorGigaChat) {
r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", i+1), zap.Uint64("question_id", question.Id))
continue
}
r.logger.Error("failed to send message to GigaChat", zap.Error(err))
break
}
question.Auditory = msg.ID
_, err = r.dal.QuestionRepo.CreateQuestion(ctx, &question)
if err != nil {
r.logger.Error("failed to create updated question", zap.Error(err))
continue
}
}
}