2025-05-10 14:11:35 +00:00
|
|
|
|
package gigachatwc
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
2025-06-06 14:27:43 +00:00
|
|
|
|
"fmt"
|
2025-05-10 14:11:35 +00:00
|
|
|
|
"gitea.pena/SQuiz/common/dal"
|
|
|
|
|
"gitea.pena/SQuiz/common/model"
|
2025-05-16 08:33:04 +00:00
|
|
|
|
"gitea.pena/SQuiz/worker/internal/clients/gigachat"
|
2025-05-10 14:11:35 +00:00
|
|
|
|
"github.com/twmb/franz-go/pkg/kgo"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type GigaChatTaskScheduler struct {
|
|
|
|
|
kafkaClient *kgo.Client
|
|
|
|
|
gigaChatClient *gigachat.GigaChatClient
|
|
|
|
|
logger *zap.Logger
|
|
|
|
|
dal *dal.DAL
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type Deps struct {
|
|
|
|
|
KafkaBrokers string
|
|
|
|
|
KafkaTopic string
|
|
|
|
|
KafkaGroup string
|
|
|
|
|
GigaChatClient *gigachat.GigaChatClient
|
|
|
|
|
Logger *zap.Logger
|
|
|
|
|
Dal *dal.DAL
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GGC NEW", deps.KafkaBrokers, deps.KafkaTopic)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
client, err := kgo.NewClient(
|
|
|
|
|
kgo.SeedBrokers(deps.KafkaBrokers),
|
|
|
|
|
kgo.ConsumeTopics(deps.KafkaTopic),
|
2025-06-06 14:27:43 +00:00
|
|
|
|
kgo.ConsumerGroup("gigachat1"),
|
2025-05-10 14:11:35 +00:00
|
|
|
|
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GCCER", err)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &GigaChatTaskScheduler{
|
|
|
|
|
kafkaClient: client,
|
|
|
|
|
gigaChatClient: deps.GigaChatClient,
|
|
|
|
|
logger: deps.Logger,
|
|
|
|
|
dal: deps.Dal,
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type MessageGigaChat struct {
|
2025-06-09 12:36:46 +00:00
|
|
|
|
ID int64 `json:"id"`
|
|
|
|
|
QuizID int64 `json:"quiz_id"`
|
|
|
|
|
Sex int32 `json:"sex"` // 0 - female, 1 - male, 2 - not sex
|
|
|
|
|
Age string `json:"age"`
|
|
|
|
|
AccountID string `json:"account_id"`
|
2025-05-10 14:11:35 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type UpdJsonQuestionData struct {
|
|
|
|
|
Title string `json:"title"`
|
|
|
|
|
Description string `json:"description"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
|
|
|
|
|
ticker := time.NewTicker(3 * time.Minute)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("FETCHED GGC")
|
2025-05-10 14:11:35 +00:00
|
|
|
|
r.fetchMessages(ctx)
|
|
|
|
|
case <-ctx.Done():
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GCC DONE")
|
2025-05-10 14:11:35 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
|
|
|
|
|
fetches := r.kafkaClient.PollFetches(ctx)
|
2025-06-02 22:16:09 +00:00
|
|
|
|
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GGC CONS", fetches)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
iter := fetches.RecordIter()
|
|
|
|
|
|
|
|
|
|
for !iter.Done() {
|
|
|
|
|
record := iter.Next()
|
|
|
|
|
|
|
|
|
|
var msg MessageGigaChat
|
|
|
|
|
if err := json.Unmarshal(record.Value, &msg); err != nil {
|
|
|
|
|
r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err))
|
|
|
|
|
continue
|
|
|
|
|
}
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GGC MES", msg)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
|
|
|
|
|
r.handleMessage(ctx, msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
|
2025-05-14 12:58:58 +00:00
|
|
|
|
listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "", 0)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("failed to get question list", zap.Error(err))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
audience := model.GigaChatAudience{
|
|
|
|
|
ID: msg.ID,
|
|
|
|
|
QuizID: msg.QuizID,
|
|
|
|
|
Sex: msg.Sex,
|
|
|
|
|
Age: msg.Age,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, question := range listQuestions {
|
|
|
|
|
var (
|
|
|
|
|
updJson string
|
|
|
|
|
resp UpdJsonQuestionData
|
|
|
|
|
)
|
2025-05-14 11:19:08 +00:00
|
|
|
|
// вот дилемма сколько попыток нам нужно? может ли гигачат отлючить нам некоторые ограничения
|
|
|
|
|
// можем ли мы скипать вопросы с которыми у нас произошла ошибка
|
2025-05-10 14:11:35 +00:00
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
updJson, err = r.gigaChatClient.SendMsg(ctx, audience, question)
|
2025-06-06 14:27:43 +00:00
|
|
|
|
fmt.Println("GGC ANS", updJson, err)
|
2025-05-10 14:11:35 +00:00
|
|
|
|
if err == nil {
|
2025-05-14 11:48:19 +00:00
|
|
|
|
if err = json.Unmarshal([]byte(updJson), &resp); err == nil {
|
|
|
|
|
question.Title = resp.Title
|
|
|
|
|
question.Description = resp.Description
|
|
|
|
|
break
|
|
|
|
|
}
|
2025-05-10 14:11:35 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if errors.Is(err, model.EmptyResponseErrorGigaChat) {
|
|
|
|
|
r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", i+1), zap.Uint64("question_id", question.Id))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r.logger.Error("failed to send message to GigaChat", zap.Error(err))
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
2025-05-14 11:19:08 +00:00
|
|
|
|
question.Auditory = msg.ID
|
2025-05-10 14:11:35 +00:00
|
|
|
|
|
|
|
|
|
_, err = r.dal.QuestionRepo.CreateQuestion(ctx, &question)
|
|
|
|
|
if err != nil {
|
|
|
|
|
r.logger.Error("failed to create updated question", zap.Error(err))
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|