396 lines
11 KiB
Go
396 lines
11 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"reflect"
|
||
"strconv"
|
||
"time"
|
||
|
||
"go.uber.org/zap"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
|
||
"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
|
||
)
|
||
|
||
// Tunables shared by all workers in this package.
const (
	// WorkerLifetime is the duration the worker's life timer is armed with;
	// when the timer fires the worker reports itself for shutdown.
	WorkerLifetime = 15 * time.Minute

	// DebounceDuration is the pause DebounceWrapper sleeps for.
	DebounceDuration = 165 * time.Millisecond

	// CacheSize is the buffer capacity of the Worker.Tube task channel.
	CacheSize = 10

	// TaskTimeout is a five-minute duration.
	// NOTE(review): not referenced in the visible part of this file —
	// presumably a per-task deadline; confirm against callers.
	TaskTimeout = 5 * time.Minute
)
|
||
|
||
type WorkerDeps struct {
|
||
AmoID string
|
||
Dal *dal.MongoDAL
|
||
YaDisk *yadisk.ClientApp
|
||
GDisk *gdisk.ClientApp
|
||
Amo *amo.ClientApp
|
||
Logger *zap.Logger
|
||
}
|
||
|
||
type Worker struct {
|
||
AmoID string
|
||
Tube chan *model.WorkerTask // Буферизированный канал с размером CacheSize
|
||
lifeTimer *time.Timer
|
||
dal *dal.MongoDAL
|
||
yaDisk *yadisk.ClientApp
|
||
gDisk *gdisk.ClientApp
|
||
amo *amo.ClientApp
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func NewWorker(deps WorkerDeps) (*Worker, error) {
|
||
if deps.AmoID == "" {
|
||
return nil, errors.New("empty WorkerDeps.AmoID")
|
||
}
|
||
|
||
if deps.Dal == nil {
|
||
return nil, errors.New("nil WorkerDeps.Dal")
|
||
}
|
||
|
||
if deps.YaDisk == nil {
|
||
return nil, errors.New("nil WorkerDeps.YaDisk")
|
||
}
|
||
|
||
if deps.GDisk == nil {
|
||
return nil, errors.New("nil WorkerDeps.GDisk")
|
||
}
|
||
|
||
if deps.Amo == nil {
|
||
return nil, errors.New("nil WorkerDeps.Amo")
|
||
}
|
||
|
||
if deps.Logger == nil {
|
||
return nil, errors.New("nil WorkerDeps.Logger")
|
||
}
|
||
|
||
return &Worker{
|
||
AmoID: deps.AmoID,
|
||
Tube: make(chan *model.WorkerTask, CacheSize),
|
||
lifeTimer: time.NewTimer(WorkerLifetime),
|
||
dal: deps.Dal,
|
||
yaDisk: deps.YaDisk,
|
||
gDisk: deps.GDisk,
|
||
amo: deps.Amo,
|
||
logger: deps.Logger.With(zap.String("amo_id", deps.AmoID)),
|
||
}, nil
|
||
}
|
||
|
||
func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
|
||
w.logger.Info("worker started")
|
||
go func() {
|
||
select {
|
||
case <-ctx.Done():
|
||
w.logger.Info("worker stopped - context done")
|
||
stopWorker <- w.AmoID
|
||
return
|
||
case <-w.lifeTimer.C:
|
||
w.logger.Info("worker stopped - timeout")
|
||
stopWorker <- w.AmoID
|
||
return
|
||
}
|
||
}()
|
||
|
||
go func() {
|
||
for task := range w.Tube {
|
||
if task == nil {
|
||
w.logger.Error("worker got empty task")
|
||
continue
|
||
}
|
||
|
||
w.logger.Info("new task", zap.String("_id", task.ID))
|
||
|
||
var err error
|
||
|
||
if err = w.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusProcessing); err != nil {
|
||
w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
|
||
}
|
||
|
||
task.Status = model.WorkerTaskStatusDone
|
||
if !w.DoTask(ctx, task) {
|
||
task.Status = model.WorkerTaskStatusFailed
|
||
w.logger.Info("task failed", zap.String("_id", task.ID))
|
||
} else {
|
||
w.logger.Info("task done", zap.String("_id", task.ID))
|
||
}
|
||
|
||
if err = w.dal.WorkerTask.UpdateByID(ctx, task); err != nil {
|
||
w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
func (w *Worker) DoTask(ctx context.Context, task *model.WorkerTask) bool {
|
||
logger := w.logger.With(zap.String("_id", task.ID))
|
||
|
||
// сбрасываем таймер
|
||
w.lifeTimer.Stop()
|
||
w.lifeTimer.Reset(WorkerLifetime)
|
||
|
||
var err error
|
||
|
||
// находим историю к которой привязана задача
|
||
history, err := w.dal.History.GetByWorkerTaskID(ctx, task.ID)
|
||
if err != nil {
|
||
logger.Error("get history", zap.Error(err))
|
||
}
|
||
|
||
defer func() {
|
||
err = w.dal.History.UpdateByID(ctx, history)
|
||
if err != nil {
|
||
w.logger.Error("cannot update history", zap.Error(err))
|
||
}
|
||
}()
|
||
|
||
// проверяем чтобы все поля были на месте
|
||
if task.LeadID <= 0 {
|
||
err = errors.New("got bad lead_id")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
if task.PenaID == "" {
|
||
err = errors.New("got bad pena_id")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
if task.Source.File == "" {
|
||
err = errors.New("got bad source.file")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
if task.Source.StorageID == "" {
|
||
err = errors.New("got bad source.storage_id")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
if task.Source.StorageType == "" {
|
||
err = errors.New("got bad source.storage_type")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
if task.Target.StorageID == "" {
|
||
err = errors.New("got bad target.storage_id")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
if task.Target.StorageType == "" {
|
||
err = errors.New("got bad target.storage_type")
|
||
return w.checkErrTask(ctx, logger, history, "bad data", err)
|
||
}
|
||
|
||
// amo client
|
||
|
||
amoData, err := w.dal.Amo.GetByID(ctx, task.AmoID)
|
||
if err != nil {
|
||
return w.checkErrTask(ctx, logger, history, "cannot get amo data", err)
|
||
}
|
||
|
||
// Check privileges
|
||
if len(amoData.Privileges) == 0 {
|
||
return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("user has no tariff"))
|
||
}
|
||
|
||
// По количеству оставшихся генераций
|
||
var privilegeAmount int64
|
||
|
||
privilege, privilegeCountExists := amoData.Privileges[model.PrivilegeTemplateCount]
|
||
if privilegeCountExists {
|
||
privilegeAmount = privilege.Amount
|
||
|
||
if privilegeAmount < 1 {
|
||
return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("tariff ended - not enough count"))
|
||
}
|
||
}
|
||
|
||
// По дате
|
||
privilege, privilegeUnlimTimeExists := amoData.Privileges[model.PrivilegeTemplateUnlimTime]
|
||
if privilegeUnlimTimeExists {
|
||
if privilege.CreatedAt.AddDate(0, 0, int(privilege.Amount)).After(time.Now()) {
|
||
return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("tariff ended - by time"))
|
||
}
|
||
}
|
||
|
||
amoClient, err := w.amo.NewClient(ctx, amoData.Referer, amoData.Token(), "")
|
||
if err != nil {
|
||
return w.checkErrTask(ctx, logger, history, "cannot create amo client", err)
|
||
}
|
||
|
||
lead, err := DebounceWrapper(amoClient.GetLeadByID)(ctx, strconv.FormatInt(task.LeadID, 10))
|
||
if err != nil {
|
||
return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
|
||
}
|
||
|
||
if lead == nil {
|
||
err = errors.New("empty lead data")
|
||
return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
|
||
}
|
||
|
||
dataTemplate := map[string]interface{}{}
|
||
|
||
// Добавляем Инфо Лида
|
||
for k, v := range templategen.AmoLeadFieldsToRuMap(lead) {
|
||
dataTemplate[k] = v
|
||
}
|
||
|
||
// Добавялем инфо контактов
|
||
contacts := []amo.Contact{}
|
||
for _, data := range lead.Embedded.Contacts {
|
||
var contact *amo.Contact
|
||
contact, err = DebounceWrapper(amoClient.GetContactByID)(ctx, strconv.Itoa(data.ID))
|
||
if err == nil {
|
||
contacts = append(contacts, *contact)
|
||
} else {
|
||
return w.checkErrTask(ctx, logger, history, "cannot get contact", err)
|
||
}
|
||
}
|
||
|
||
dataTemplate["Контакты"] = templategen.AmoContactsFieldsToRuMap(contacts)
|
||
|
||
// Добавляем инфо компаний
|
||
companies := []amo.Company{}
|
||
for _, data := range lead.Embedded.Companies {
|
||
var company *amo.Company
|
||
company, err = DebounceWrapper(amoClient.GetCompanyByID)(ctx, strconv.Itoa(data.ID))
|
||
if err == nil {
|
||
companies = append(companies, *company)
|
||
} else {
|
||
return w.checkErrTask(ctx, logger, history, "cannot get company", err)
|
||
}
|
||
}
|
||
|
||
dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies)
|
||
|
||
err = w.Generate(ctx, task, lead.Name, dataTemplate)
|
||
|
||
if err != nil {
|
||
return w.checkErrTask(ctx, logger, history, "cannot generate", err)
|
||
}
|
||
|
||
if privilegeCountExists {
|
||
err = w.dal.Amo.UpdateAmountPrivilege(ctx, amoData.ID, model.PrivilegeTemplateCount, privilegeAmount-1)
|
||
}
|
||
|
||
if err != nil {
|
||
return w.checkErrTask(ctx, logger, history, "cannot update amo.privilege.amount", err)
|
||
}
|
||
|
||
history.Target.File = task.Target.File
|
||
history.Target.StorageID = task.Target.StorageID
|
||
history.Target.StorageType = task.Target.StorageType
|
||
history.DownloadURL = task.DownloadURL
|
||
history.PublicURL = task.PublicURL
|
||
|
||
return true
|
||
}
|
||
|
||
// checkErrTask - если найдена ошибка, пишет лог, добавляет в историю и возвращает false.
|
||
func (w *Worker) checkErrTask(ctx context.Context, logger *zap.Logger, history *model.History, msg string, err error) bool {
|
||
if err != nil {
|
||
logger.Error("task failed: "+msg, zap.Error(err))
|
||
if len(history.Errors) > 0 {
|
||
history.Errors = append(history.Errors, err.Error())
|
||
} else {
|
||
history.Errors = []string{err.Error()}
|
||
}
|
||
|
||
if err = w.dal.History.UpdateByID(ctx, history); err != nil {
|
||
logger.Error("cannot update history", zap.Error(err))
|
||
}
|
||
return false
|
||
}
|
||
|
||
return true
|
||
}
|
||
|
||
// Generate - генерирует файл и отправляет его на указанный в task.source диск
|
||
// TODO: !!!ВНИМАНИЕ!!! После написания тестов сделать чтобы отправлял на диск task.target!!!
|
||
func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name string, data any) error {
|
||
switch task.Source.StorageType {
|
||
case "gdisk":
|
||
gdiskData, err := w.dal.GDisk.GetByID(ctx, task.Source.StorageID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
client, err := w.gDisk.NewClient(ctx, gdiskData.Token())
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
task.Target.File, task.DownloadURL, err = templategen.GDiskGenerateDoc(task.Source.File, name,
|
||
gdiskData.SaveFolderID, client, data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
case "yadisk":
|
||
yaDiskData, err := w.dal.YaDisk.GetByID(ctx, task.Source.StorageID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if yaDiskData == nil {
|
||
return fmt.Errorf("no such yadisk")
|
||
}
|
||
|
||
client, err := w.yaDisk.NewClient(ctx, yaDiskData.Token(), "")
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
task.Target.File, task.DownloadURL, err = templategen.YaDiskGenerateDoc(ctx, task.Source.File, name,
|
||
yaDiskData.SaveFolder, client, data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if err = client.PublishResource(ctx, task.Target.File); err != nil {
|
||
return err
|
||
}
|
||
|
||
res, err := client.GetResources(ctx, task.Target.File, 0, 0)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
task.PublicURL = res.PublicURL
|
||
|
||
case "penadisk":
|
||
penadiskData, err := w.dal.PenaDisk.GetByPenaID(ctx, task.PenaID)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
client := penadisk.NewClient(task.PenaID)
|
||
|
||
task.Target.File, task.DownloadURL, err = templategen.PenaDiskGenerateDocBytes(ctx, task.Source.File, name,
|
||
penadiskData.SaveFolder, client, data)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
task.PublicURL = task.DownloadURL
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (w *Worker) Stop() {
|
||
close(w.Tube)
|
||
w.lifeTimer.Stop()
|
||
}
|
||
|
||
func DebounceWrapper[T any](function T) T {
|
||
time.Sleep(DebounceDuration) // debounce
|
||
v := reflect.MakeFunc(reflect.TypeOf(function), func(in []reflect.Value) []reflect.Value {
|
||
f := reflect.ValueOf(function)
|
||
return f.Call(in)
|
||
})
|
||
return v.Interface().(T)
|
||
}
|