package worker import ( "context" "errors" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/templategen/amo" "penahub.gitlab.yandexcloud.net/backend/templategen/dal" "penahub.gitlab.yandexcloud.net/backend/templategen/dal/model" gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk" "penahub.gitlab.yandexcloud.net/backend/templategen/templategen" yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk" "reflect" "strconv" "time" ) const ( WorkerLifetime = time.Minute * 15 DebounceDuration = time.Millisecond * 165 CacheSize = 10 TaskTimeout = 5 * time.Minute ProcessingTimeout = 15 * time.Second ) type Worker struct { AmoId string Tube chan *model.WorkerTask // Буферизированный канал с размером CacheSize ctx context.Context lifeTimer *time.Timer dal *dal.MongoDAL yaDisk *yadisk.ClientApp gDisk *gdisk.ClientApp amo *amo.ClientApp logger *zap.Logger } func NewWorker(amoId string, ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker { return &Worker{ AmoId: amoId, Tube: make(chan *model.WorkerTask, CacheSize), ctx: ctx, lifeTimer: time.NewTimer(WorkerLifetime), dal: dal, yaDisk: yaDisk, gDisk: gDisk, amo: amo, logger: logger.With(zap.String("amo_id", amoId))} } func (w *Worker) Start(stopWorker chan string) { w.logger.Info("worker started") go func() { for { select { case <-w.ctx.Done(): w.logger.Info("worker stopped - context done") stopWorker <- w.AmoId return case <-w.lifeTimer.C: w.logger.Info("worker stopped - timeout") stopWorker <- w.AmoId return } } }() go func() { for task := range w.Tube { if task == nil { w.logger.Error("worker got empty task") continue } err := w.dal.WorkerTask.UpdateStatus(w.ctx, task.ID, model.WorkerTaskStatusProcessing) if err != nil { w.logger.Error("cannot update workerTask", zap.String("_id", task.ID)) } task.Status = model.WorkerTaskStatusDone if !w.DoTask(task) { task.Status = model.WorkerTaskStatusFailed w.logger.Info("task failed", zap.String("_id", task.ID)) } else { w.logger.Info("task done", zap.String("_id", task.ID)) } err = w.dal.WorkerTask.UpdateByID(w.ctx, task) if err != nil { w.logger.Error("cannot update workerTask", zap.String("_id", task.ID)) } } }() } func (w *Worker) DoTask(task *model.WorkerTask) bool { var err error logger := w.logger.With(zap.String("_id", task.ID)) logger.Info("new task") // сбрасываем таймер w.lifeTimer.Stop() w.lifeTimer.Reset(WorkerLifetime) // находим историю к которой привязана задача history, err := w.dal.History.GetByWorkerTaskID(w.ctx, task.ID) if err != nil { logger.Error("get history", zap.Error(err)) } defer func() { err = w.dal.History.UpdateByID(w.ctx, history) if err != nil { w.logger.Error("cannot update history", zap.Error(err)) } }() // проверяем чтобы все поля были на месте if task.LeadId <= 0 { err = errors.New("got bad lead_id") return w.checkErrTask(logger, history, "bad data", err) } if task.UserID == "" { err = errors.New("got bad user_id") return w.checkErrTask(logger, history, "bad data", err) } if task.Source.File == "" { err = errors.New("got bad source.file") return w.checkErrTask(logger, history, "bad data", err) } if task.Source.StorageID == "" { err = errors.New("got bad source.storage_id") return w.checkErrTask(logger, history, "bad data", err) } if task.Source.StorageType == "" { err = errors.New("got bad source.storage_type") return w.checkErrTask(logger, history, "bad data", err) } if task.Target.StorageID == "" { err = errors.New("got bad target.storage_id") return w.checkErrTask(logger, history, "bad data", err) } if task.Target.StorageType == "" { err = errors.New("got bad target.storage_type") return w.checkErrTask(logger, history, "bad data", err) } // amo client amoData, err := w.dal.Amo.GetByID(w.ctx, task.AmoID) if err != nil { return w.checkErrTask(logger, history, "cannot get amo data", err) } amoClient, err := w.amo.NewClient(w.ctx, amoData.Referer, amoData.Token(), "") if err != nil { return w.checkErrTask(logger, history, "cannot create amo client", err) } lead, err := DebounceWrapper(amoClient.GetLeadById)(strconv.FormatInt(task.LeadId, 10)) if err != nil { return w.checkErrTask(logger, history, "cannot get lead", err) } if lead == nil { err = errors.New("empty lead data") return w.checkErrTask(logger, history, "cannot get lead", err) } dataTemplate := map[string]interface{}{} // Добавляем Инфо Лида for k, v := range templategen.AmoLeadFieldsToRuMap(lead) { dataTemplate[k] = v } // Добавялем инфо контактов contacts := []amo.Contact{} for _, data := range lead.Embedded.Contacts { contact, err := DebounceWrapper(amoClient.GetContactById)(strconv.Itoa(data.Id)) if err == nil { contacts = append(contacts, *contact) } else { return w.checkErrTask(logger, history, "cannot get contact", err) } } dataTemplate["Контакты"] = templategen.AmoContactsFieldsToRuMap(contacts) // Добавляем инфо компаний companies := []amo.Company{} for _, data := range lead.Embedded.Companies { company, err := DebounceWrapper(amoClient.GetCompanyById)(strconv.Itoa(data.Id)) if err == nil { companies = append(companies, *company) } else { return w.checkErrTask(logger, history, "cannot get company", err) } } dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies) err = w.Generate(task, lead.Name, dataTemplate) w.checkErrTask(logger, history, "cannot generate", err) history.Target.File = task.Target.File history.Target.StorageID = task.Target.StorageID history.Target.StorageType = task.Target.StorageType return true } func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg string, err error) bool { if err != nil { logger.Error("task failed: "+msg, zap.Error(err)) if len(history.Errors) > 0 { history.Errors = append(history.Errors, err.Error()) } else { history.Errors = []string{err.Error()} } err = w.dal.History.UpdateByID(w.ctx, history) if err != nil { logger.Error("cannot update history", zap.Error(err)) } return true } return false } // Generate - генерирует файл и отправляет его на указанный в task.source диск // @TODO: после написания тестов сделать чтобы отправлял на диск task.target!!! func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error { switch task.Source.StorageType { case "gdisk": gdiskData, err := w.dal.GDisk.GetByID(w.ctx, task.Source.StorageID) if err != nil { return err } client, err := w.gDisk.NewClient(w.ctx, gdiskData.Token()) if err != nil { return err } task.Target.File, task.DownloadUrl, err = templategen.GDiskGenerateDoc(task.Source.File, name, task.UserID, gdiskData.SaveFolderID, client, data) if err != nil { return err } case "yadisk": yaDiskData, err := w.dal.YaDisk.GetByID(w.ctx, task.Source.StorageID) if err != nil { return err } client, err := w.yaDisk.NewClient(w.ctx, yaDiskData.Token(), "") if err != nil { return err } task.Target.File, task.DownloadUrl, err = templategen.YaDiskGenerateDoc(task.Source.File, name, task.UserID, yaDiskData.SaveFolder, client, data) if err != nil { return err } } return nil } func (w *Worker) Stop() { close(w.Tube) w.lifeTimer.Stop() } func DebounceWrapper[T any](function T) T { time.Sleep(DebounceDuration) // debounce v := reflect.MakeFunc(reflect.TypeOf(function), func(in []reflect.Value) []reflect.Value { f := reflect.ValueOf(function) return f.Call(in) }) return v.Interface().(T) }