docxTemplaterWorker/worker/worker.go
Danil Solovyov ecede5afb1 - переписал SuperVisor.Start() теперь при старте проверяет задачи со статусом pending и processing
- теперь SuperVisor получает от воркера сигнал об окончании lifeTimer, соответственно SuperVisor закрывает воркер и удаляет его из пула. Это исправляет панику при обращении супервизора к закрытому воркеру
- переписал SuperVisor на вотч монги
2023-01-06 01:35:53 +05:00

289 lines
7.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"errors"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"reflect"
"strconv"
"time"
)
const (
WorkerLifetime = time.Second * 15
DebounceDuration = time.Millisecond * 165
CacheSize = 10
TaskTimeout = 5 * time.Minute
)
type Worker struct {
AmoId string
Tube chan *model.WorkerTask // Буферизированный канал с размером CacheSize
ctx context.Context
lifeTimer *time.Timer
dal *dal.MongoDAL
yaDisk *yadisk.ClientApp
gDisk *gdisk.ClientApp
amo *amo.ClientApp
logger *zap.Logger
}
func NewWorker(amoId string, ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
return &Worker{
AmoId: amoId,
Tube: make(chan *model.WorkerTask, CacheSize),
ctx: ctx,
lifeTimer: time.NewTimer(WorkerLifetime),
dal: dal,
yaDisk: yaDisk,
gDisk: gDisk,
amo: amo,
logger: logger.With(zap.String("amo_id", amoId))}
}
func (w *Worker) Start(stopWorker chan string) {
w.logger.Info("worker started")
go func() {
for {
select {
case <-w.ctx.Done():
w.logger.Info("worker stopped - context done")
stopWorker <- w.AmoId
case <-w.lifeTimer.C:
w.logger.Info("worker stopped - timeout")
stopWorker <- w.AmoId
}
}
}()
go func() {
for task := range w.Tube {
if task == nil {
w.logger.Error("worker got empty task")
continue
}
_ = w.dal.WorkerTask.UpdateStatus(w.ctx, task.ID, model.WorkerTaskStatusProcessing)
if !w.DoTask(task) {
task.Status = model.WorkerTaskStatusFailed
}
task.Status = model.WorkerTaskStatusDone
err := w.dal.WorkerTask.UpdateByID(w.ctx, task)
if err != nil {
w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
}
}
}()
}
func (w *Worker) DoTask(task *model.WorkerTask) bool {
var err error
logger := w.logger.With(zap.String("_id", task.ID))
logger.Info("new task")
// сбрасываем таймер
w.lifeTimer.Stop()
w.lifeTimer.Reset(WorkerLifetime)
// находим историю к которой привязана задача
history, err := w.dal.History.GetByWorkerTaskID(w.ctx, task.ID)
if err != nil {
logger.Error("get history", zap.Error(err))
}
defer func() {
err = w.dal.History.UpdateByID(w.ctx, history)
if err != nil {
w.logger.Error("cannot update history", zap.Error(err))
}
}()
// проверяем чтобы все поля были на месте
if task.LeadId <= 0 {
err = errors.New("got bad lead_id")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.UserID == "" {
err = errors.New("got bad user_id")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.Source.File == "" {
err = errors.New("got bad source.file")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.Source.StorageID == "" {
err = errors.New("got bad source.storage_id")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.Source.StorageType == "" {
err = errors.New("got bad source.storage_type")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.Target.StorageID == "" {
err = errors.New("got bad target.storage_id")
return w.checkErrTask(logger, history, "bad data", err)
}
if task.Target.StorageType == "" {
err = errors.New("got bad target.storage_type")
return w.checkErrTask(logger, history, "bad data", err)
}
// amo client
amoData, err := w.dal.Amo.GetByID(w.ctx, task.AmoID)
if err != nil {
return w.checkErrTask(logger, history, "cannot get amo data", err)
}
amoClient, err := w.amo.NewClient(w.ctx, amoData.Referer, amoData.Token(), "")
if err != nil {
return w.checkErrTask(logger, history, "cannot create amo client", err)
}
lead, err := DebounceWrapper(amoClient.GetLeadById)(strconv.FormatInt(task.LeadId, 10))
if err != nil {
return w.checkErrTask(logger, history, "cannot get lead", err)
}
if lead == nil {
err = errors.New("empty lead data")
return w.checkErrTask(logger, history, "cannot get lead", err)
}
dataTemplate := map[string]interface{}{}
// Добавляем Инфо Лида
for k, v := range templategen.AmoLeadFieldsToRuMap(lead) {
dataTemplate[k] = v
}
// Добавялем инфо контактов
contacts := []amo.Contact{}
for _, data := range lead.Embedded.Contacts {
contact, err := DebounceWrapper(amoClient.GetContactById)(strconv.Itoa(data.Id))
if err == nil {
contacts = append(contacts, *contact)
} else {
return w.checkErrTask(logger, history, "cannot get contact", err)
}
}
dataTemplate["Контакты"] = templategen.AmoContactsFieldsToRuMap(contacts)
// Добавляем инфо компаний
companies := []amo.Company{}
for _, data := range lead.Embedded.Companies {
company, err := DebounceWrapper(amoClient.GetCompanyById)(strconv.Itoa(data.Id))
if err == nil {
companies = append(companies, *company)
} else {
return w.checkErrTask(logger, history, "cannot get company", err)
}
}
dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies)
err = w.Generate(task, lead.Name, dataTemplate)
w.checkErrTask(logger, history, "cannot generate", err)
history.Target.File = task.Target.File
history.Target.StorageID = task.Target.StorageID
history.Target.StorageType = task.Target.StorageType
w.logger.Info("task done", zap.String("_id", task.ID))
return true
}
func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg string, err error) bool {
if err != nil {
logger.Error("task failed: "+msg, zap.Error(err))
if len(history.Errors) > 0 {
history.Errors = append(history.Errors, err.Error())
} else {
history.Errors = []string{err.Error()}
}
err = w.dal.History.UpdateByID(w.ctx, history)
if err != nil {
logger.Error("cannot update history", zap.Error(err))
}
return true
}
return false
}
// Generate - генерирует файл и отправляет его на указанный в task.source диск
// @TODO: после написания тестов сделать чтобы отправлял на диск task.target!!!
func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
switch task.Source.StorageType {
case "gdisk":
gdiskData, err := w.dal.GDisk.GetByID(w.ctx, task.Source.StorageID)
if err != nil {
return err
}
client, err := w.gDisk.NewClient(w.ctx, gdiskData.Token())
if err != nil {
return err
}
task.Target.File, task.DownloadUrl, err = templategen.GDiskGenerateDoc(task.Source.File, name,
task.UserID, gdiskData.SaveFolderID, client, data)
if err != nil {
return err
}
case "yadisk":
yaDiskData, err := w.dal.YaDisk.GetByID(w.ctx, task.Source.StorageID)
if err != nil {
return err
}
client, err := w.yaDisk.NewClient(w.ctx, yaDiskData.Token(), "")
if err != nil {
return err
}
task.Target.File, task.DownloadUrl, err = templategen.YaDiskGenerateDoc(task.Source.File, name,
task.UserID,
yaDiskData.SaveFolder, client, data)
if err != nil {
return err
}
}
return nil
}
func (w *Worker) Stop() {
close(w.Tube)
w.lifeTimer.Stop()
}
func DebounceWrapper[T any](function T) T {
time.Sleep(DebounceDuration) // debounce
v := reflect.MakeFunc(reflect.TypeOf(function), func(in []reflect.Value) []reflect.Value {
f := reflect.ValueOf(function)
return f.Call(in)
})
return v.Interface().(T)
}