244 lines
6.3 KiB
Go
244 lines
6.3 KiB
Go
package mongos
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"go.uber.org/zap"
|
|
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
|
|
"time"
|
|
)
|
|
|
|
type WorkerTask struct {
|
|
coll *mongo.Collection
|
|
logger *zap.Logger
|
|
}
|
|
|
|
func InitWorkerTask(db *mongo.Database, logger *zap.Logger) *WorkerTask {
|
|
return &WorkerTask{coll: db.Collection("worker_task"), logger: logger}
|
|
}
|
|
|
|
func (d *WorkerTask) Insert(ctx context.Context, record *model.WorkerTask) (string, error) {
|
|
if !record.Status.Valid() {
|
|
return "", errors.New("invalid status")
|
|
}
|
|
|
|
record.CreatedAt = time.Now()
|
|
record.UpdatedAt = time.Now()
|
|
|
|
result, err := d.coll.InsertOne(ctx, record)
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorInsertWorkerTask", zap.Error(err))
|
|
return "", err
|
|
}
|
|
|
|
id := result.InsertedID.(primitive.ObjectID).Hex()
|
|
|
|
d.logger.Info("InfoInsertWorkerTask", zap.String("id", id))
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (d *WorkerTask) GetByID(ctx context.Context, id string) (*model.WorkerTask, error) {
|
|
objID, err := primitive.ObjectIDFromHex(id)
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
filter := bson.M{"_id": objID}
|
|
|
|
var result model.WorkerTask
|
|
|
|
err = d.coll.FindOne(ctx, filter).Decode(&result)
|
|
|
|
if err == mongo.ErrNoDocuments {
|
|
return nil, nil
|
|
} else {
|
|
if err != nil {
|
|
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
d.logger.Info("InfoGetWorkerTask", zap.String("id", result.ID))
|
|
return &result, nil
|
|
}
|
|
|
|
type WorkerStatusFilter struct {
|
|
UserID string `json:"user_id" bson:"user_id,omitempty"` // Пользователь пены
|
|
AmoID string `json:"amo_id" bson:"amo_id,omitempty"` // Амо аккаунт вызвавший генерацию
|
|
AmoUserID int64 `json:"amo_user_id" bson:"amo_user_id,omitempty"` // Пользователь амо вызвавший генерацию
|
|
Status model.WorkerTaskStatus `json:"status" bson:"status,omitempty"` // Статус генерации
|
|
Data map[string]any `json:"data" bson:"data,omitempty"` // Данные по которым происходит генерация
|
|
LeadId int64 `json:"lead_id" bson:"lead_id,omitempty"` // Сделка по которой происходит генерация
|
|
TemplateID string `json:"template_id" bson:"template_id,omitempty"` // Шаблон для генерации
|
|
Source *model.WorkerSource `json:"source" bson:"source,omitempty"` // Исходный файл-шаблон
|
|
Target *model.WorkerTarget `json:"target" bson:"target,omitempty"`
|
|
}
|
|
|
|
func (d *WorkerTask) GetByFilter(ctx context.Context, filter *WorkerStatusFilter,
|
|
opts *options.FindOptions) ([]model.WorkerTask, error) {
|
|
if filter == nil {
|
|
filter = &WorkerStatusFilter{}
|
|
}
|
|
|
|
cur, err := d.coll.Find(ctx, filter, opts)
|
|
if err == mongo.ErrNoDocuments {
|
|
return nil, nil
|
|
} else {
|
|
if err != nil {
|
|
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var result []model.WorkerTask
|
|
err = cur.All(ctx, &result)
|
|
if err != nil {
|
|
d.logger.Error("ErrorGetWorkerTask", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
d.logger.Info("InfoGetWorkerTaskList")
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (d *WorkerTask) UpdateStatus(ctx context.Context, id string, status model.WorkerTaskStatus) error {
|
|
if !status.Valid() {
|
|
err := errors.New("invalid status")
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
objID, err := primitive.ObjectIDFromHex(id)
|
|
if err != nil {
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
update := bson.M{"status": status}
|
|
|
|
_, err = d.coll.UpdateByID(ctx, objID, bson.D{{"$set", update}})
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
d.logger.Info("InfoUpdateWorkerTaskStatus", zap.String("_id", id))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *WorkerTask) UpdateByID(ctx context.Context, record *model.WorkerTask) error {
|
|
if record.ID == "" {
|
|
err := errors.New("got empty id")
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
objID, err := primitive.ObjectIDFromHex(record.ID)
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
filter := bson.M{"_id": objID}
|
|
|
|
update := bson.M{"updated_at": time.Now()}
|
|
|
|
if record.UserID != "" {
|
|
update["user_id"] = record.UserID
|
|
}
|
|
|
|
if record.AmoID != "" {
|
|
update["amo_id"] = record.AmoID
|
|
}
|
|
|
|
if record.AmoUserID > 0 {
|
|
update["amo_user_id"] = record.AmoUserID
|
|
}
|
|
|
|
if record.Status.String() != "" {
|
|
if !record.Status.Valid() {
|
|
err = errors.New("invalid status")
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
update["status"] = record.Status
|
|
}
|
|
|
|
if record.LeadId > 0 {
|
|
update["lead_id"] = record.LeadId
|
|
}
|
|
|
|
if record.TemplateID != "" {
|
|
update["template_id"] = record.TemplateID
|
|
}
|
|
|
|
if record.Source.File != "" {
|
|
update["source.file"] = record.Source.File
|
|
}
|
|
|
|
if record.Source.StorageID != "" {
|
|
update["source.storage_id"] = record.Source.StorageID
|
|
}
|
|
|
|
if record.Source.StorageType != "" {
|
|
update["source.storage_type"] = record.Source.StorageType
|
|
}
|
|
|
|
if record.Target.File != "" {
|
|
update["target.file"] = record.Target.File
|
|
}
|
|
|
|
if record.Target.StorageID != "" {
|
|
update["target.storage_id"] = record.Target.StorageID
|
|
}
|
|
|
|
if record.Target.StorageType != "" {
|
|
update["target.storage_type"] = record.Target.StorageType
|
|
}
|
|
|
|
if record.DownloadUrl != "" {
|
|
update["download_url"] = record.DownloadUrl
|
|
}
|
|
|
|
_, err = d.coll.UpdateOne(ctx, filter, bson.D{{"$set", update}})
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *WorkerTask) Delete(ctx context.Context, id string) error {
|
|
objID, err := primitive.ObjectIDFromHex(id)
|
|
|
|
if err != nil {
|
|
d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
filter := bson.M{"_id": objID}
|
|
|
|
_, err = d.coll.DeleteOne(ctx, filter)
|
|
if err != nil {
|
|
d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
d.logger.Info("InfoDeleteWorkerTask", zap.String("_id", id))
|
|
return nil
|
|
}
|