321 lines
8.8 KiB
Go
321 lines
8.8 KiB
Go
package mongos
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"time"
|
||
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
"go.uber.org/zap"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
|
||
)
|
||
|
||
type WorkerTask struct {
|
||
coll *mongo.Collection
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func InitWorkerTask(db *mongo.Database, logger *zap.Logger) *WorkerTask {
|
||
return &WorkerTask{coll: db.Collection("worker_task"), logger: logger}
|
||
}
|
||
|
||
func (d *WorkerTask) Insert(ctx context.Context, record *model.WorkerTask) (string, error) {
|
||
if !record.Status.Valid() {
|
||
return "", errors.New("invalid status")
|
||
}
|
||
|
||
record.CreatedAt = time.Now()
|
||
record.UpdatedAt = record.CreatedAt
|
||
|
||
result, err := d.coll.InsertOne(ctx, record)
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorInsertWorkerTask", zap.Error(err))
|
||
return "", err
|
||
}
|
||
|
||
id := result.InsertedID.(primitive.ObjectID).Hex()
|
||
|
||
d.logger.Info("InfoInsertWorkerTask", zap.String("id", id))
|
||
|
||
return id, nil
|
||
}
|
||
|
||
func (d *WorkerTask) GetByID(ctx context.Context, id string) (*model.WorkerTask, error) {
|
||
objID, err := primitive.ObjectIDFromHex(id)
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
filter := bson.M{"_id": objID}
|
||
|
||
var result model.WorkerTask
|
||
|
||
err = d.coll.FindOne(ctx, filter).Decode(&result)
|
||
|
||
if err == mongo.ErrNoDocuments {
|
||
return nil, nil
|
||
} else if err != nil {
|
||
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
d.logger.Info("InfoGetWorkerTask", zap.String("id", result.ID))
|
||
return &result, nil
|
||
}
|
||
|
||
type WorkerStatusFilter struct {
|
||
PenaID string `json:"pena_id" bson:"pena_id,omitempty"` // Пользователь пены
|
||
AmoID string `json:"amo_id" bson:"amo_id,omitempty"` // Амо аккаунт вызвавший генерацию
|
||
AmoUserID int64 `json:"amo_user_id" bson:"amo_user_id,omitempty"` // Пользователь амо вызвавший генерацию
|
||
Status model.WorkerTaskStatus `json:"status" bson:"status,omitempty"` // Статус генерации
|
||
Data map[string]any `json:"data" bson:"data,omitempty"` // Данные по которым происходит генерация
|
||
LeadID int64 `json:"lead_id" bson:"lead_id,omitempty"` // Сделка по которой происходит генерация
|
||
TemplateID string `json:"template_id" bson:"template_id,omitempty"` // Шаблон для генерации
|
||
Source *model.WorkerSource `json:"source" bson:"source,omitempty"` // Исходный файл-шаблон
|
||
Target *model.WorkerTarget `json:"target" bson:"target,omitempty"`
|
||
}
|
||
|
||
func (d *WorkerTask) GetByFilter(ctx context.Context, filter *WorkerStatusFilter,
|
||
opts *options.FindOptions) ([]model.WorkerTask, error) {
|
||
if filter == nil {
|
||
filter = &WorkerStatusFilter{}
|
||
}
|
||
|
||
cur, err := d.coll.Find(ctx, filter, opts)
|
||
if err == mongo.ErrNoDocuments {
|
||
return nil, nil
|
||
} else if err != nil {
|
||
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
var result []model.WorkerTask
|
||
err = cur.All(ctx, &result)
|
||
if err != nil {
|
||
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
||
return nil, err
|
||
}
|
||
|
||
d.logger.Info("InfoGetWorkerTaskList")
|
||
|
||
return result, nil
|
||
}
|
||
|
||
func (d *WorkerTask) UpdateStatus(ctx context.Context, id string, status model.WorkerTaskStatus) error {
|
||
if !status.Valid() {
|
||
err := errors.New("invalid status")
|
||
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
objID, err := primitive.ObjectIDFromHex(id)
|
||
if err != nil {
|
||
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
update := bson.M{"status": status, "updated_at": time.Now()}
|
||
|
||
_, err = d.coll.UpdateByID(ctx, objID, bson.D{{Key: "$set", Value: update}})
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
d.logger.Info("InfoUpdateWorkerTaskStatus", zap.String("_id", id))
|
||
|
||
return nil
|
||
}
|
||
|
||
func (d *WorkerTask) UpdateByID(ctx context.Context, record *model.WorkerTask) error {
|
||
if record.ID == "" {
|
||
err := errors.New("got empty id")
|
||
d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
objID, err := primitive.ObjectIDFromHex(record.ID)
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
filter := bson.M{"_id": objID}
|
||
|
||
update := bson.M{"updated_at": time.Now()}
|
||
|
||
if record.PenaID != "" {
|
||
update["pena_id"] = record.PenaID
|
||
}
|
||
|
||
if record.AmoID != "" {
|
||
update["amo_id"] = record.AmoID
|
||
}
|
||
|
||
if record.AmoUserID > 0 {
|
||
update["amo_user_id"] = record.AmoUserID
|
||
}
|
||
|
||
if record.Status.String() != "" {
|
||
if !record.Status.Valid() {
|
||
err = errors.New("invalid status")
|
||
d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
update["status"] = record.Status
|
||
}
|
||
|
||
// TODO: Изменить логику работы, когда Михаил примет решение по этому моменту.
|
||
if record.LeadID > 0 {
|
||
update["lead_id"] = record.LeadID
|
||
}
|
||
|
||
if record.TemplateID != "" {
|
||
update["template_id"] = record.TemplateID
|
||
}
|
||
|
||
if record.Source.File != "" {
|
||
update["source.file"] = record.Source.File
|
||
}
|
||
|
||
if record.Source.StorageID != "" {
|
||
update["source.storage_id"] = record.Source.StorageID
|
||
}
|
||
|
||
if record.Source.StorageType != "" {
|
||
update["source.storage_type"] = record.Source.StorageType
|
||
}
|
||
|
||
if record.Target.File != "" {
|
||
update["target.file"] = record.Target.File
|
||
}
|
||
|
||
if record.Target.StorageID != "" {
|
||
update["target.storage_id"] = record.Target.StorageID
|
||
}
|
||
|
||
if record.Target.StorageType != "" {
|
||
update["target.storage_type"] = record.Target.StorageType
|
||
}
|
||
|
||
if record.DownloadURL != "" {
|
||
update["download_url"] = record.DownloadURL
|
||
}
|
||
|
||
if record.PublicURL != "" {
|
||
update["public_url"] = record.PublicURL
|
||
}
|
||
|
||
_, err = d.coll.UpdateOne(ctx, filter, bson.D{{Key: "$set", Value: update}})
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
d.logger.Info("InfoUpdateWorkerTask", zap.String("_id", record.ID))
|
||
|
||
return nil
|
||
}
|
||
|
||
func (d *WorkerTask) Delete(ctx context.Context, id string) error {
|
||
objID, err := primitive.ObjectIDFromHex(id)
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
filter := bson.M{"_id": objID}
|
||
|
||
_, err = d.coll.DeleteOne(ctx, filter)
|
||
if err != nil {
|
||
d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err))
|
||
return err
|
||
}
|
||
|
||
d.logger.Info("InfoDeleteWorkerTask", zap.String("_id", id))
|
||
return nil
|
||
}
|
||
|
||
// Listen - смотрит за добавлением и обновлением задач с выбранным статусом и возвращает их в выбранный канал.
|
||
func (d *WorkerTask) Listen(ctx context.Context, status model.WorkerTaskStatus, workerTaskChan chan model.WorkerTask) {
|
||
operationTypes := []bson.D{
|
||
{{Key: "operationType", Value: "insert"}},
|
||
{{Key: "operationType", Value: "update"}},
|
||
}
|
||
|
||
matchStageOpTypes := bson.D{
|
||
{Key: "$match", Value: bson.D{{Key: "$or", Value: operationTypes}}},
|
||
}
|
||
|
||
matchStageStatus := bson.D{
|
||
{Key: "$match", Value: bson.D{{Key: "fullDocument.status", Value: status}}},
|
||
}
|
||
|
||
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
|
||
|
||
changeStream, err := d.coll.Watch(ctx, mongo.Pipeline{matchStageOpTypes, matchStageStatus}, opts)
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
return
|
||
}
|
||
|
||
go func() {
|
||
// Перехват паники (см. ниже)
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err.(error)))
|
||
}
|
||
}()
|
||
|
||
for changeStream.Next(ctx) {
|
||
// При закрытии приложения происходит паника (change_stream.go 561). context не важен...'
|
||
current := changeStream.Current
|
||
|
||
if err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
return
|
||
}
|
||
|
||
var task model.WorkerTask
|
||
switch current.Lookup("operationType").StringValue() {
|
||
case "insert":
|
||
err = bson.Unmarshal(current.Lookup("fullDocument").Value, &task)
|
||
if err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
return
|
||
}
|
||
|
||
case "update":
|
||
// get updated and removed data
|
||
err = bson.Unmarshal(current.Lookup("fullDocument").Value, &task)
|
||
if err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
return
|
||
}
|
||
}
|
||
workerTaskChan <- task
|
||
}
|
||
if err = changeStream.Err(); err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
}
|
||
}()
|
||
|
||
<-ctx.Done()
|
||
|
||
err = changeStream.Close(context.Background()) // TODO: проверить сокращенную запись в ssa
|
||
if err != nil {
|
||
d.logger.Error("ErrorWatchWorkerTask", zap.Error(err))
|
||
}
|
||
}
|