worker/internal/gigachatwc/scheduler.go
skeris 525d54645c
All checks were successful
Deploy / CreateImage (push) Successful in 2m23s
Deploy / DeployService (push) Successful in 29s
debug: gigachat won't create questions
2025-06-03 21:00:05 +03:00

151 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package gigachatwc
import (
"context"
"encoding/json"
"errors"
"gitea.pena/SQuiz/common/dal"
"gitea.pena/SQuiz/common/model"
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"time"
"fmt"
)
// GigaChatTaskScheduler consumes audience messages from Kafka and, for each
// one, asks GigaChat to rewrite the quiz questions before storing them.
type GigaChatTaskScheduler struct {
	// kafkaClient is the consumer polled by fetchMessages.
	kafkaClient *kgo.Client
	// gigaChatClient sends each question to GigaChat (see handleMessage).
	gigaChatClient *gigachat.GigaChatClient
	// logger receives structured error and warning records.
	logger *zap.Logger
	// dal gives access to the question repository.
	dal *dal.DAL
}
// Deps carries everything NewGigaChatTaskScheduler needs to build a
// GigaChatTaskScheduler.
type Deps struct {
	// KafkaBrokers is the seed broker address handed to the Kafka client.
	KafkaBrokers string
	// KafkaTopic is the topic the scheduler consumes from.
	KafkaTopic string
	// KafkaGroup is the configured Kafka consumer group name.
	KafkaGroup string
	// GigaChatClient is the client used to send questions to GigaChat.
	GigaChatClient *gigachat.GigaChatClient
	// Logger is the structured logger used by the scheduler.
	Logger *zap.Logger
	// Dal is the data-access layer used to read and create questions.
	Dal *dal.DAL
}
// NewGigaChatTaskScheduler builds a scheduler whose Kafka client consumes
// deps.KafkaTopic using deps.KafkaBrokers as the seed broker.
//
// The consumer group is deps.KafkaGroup; when that field is empty the
// historical group name "gigachat1" is used. A nil deps.Logger is replaced
// by a no-op logger so the scheduler's methods can always log safely.
//
// It returns a wrapped error when the Kafka client cannot be created.
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
	// Group name used before KafkaGroup was honoured; kept as the fallback so
	// callers that leave KafkaGroup empty see no change.
	const defaultGroup = "gigachat1"

	group := deps.KafkaGroup
	if group == "" {
		group = defaultGroup
	}

	logger := deps.Logger
	if logger == nil {
		logger = zap.NewNop()
	}

	logger.Debug("creating GigaChat task scheduler",
		zap.String("brokers", deps.KafkaBrokers),
		zap.String("topic", deps.KafkaTopic),
		zap.String("group", group),
	)

	// NOTE(review): KafkaBrokers is passed as one seed address. If the
	// configuration can hold a comma-separated list it must be split before
	// being handed to kgo.SeedBrokers — confirm against the config format.
	client, err := kgo.NewClient(
		kgo.SeedBrokers(deps.KafkaBrokers),
		kgo.ConsumeTopics(deps.KafkaTopic),
		kgo.ConsumerGroup(group),
		// Reset offset is "after the moment of construction"; see the
		// kgo.ConsumeResetOffset documentation for when it is applied.
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
	)
	if err != nil {
		return nil, fmt.Errorf("creating kafka client for topic %q: %w", deps.KafkaTopic, err)
	}

	return &GigaChatTaskScheduler{
		kafkaClient:    client,
		gigaChatClient: deps.GigaChatClient,
		logger:         logger,
		dal:            deps.Dal,
	}, nil
}
// MessageGigaChat is the JSON payload decoded from every Kafka record read
// by the scheduler. Its fields are copied verbatim into the audience passed
// to GigaChat.
type MessageGigaChat struct {
	// ID is stored on every created question as its Auditory value.
	ID int64 `json:"id"`
	// QuizID selects the quiz whose questions are processed.
	QuizID int64 `json:"quiz_id"`
	// Sex is forwarded unchanged to the GigaChat audience.
	Sex bool `json:"sex"`
	// Age is forwarded unchanged to the GigaChat audience.
	Age string `json:"age"`
}
// UpdJsonQuestionData is the JSON shape the GigaChat answer is decoded into;
// both fields are copied onto the question being rewritten.
type UpdJsonQuestionData struct {
	// Title replaces the question title.
	Title string `json:"title"`
	// Description replaces the question description.
	Description string `json:"description"`
}
// Start blocks until ctx is cancelled. On every tick of a three-minute
// ticker it calls fetchMessages once with the same context.
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
	const pollInterval = 3 * time.Minute

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			fmt.Println("GCC DONE")
			return
		case <-tick.C:
			fmt.Println("FETCHED GGC")
			r.fetchMessages(ctx)
		}
	}
}
// fetchMessages polls Kafka once and passes every record that decodes into a
// MessageGigaChat to handleMessage.
//
// Fetch-level errors are logged rather than dropped; errors caused by ctx
// being cancelled or timing out are skipped because they only mean the poll
// was interrupted. Records whose payload is not valid JSON are logged and
// skipped so one bad message does not block the rest of the batch.
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
	fetches := r.kafkaClient.PollFetches(ctx)
	if fetches.IsClientClosed() {
		r.logger.Warn("kafka client is closed, skipping poll")
		return
	}

	fetches.EachError(func(topic string, partition int32, err error) {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return
		}
		r.logger.Error("failed to fetch from kafka",
			zap.String("topic", topic),
			zap.Int32("partition", partition),
			zap.Error(err),
		)
	})

	r.logger.Debug("polled kafka fetches", zap.Int("records", fetches.NumRecords()))

	iter := fetches.RecordIter()
	for !iter.Done() {
		record := iter.Next()

		var msg MessageGigaChat
		if err := json.Unmarshal(record.Value, &msg); err != nil {
			r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err))
			continue
		}

		r.logger.Debug("received GigaChat audience message",
			zap.Int64("id", msg.ID),
			zap.Int64("quiz_id", msg.QuizID),
		)
		r.handleMessage(ctx, msg)
	}
}
// handleMessage loads the questions of msg.QuizID, asks GigaChat to rewrite
// each one for the audience described by msg, and stores every question as a
// new record whose Auditory is msg.ID.
//
// A question is sent to GigaChat up to maxAttempts times; only an empty
// response (model.EmptyResponseErrorGigaChat) is retried. Any other send
// error, or an answer that is not valid JSON, stops the attempts for that
// question. In every case the question is still created — with the rewritten
// title/description on success, with its original text otherwise.
//
// Errors are logged, never returned; a failure on one question does not stop
// the processing of the others.
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
	// Open question from the original author: how many attempts are really
	// needed, can GigaChat lift some of its restrictions for us, and may
	// questions that failed be skipped altogether?
	const maxAttempts = 10

	listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "", 0)
	if err != nil {
		r.logger.Error("failed to get question list", zap.Int64("quiz_id", msg.QuizID), zap.Error(err))
		return
	}

	audience := model.GigaChatAudience{
		ID:     msg.ID,
		QuizID: msg.QuizID,
		Sex:    msg.Sex,
		Age:    msg.Age,
	}

	for _, question := range listQuestions {
		rewritten := false

		for attempt := 1; attempt <= maxAttempts; attempt++ {
			updJSON, sendErr := r.gigaChatClient.SendMsg(ctx, audience, question)
			r.logger.Debug("GigaChat answer",
				zap.Uint64("question_id", question.Id),
				zap.String("answer", updJSON),
				zap.Error(sendErr),
			)

			if errors.Is(sendErr, model.EmptyResponseErrorGigaChat) {
				r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", attempt), zap.Uint64("question_id", question.Id))
				continue
			}
			if sendErr != nil {
				r.logger.Error("failed to send message to GigaChat", zap.Uint64("question_id", question.Id), zap.Error(sendErr))
				break
			}

			var resp UpdJsonQuestionData
			if decErr := json.Unmarshal([]byte(updJSON), &resp); decErr != nil {
				// Not retried, matching the previous behaviour for any
				// non-empty-response failure.
				r.logger.Error("failed to decode GigaChat response", zap.Uint64("question_id", question.Id), zap.Error(decErr))
				break
			}

			question.Title = resp.Title
			question.Description = resp.Description
			rewritten = true
			break
		}

		if !rewritten {
			r.logger.Warn("question was not rewritten by GigaChat, storing it with its original text",
				zap.Uint64("question_id", question.Id),
				zap.Int64("quiz_id", msg.QuizID),
			)
		}

		question.Auditory = msg.ID
		if _, err := r.dal.QuestionRepo.CreateQuestion(ctx, &question); err != nil {
			r.logger.Error("failed to create updated question",
				zap.Int64("quiz_id", msg.QuizID),
				zap.Int64("auditory", msg.ID),
				zap.Error(err),
			)
		}
	}
}