added worker for gigachat
This commit is contained in:
parent
5d781ec1e2
commit
0809918f3c
51
app/app.go
51
app/app.go
@ -2,23 +2,27 @@ package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.pena/SQuiz/common/dal"
|
||||
"gitea.pena/SQuiz/common/model"
|
||||
"gitea.pena/SQuiz/worker/answerwc"
|
||||
"gitea.pena/SQuiz/worker/clients/customer"
|
||||
"gitea.pena/SQuiz/worker/clients/gigachat"
|
||||
"gitea.pena/SQuiz/worker/clients/mailclient"
|
||||
"gitea.pena/SQuiz/worker/gigachatwc"
|
||||
"gitea.pena/SQuiz/worker/privilegewc"
|
||||
"gitea.pena/SQuiz/worker/workers/shortstat"
|
||||
"gitea.pena/SQuiz/worker/workers/timeout"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/go-resty/resty/v2"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/skeris/appInit"
|
||||
"github.com/themakers/hlog"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"gitea.pena/SQuiz/common/dal"
|
||||
"gitea.pena/SQuiz/common/model"
|
||||
"gitea.pena/SQuiz/worker/answerwc"
|
||||
"gitea.pena/SQuiz/worker/clients/customer"
|
||||
"gitea.pena/SQuiz/worker/clients/mailclient"
|
||||
"gitea.pena/SQuiz/worker/privilegewc"
|
||||
"gitea.pena/SQuiz/worker/workers/shortstat"
|
||||
"gitea.pena/SQuiz/worker/workers/timeout"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -71,6 +75,10 @@ type Options struct {
|
||||
SmtpApiKey string `env:"SMTP_API_KEY" default:"P0YsjUB137upXrr1NiJefHmXVKW1hmBWlpev"`
|
||||
SmtpApiUrl string `env:"SMTP_API_URL" default:"https://api.smtp.bz/v1/smtp/send"`
|
||||
CustomerServiceAddress string `env:"CUSTOMER_SERVICE_ADDRESS"`
|
||||
KafkaGroup string `env:"KAFKA_GROUP" default:"squiz"`
|
||||
KafkaTopicGigaChat string `env:"KAFKA_TOPIC_GIGA_CHAT"`
|
||||
GigaChatApiBaseURL string `env:"GIGA_CHAT_API_BASE_URL"`
|
||||
GigaChatApiAuthKey string `env:"GIGA_CHAT_API_AUTH_KEY"`
|
||||
}
|
||||
|
||||
func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.CommonApp, error) {
|
||||
@ -156,6 +164,33 @@ func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.Co
|
||||
return nil, err
|
||||
}
|
||||
|
||||
gigaChatClient, err := gigachat.NewGigaChatClient(ctx, gigachat.Deps{
|
||||
Logger: zapLogger,
|
||||
Client: resty.New().SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}),
|
||||
BaseURL: options.GigaChatApiBaseURL,
|
||||
AuthKey: options.GigaChatApiAuthKey,
|
||||
RedisClient: redisClient,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// метод для обновления токенов гигачата
|
||||
go gigaChatClient.TokenResearch(ctx)
|
||||
|
||||
gigaChatWorker, err := gigachatwc.NewGigaChatTaskScheduler(gigachatwc.Deps{
|
||||
KafkaBrokers: options.KafkaBroker,
|
||||
KafkaTopic: options.KafkaTopicGigaChat,
|
||||
KafkaGroup: options.KafkaGroup,
|
||||
GigaChatClient: gigaChatClient,
|
||||
Logger: zapLogger,
|
||||
Dal: pgdal,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go gigaChatWorker.Start(ctx)
|
||||
|
||||
kafkaWorker, err := privilegewc.NewKafkaConsumerWorker(privilegewc.Config{
|
||||
KafkaBroker: options.KafkaBroker,
|
||||
KafkaTopic: options.KafkaTopic,
|
||||
|
147
gigachatwc/scheduler.go
Normal file
147
gigachatwc/scheduler.go
Normal file
@ -0,0 +1,147 @@
|
||||
package gigachatwc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"gitea.pena/SQuiz/common/dal"
|
||||
"gitea.pena/SQuiz/common/model"
|
||||
"gitea.pena/SQuiz/worker/clients/gigachat"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type GigaChatTaskScheduler struct {
|
||||
kafkaClient *kgo.Client
|
||||
gigaChatClient *gigachat.GigaChatClient
|
||||
logger *zap.Logger
|
||||
dal *dal.DAL
|
||||
}
|
||||
|
||||
type Deps struct {
|
||||
KafkaBrokers string
|
||||
KafkaTopic string
|
||||
KafkaGroup string
|
||||
GigaChatClient *gigachat.GigaChatClient
|
||||
Logger *zap.Logger
|
||||
Dal *dal.DAL
|
||||
}
|
||||
|
||||
func NewGigaChatTaskScheduler(deps Deps) (*GigaChatTaskScheduler, error) {
|
||||
client, err := kgo.NewClient(
|
||||
kgo.SeedBrokers(deps.KafkaBrokers),
|
||||
kgo.ConsumerGroup(deps.KafkaGroup),
|
||||
kgo.ConsumeTopics(deps.KafkaTopic),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GigaChatTaskScheduler{
|
||||
kafkaClient: client,
|
||||
gigaChatClient: deps.GigaChatClient,
|
||||
logger: deps.Logger,
|
||||
dal: deps.Dal,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type MessageGigaChat struct {
|
||||
ID int64 `json:"id"`
|
||||
QuizID int64 `json:"quiz_id"`
|
||||
Sex bool `json:"sex"`
|
||||
Age string `json:"age"`
|
||||
}
|
||||
|
||||
type UpdJsonQuestionData struct {
|
||||
Title string `json:"title"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
func (r *GigaChatTaskScheduler) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(3 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
r.fetchMessages(ctx)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *GigaChatTaskScheduler) fetchMessages(ctx context.Context) {
|
||||
fetches := r.kafkaClient.PollFetches(ctx)
|
||||
iter := fetches.RecordIter()
|
||||
|
||||
for !iter.Done() {
|
||||
record := iter.Next()
|
||||
|
||||
var msg MessageGigaChat
|
||||
if err := json.Unmarshal(record.Value, &msg); err != nil {
|
||||
r.logger.Error("failed to unmarshal kafka message", zap.ByteString("value", record.Value), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
r.handleMessage(ctx, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *GigaChatTaskScheduler) handleMessage(ctx context.Context, msg MessageGigaChat) {
|
||||
listQuestions, _, err := r.dal.QuestionRepo.GetQuestionList(ctx, 100_000, 0, 0, 0, uint64(msg.QuizID), false, false, "", "")
|
||||
if err != nil {
|
||||
r.logger.Error("failed to get question list", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
audience := model.GigaChatAudience{
|
||||
ID: msg.ID,
|
||||
QuizID: msg.QuizID,
|
||||
Sex: msg.Sex,
|
||||
Age: msg.Age,
|
||||
}
|
||||
|
||||
for _, question := range listQuestions {
|
||||
var (
|
||||
updJson string
|
||||
resp UpdJsonQuestionData
|
||||
)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
updJson, err = r.gigaChatClient.SendMsg(ctx, audience, question)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
if errors.Is(err, model.EmptyResponseErrorGigaChat) {
|
||||
r.logger.Warn("empty response from GigaChat, retrying...", zap.Int("attempt", i+1), zap.Uint64("question_id", question.Id))
|
||||
continue
|
||||
}
|
||||
|
||||
r.logger.Error("failed to send message to GigaChat", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(updJson), &resp)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to unmarshal GigaChat response", zap.String("raw", updJson), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
question.Title = resp.Title
|
||||
question.Description = resp.Description
|
||||
|
||||
_, err = r.dal.QuestionRepo.CreateQuestion(ctx, &question)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to create updated question", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
@ -37,8 +37,8 @@ func TestGigachat(t *testing.T) {
|
||||
go gigaChatClient.TokenResearch(ctx)
|
||||
|
||||
result, err := gigaChatClient.SendMsg(ctx, model.GigaChatAudience{
|
||||
Gender: "женский",
|
||||
Age: "17-23",
|
||||
Sex: false,
|
||||
Age: "17-23",
|
||||
}, model.Question{
|
||||
Title: "О личной жизни",
|
||||
Description: "Как много у вас котят?",
|
||||
|
Loading…
Reference in New Issue
Block a user