diff --git a/app/app.go b/app/app.go index b209c05..4b59a59 100644 --- a/app/app.go +++ b/app/app.go @@ -2,23 +2,27 @@ package app import ( "context" + "crypto/tls" "errors" "fmt" + "gitea.pena/SQuiz/common/dal" + "gitea.pena/SQuiz/common/model" + "gitea.pena/SQuiz/worker/answerwc" + "gitea.pena/SQuiz/worker/clients/customer" + "gitea.pena/SQuiz/worker/clients/gigachat" + "gitea.pena/SQuiz/worker/clients/mailclient" + "gitea.pena/SQuiz/worker/gigachatwc" + "gitea.pena/SQuiz/worker/privilegewc" + "gitea.pena/SQuiz/worker/workers/shortstat" + "gitea.pena/SQuiz/worker/workers/timeout" "github.com/go-redis/redis/v8" + "github.com/go-resty/resty/v2" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/skeris/appInit" "github.com/themakers/hlog" "go.uber.org/zap" "google.golang.org/grpc" - "gitea.pena/SQuiz/common/dal" - "gitea.pena/SQuiz/common/model" - "gitea.pena/SQuiz/worker/answerwc" - "gitea.pena/SQuiz/worker/clients/customer" - "gitea.pena/SQuiz/worker/clients/mailclient" - "gitea.pena/SQuiz/worker/privilegewc" - "gitea.pena/SQuiz/worker/workers/shortstat" - "gitea.pena/SQuiz/worker/workers/timeout" "time" ) @@ -71,6 +75,10 @@ type Options struct { SmtpApiKey string `env:"SMTP_API_KEY" default:"P0YsjUB137upXrr1NiJefHmXVKW1hmBWlpev"` SmtpApiUrl string `env:"SMTP_API_URL" default:"https://api.smtp.bz/v1/smtp/send"` CustomerServiceAddress string `env:"CUSTOMER_SERVICE_ADDRESS"` + KafkaGroup string `env:"KAFKA_GROUP" default:"squiz"` + KafkaTopicGigaChat string `env:"KAFKA_TOPIC_GIGA_CHAT"` + GigaChatApiBaseURL string `env:"GIGA_CHAT_API_BASE_URL"` + GigaChatApiAuthKey string `env:"GIGA_CHAT_API_AUTH_KEY"` } func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.CommonApp, error) { @@ -156,6 +164,33 @@ func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.Co return nil, err } + gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{ + Logger: zapLogger, + Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}), + BaseURL: options.GigaChatApiBaseURL, + AuthKey: options.GigaChatApiAuthKey, + RedisClient: redisClient, + }) + if err != nil { + return nil, err + } + // метод для обновления токенов гигачата + go gigaChatClient.TokenResearch(ctx) + + gigaChatWorker, err := gigachatwc.NewGigaChatTaskScheduler(gigachatwc.Deps{ + KafkaBrokers: options.KafkaBroker, + KafkaTopic: options.KafkaTopicGigaChat, + KafkaGroup: options.KafkaGroup, + GigaChatClient: gigaChatClient, + Logger: zapLogger, + Dal: pgdal, + }) + if err != nil { + return nil, err + } + + go gigaChatWorker.Start(ctx) + kafkaWorker, err := privilegewc.NewKafkaConsumerWorker(privilegewc.Config{ KafkaBroker: options.KafkaBroker, KafkaTopic: options.KafkaTopic, diff --git a/gigachatwc/scheduler.go b/gigachatwc/scheduler.go new file mode 100644 index 0000000..cf3b250 --- /dev/null +++ b/gigachatwc/scheduler.go @@ -0,0 +1,147 @@ +package gigachatwc + +import ( + "context" + "encoding/json" + "errors" + "gitea.pena/SQuiz/common/dal" + "gitea.pena/SQuiz/common/model" + "gitea.pena/SQuiz/worker/clients/gigachat" + "github.com/twmb/franz-go/pkg/kgo" + "go.uber.org/zap" + "time" +) + +type GigaChatTaskScheduler struct { + kafkaClient *kgo.Client + gigaChatClient *gigachat.GigaChatClient + logger *zap.Logger + dal *dal.DAL +} + +type Deps struct { + KafkaBrokers string + KafkaTopic string + KafkaGroup string + GigaChatClient *gigachat.GigaChatClient + Logger *zap.Logger + Dal *dal.DAL +} + +func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) { + client, err := kgo.NewClient( + kgo.SeedBrokers(deps.KafkaBrokers), + kgo.ConsumerGroup(deps.KafkaGroup), + kgo.ConsumeTopics(deps.KafkaTopic), + kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())), + ) + if err != nil { + return nil, err + } + + return &GigaChatTaskScheduler{ + kafkaClient: client, + gigaChatClient: deps.GigaChatClient, + logger: deps.Logger, + dal: deps.Dal, + }, nil +} + +type MessageGigaChat struct { + ID int64 `json:"id"` + QuizID int64 `json:"quiz_id"` + Sex bool `json:"sex"` + Age string `json:"age"` +} + +type UpdJsonQuestionData struct { + Title string `json:"title"` + Description string `json:"description"` +} + +func (r *GigaChatTaskScheduler) Start(ctx context.Context) { + ticker := time.NewTicker(3 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + r.fetchMessages(ctx) + case <-ctx.Done(): + return + } + } +} + +func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) { + fetches := r.kafkaClient.PollFetches(ctx) + iter := fetches.RecordIter() + + for !iter.Done() { + record := iter.Next() + + var msg MessageGigaChat + if err := json.Unmarshal(record.Value, &msg); err != nil { + r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err)) + continue + } + + r.handleMessage(ctx, msg) + } +} + +func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) { + listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "") + if err != nil { + r.logger.Error("failed to get question list", zap.Error(err)) + return + } + + audience := model.GigaChatAudience{ + ID: msg.ID, + QuizID: msg.QuizID, + Sex: msg.Sex, + Age: msg.Age, + } + + for _, question := range listQuestions { + var ( + updJson string + resp UpdJsonQuestionData + ) + + for i := 0; i < 10; i++ { + updJson, err = r.gigaChatClient.SendMsg(ctx, audience, question) + if err == nil { + break + } + + if errors.Is(err, model.EmptyResponseErrorGigaChat) { + r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", i+1), zap.Uint64("question_id", question.Id)) + continue + } + + r.logger.Error("failed to send message to GigaChat", zap.Error(err)) + break + } + + if err != nil { + continue + } + + err = json.Unmarshal([]byte(updJson), &resp) + if err != nil { + r.logger.Error("failed to unmarshal GigaChat response", zap.String("raw", updJson), zap.Error(err)) + continue + } + + question.Title = resp.Title + question.Description = resp.Description + + _, err = r.dal.QuestionRepo.CreateQuestion(ctx, &question) + if err != nil { + r.logger.Error("failed to create updated question", zap.Error(err)) + continue + } + } +} diff --git a/tests/gigachat_test.go b/tests/gigachat_test.go index 029fa09..5dab529 100644 --- a/tests/gigachat_test.go +++ b/tests/gigachat_test.go @@ -37,8 +37,8 @@ func TestGigachat(t *testing.T) { go gigaChatClient.TokenResearch(ctx) result, err := gigaChatClient.SendMsg(ctx, model.GigaChatAudience{ - Gender: "женский", - Age: "17-23", + Sex: false, + Age: "17-23", }, model.Question{ Title: "О личной жизни", Description: "Как много у вас котят?",