// Package gigachatwc contains a worker that consumes GigaChat tasks from Kafka
// and stores audience-specific copies of quiz questions rewritten by GigaChat.
package gigachatwc

import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"gitea.pena/SQuiz/common/dal"
	"gitea.pena/SQuiz/common/model"
	"gitea.pena/SQuiz/worker/internal/clients/gigachat"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"
)

// GigaChatTaskScheduler periodically polls a Kafka topic for tasks and, for
// each task, asks GigaChat to rewrite every question of the referenced quiz.
type GigaChatTaskScheduler struct {
	kafkaClient    *kgo.Client
	gigaChatClient *gigachat.GigaChatClient
	logger         *zap.Logger
	dal            *dal.DAL
}

// Deps groups everything NewGigaChatTaskScheduler needs to build a scheduler.
type Deps struct {
	// KafkaBrokers is passed as-is to kgo.SeedBrokers as a single seed address.
	KafkaBrokers string
	// KafkaTopic is the topic the scheduler consumes tasks from.
	KafkaTopic string
	// KafkaGroup is the consumer group the scheduler joins.
	KafkaGroup     string
	GigaChatClient *gigachat.GigaChatClient
	Logger         *zap.Logger
	Dal            *dal.DAL
}

// NewGigaChatTaskScheduler creates the Kafka consumer client and returns a
// scheduler wired with the given dependencies. It returns an error if the
// Kafka client cannot be created.
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers(deps.KafkaBrokers),
		kgo.ConsumerGroup(deps.KafkaGroup),
		kgo.ConsumeTopics(deps.KafkaTopic),
		// The reset offset points at "now", i.e. the moment the scheduler is
		// constructed, instead of the beginning of the topic.
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
	)
	if err != nil {
		return nil, err
	}

	return &GigaChatTaskScheduler{
		kafkaClient:    client,
		gigaChatClient: deps.GigaChatClient,
		logger:         deps.Logger,
		dal:            deps.Dal,
	}, nil
}

// MessageGigaChat is the JSON payload of a task record read from Kafka.
type MessageGigaChat struct {
	// ID is copied into the audience ID and into the Auditory field of every
	// question created for this task.
	ID int64 `json:"id"`
	// QuizID selects the quiz whose questions are rewritten.
	QuizID int64  `json:"quiz_id"`
	Sex    bool   `json:"sex"`
	Age    string `json:"age"`
}

// UpdJsonQuestionData is the JSON document expected in a GigaChat reply: the
// rewritten title and description of a question.
type UpdJsonQuestionData struct {
	Title       string `json:"title"`
	Description string `json:"description"`
}

// Start runs the polling loop: every three minutes it fetches and processes
// pending Kafka records. It blocks until ctx is cancelled.
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
	ticker := time.NewTicker(3 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			r.fetchMessages(ctx)
		case <-ctx.Done():
			return
		}
	}
}

// fetchMessages polls Kafka once, decodes every fetched record into a
// MessageGigaChat and hands it to handleMessage. Fetch errors are logged;
// records that cannot be decoded are logged and skipped.
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
	fetches := r.kafkaClient.PollFetches(ctx)

	// Surface broker/partition errors instead of silently dropping them.
	// Context cancellation is the normal shutdown path and is not reported.
	fetches.EachError(func(topic string, partition int32, err error) {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return
		}
		r.logger.Error("failed to fetch kafka records",
			zap.String("topic", topic),
			zap.Int32("partition", partition),
			zap.Error(err))
	})

	iter := fetches.RecordIter()
	for !iter.Done() {
		record := iter.Next()

		var msg MessageGigaChat
		if err := json.Unmarshal(record.Value, &msg); err != nil {
			r.logger.Error("failed to unmarshal kafka message",
				zap.ByteString("value", record.Value),
				zap.Error(err))
			continue
		}

		r.handleMessage(ctx, msg)
	}
}

// handleMessage processes one task: it loads the questions of msg.QuizID,
// asks GigaChat to rewrite the title and description of each of them for the
// audience described by msg, and stores every question as a new record whose
// Auditory is msg.ID.
//
// A question is stored even when GigaChat could not rewrite it; in that case
// it keeps its original title and description.
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
	const (
		// questionLimit is the page size used to load the quiz questions in
		// a single request.
		questionLimit = 100_000
		// maxAttempts bounds the retries on an empty GigaChat response.
		maxAttempts = 10
	)

	listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, questionLimit,
		0, 0, 0, uint64(msg.QuizID), false, false, "", "", 0)
	if err != nil {
		r.logger.Error("failed to get question list", zap.Error(err))
		return
	}

	audience := model.GigaChatAudience{
		ID:     msg.ID,
		QuizID: msg.QuizID,
		Sex:    msg.Sex,
		Age:    msg.Age,
	}

	for _, question := range listQuestions {
		// Stop early once the context is cancelled instead of issuing calls
		// that are bound to fail for every remaining question.
		if ctx.Err() != nil {
			return
		}

		// TODO: open questions — how many attempts do we really need? Can
		// GigaChat lift some of its restrictions for us? May we skip the
		// questions for which an error occurred?
		for attempt := 1; attempt <= maxAttempts; attempt++ {
			updJSON, sendErr := r.gigaChatClient.SendMsg(ctx, audience, question)
			if errors.Is(sendErr, model.EmptyResponseErrorGigaChat) {
				r.logger.Warn("empty response from GigaChat, retrying...",
					zap.Int("attempt", attempt),
					zap.Uint64("question_id", question.Id))
				continue
			}
			if sendErr != nil {
				r.logger.Error("failed to send message to GigaChat",
					zap.Uint64("question_id", question.Id),
					zap.Error(sendErr))
				break
			}

			var resp UpdJsonQuestionData
			if decodeErr := json.Unmarshal([]byte(updJSON), &resp); decodeErr != nil {
				// Reported separately from send failures so the log points
				// at the malformed reply rather than at the transport.
				r.logger.Error("failed to unmarshal GigaChat response",
					zap.Uint64("question_id", question.Id),
					zap.Error(decodeErr))
				break
			}

			question.Title = resp.Title
			question.Description = resp.Description
			break
		}

		question.Auditory = msg.ID
		if _, err := r.dal.QuestionRepo.CreateQuestion(ctx, &question); err != nil {
			r.logger.Error("failed to create updated question", zap.Error(err))
		}
	}
}