package mongos import ( "context" "errors" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/templategen/dal/model" ) type WorkerTask struct { coll *mongo.Collection logger *zap.Logger } func InitWorkerTask(db *mongo.Database, logger *zap.Logger) *WorkerTask { return &WorkerTask{coll: db.Collection("worker_task"), logger: logger} } func (d *WorkerTask) Insert(ctx context.Context, record *model.WorkerTask) (string, error) { if !record.Status.Valid() { return "", errors.New("invalid status") } record.CreatedAt = time.Now() record.UpdatedAt = record.CreatedAt result, err := d.coll.InsertOne(ctx, record) if err != nil { d.logger.Error("ErrorInsertWorkerTask", zap.Error(err)) return "", err } id := result.InsertedID.(primitive.ObjectID).Hex() d.logger.Info("InfoInsertWorkerTask", zap.String("id", id)) return id, nil } func (d *WorkerTask) GetByID(ctx context.Context, id string) (*model.WorkerTask, error) { objID, err := primitive.ObjectIDFromHex(id) if err != nil { d.logger.Error("ErrorGetWorkerTask", zap.Error(err)) return nil, err } filter := bson.M{"_id": objID} var result model.WorkerTask err = d.coll.FindOne(ctx, filter).Decode(&result) if err == mongo.ErrNoDocuments { return nil, nil } else if err != nil { d.logger.Error("ErrorGetWorkerTask", zap.Error(err)) return nil, err } d.logger.Info("InfoGetWorkerTask", zap.String("id", result.ID)) return &result, nil } type WorkerStatusFilter struct { PenaID string `json:"pena_id" bson:"pena_id,omitempty"` // Пользователь пены AmoID string `json:"amo_id" bson:"amo_id,omitempty"` // Амо аккаунт вызвавший генерацию AmoUserID int64 `json:"amo_user_id" bson:"amo_user_id,omitempty"` // Пользователь амо вызвавший генерацию Status model.WorkerTaskStatus `json:"status" bson:"status,omitempty"` // Статус генерации Data map[string]any `json:"data" bson:"data,omitempty"` // Данные по которым происходит генерация LeadID int64 `json:"lead_id" bson:"lead_id,omitempty"` // Сделка по которой происходит генерация TemplateID string `json:"template_id" bson:"template_id,omitempty"` // Шаблон для генерации Source *model.WorkerSource `json:"source" bson:"source,omitempty"` // Исходный файл-шаблон Target *model.WorkerTarget `json:"target" bson:"target,omitempty"` } func (d *WorkerTask) GetByFilter(ctx context.Context, filter *WorkerStatusFilter, opts *options.FindOptions) ([]model.WorkerTask, error) { if filter == nil { filter = &WorkerStatusFilter{} } cur, err := d.coll.Find(ctx, filter, opts) if err == mongo.ErrNoDocuments { return nil, nil } else if err != nil { d.logger.Error("ErrorGetWorkerTask", zap.Error(err)) return nil, err } var result []model.WorkerTask err = cur.All(ctx, &result) if err != nil { d.logger.Error("ErrorGetWorkerTask", zap.Error(err)) return nil, err } d.logger.Info("InfoGetWorkerTaskList") return result, nil } func (d *WorkerTask) UpdateStatus(ctx context.Context, id string, status model.WorkerTaskStatus) error { if !status.Valid() { err := errors.New("invalid status") d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err)) return err } objID, err := primitive.ObjectIDFromHex(id) if err != nil { d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err)) return err } update := bson.M{"status": status, "updated_at": time.Now()} _, err = d.coll.UpdateByID(ctx, objID, bson.D{{Key: "$set", Value: update}}) if err != nil { d.logger.Error("ErrorUpdateWorkerTaskStatus", zap.Error(err)) return err } d.logger.Info("InfoUpdateWorkerTaskStatus", zap.String("_id", id)) return nil } func (d *WorkerTask) UpdateByID(ctx context.Context, record *model.WorkerTask) error { if record.ID == "" { err := errors.New("got empty id") d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err)) return err } objID, err := primitive.ObjectIDFromHex(record.ID) if err != nil { d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err)) return err } filter := bson.M{"_id": objID} update := bson.M{"updated_at": time.Now()} if record.PenaID != "" { update["pena_id"] = record.PenaID } if record.AmoID != "" { update["amo_id"] = record.AmoID } if record.AmoUserID > 0 { update["amo_user_id"] = record.AmoUserID } if record.Status.String() != "" { if !record.Status.Valid() { err = errors.New("invalid status") d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err)) return err } update["status"] = record.Status } // TODO: Изменить логику работы, когда Михаил примет решение по этому моменту. if record.LeadID > 0 { update["lead_id"] = record.LeadID } if record.TemplateID != "" { update["template_id"] = record.TemplateID } if record.Source.File != "" { update["source.file"] = record.Source.File } if record.Source.StorageID != "" { update["source.storage_id"] = record.Source.StorageID } if record.Source.StorageType != "" { update["source.storage_type"] = record.Source.StorageType } if record.Target.File != "" { update["target.file"] = record.Target.File } if record.Target.StorageID != "" { update["target.storage_id"] = record.Target.StorageID } if record.Target.StorageType != "" { update["target.storage_type"] = record.Target.StorageType } if record.DownloadURL != "" { update["download_url"] = record.DownloadURL } if record.PublicURL != "" { update["public_url"] = record.PublicURL } _, err = d.coll.UpdateOne(ctx, filter, bson.D{{Key: "$set", Value: update}}) if err != nil { d.logger.Error("ErrorUpdateWorkerTask", zap.Error(err)) return err } d.logger.Info("InfoUpdateWorkerTask", zap.String("_id", record.ID)) return nil } func (d *WorkerTask) Delete(ctx context.Context, id string) error { objID, err := primitive.ObjectIDFromHex(id) if err != nil { d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err)) return err } filter := bson.M{"_id": objID} _, err = d.coll.DeleteOne(ctx, filter) if err != nil { d.logger.Error("ErrorDeleteWorkerTask", zap.Error(err)) return err } d.logger.Info("InfoDeleteWorkerTask", zap.String("_id", id)) return nil } // Listen - смотрит за добавлением и обновлением задач с выбранным статусом и возвращает их в выбранный канал. func (d *WorkerTask) Listen(ctx context.Context, status model.WorkerTaskStatus, workerTaskChan chan model.WorkerTask) { operationTypes := []bson.D{ {{Key: "operationType", Value: "insert"}}, {{Key: "operationType", Value: "update"}}, } matchStageOpTypes := bson.D{ {Key: "$match", Value: bson.D{{Key: "$or", Value: operationTypes}}}, } matchStageStatus := bson.D{ {Key: "$match", Value: bson.D{{Key: "fullDocument.status", Value: status}}}, } opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := d.coll.Watch(ctx, mongo.Pipeline{matchStageOpTypes, matchStageStatus}, opts) if err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) return } go func() { // Перехват паники (см. ниже) defer func() { if err := recover(); err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err.(error))) } }() for changeStream.Next(ctx) { // При закрытии приложения происходит паника (change_stream.go 561). context не важен...' current := changeStream.Current if err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) return } var task model.WorkerTask switch current.Lookup("operationType").StringValue() { case "insert": err = bson.Unmarshal(current.Lookup("fullDocument").Value, &task) if err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) return } case "update": // get updated and removed data err = bson.Unmarshal(current.Lookup("fullDocument").Value, &task) if err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) return } } workerTaskChan <- task } if err = changeStream.Err(); err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) } }() <-ctx.Done() err = changeStream.Close(context.Background()) // TODO: проверить сокращенную запись в ssa if err != nil { d.logger.Error("ErrorWatchWorkerTask", zap.Error(err)) } }