docxTemplaterWorker/worker/worker.go
2023-04-17 23:11:18 +00:00

339 lines
9.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"errors"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"fmt"
"reflect"
"strconv"
"time"
)
// Tunables for a Worker.
const (
// WorkerLifetime is the duration the worker's life timer is created with and
// reset to at the start of every DoTask call; when the timer fires, the
// worker reports itself on the stop channel passed to Start.
WorkerLifetime = time.Minute * 15
// DebounceDuration is how long DebounceWrapper sleeps before returning the
// wrapped function.
DebounceDuration = time.Millisecond * 165
// CacheSize is the buffer capacity of the Worker.Tube channel.
CacheSize = 10
// TaskTimeout is not referenced in this part of the file.
// NOTE(review): presumably a per-task deadline - confirm where it is used.
TaskTimeout = 5 * time.Minute
)
// Worker consumes WorkerTask values from Tube one at a time and generates
// documents for them (see Start and DoTask).
type Worker struct {
// AmoId identifies the worker; it is attached to every log record and sent
// on the stop channel when the worker's context ends or its life timer fires.
AmoId string
Tube chan *model.WorkerTask // Buffered channel with capacity CacheSize
// ctx is passed to every DAL and client call; its cancellation stops the worker.
ctx context.Context
// lifeTimer fires after WorkerLifetime without a DoTask call.
lifeTimer *time.Timer
// dal gives access to the WorkerTask, History, Amo and disk collections.
dal *dal.MongoDAL
// yaDisk, gDisk and amo are client factories used to build per-task clients.
yaDisk *yadisk.ClientApp
gDisk *gdisk.ClientApp
amo *amo.ClientApp
// logger already carries the "amo_id" field.
logger *zap.Logger
}
// NewWorker builds a Worker for the given amo account.
//
// The returned worker owns a task channel buffered to CacheSize and a life
// timer that is already running with WorkerLifetime. Its logger is derived
// from the supplied one with an "amo_id" field attached. The worker does
// nothing until Start is called.
func NewWorker(amoId string, ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
	worker := new(Worker)

	worker.AmoId = amoId
	worker.Tube = make(chan *model.WorkerTask, CacheSize)
	worker.ctx = ctx
	worker.lifeTimer = time.NewTimer(WorkerLifetime)

	worker.dal = dal
	worker.yaDisk = yaDisk
	worker.gDisk = gDisk
	worker.amo = amo

	worker.logger = logger.With(zap.String("amo_id", amoId))

	return worker
}
// Start launches the worker's two background goroutines and returns
// immediately.
//
// The first goroutine waits for either the worker context to be cancelled or
// the life timer to fire, then sends the worker's AmoId on stopWorker. The
// send blocks until somebody receives from stopWorker.
//
// The second goroutine drains Tube until it is closed. For every task it
// marks the task as processing, runs DoTask, and stores the final status
// (done or failed). Nil tasks are logged and skipped.
func (w *Worker) Start(stopWorker chan string) {
	w.logger.Info("worker started")

	go func() {
		select {
		case <-w.ctx.Done():
			w.logger.Info("worker stopped - context done")
		case <-w.lifeTimer.C:
			w.logger.Info("worker stopped - timeout")
		}
		stopWorker <- w.AmoId
	}()

	go func() {
		for task := range w.Tube {
			// The nil check must come before anything touches task.ID,
			// otherwise a nil task crashes the whole process.
			if task == nil {
				w.logger.Error("worker got empty task")
				continue
			}

			logger := w.logger.With(zap.String("_id", task.ID))
			logger.Info("new task")

			// A failed status update is not fatal: the task is still processed.
			if err := w.dal.WorkerTask.UpdateStatus(w.ctx, task.ID, model.WorkerTaskStatusProcessing); err != nil {
				logger.Error("cannot update workerTask", zap.Error(err))
			}

			// The status is set optimistically before DoTask runs and
			// downgraded afterwards if the task failed.
			task.Status = model.WorkerTaskStatusDone
			if w.DoTask(task) {
				logger.Info("task done")
			} else {
				task.Status = model.WorkerTaskStatusFailed
				logger.Info("task failed")
			}

			if err := w.dal.WorkerTask.UpdateByID(w.ctx, task); err != nil {
				logger.Error("cannot update workerTask", zap.Error(err))
			}
		}
	}()
}
// DoTask processes a single task and reports whether it succeeded.
//
// Steps, in order:
//  1. reset the worker's life timer;
//  2. load the history record bound to the task (the task fails immediately
//     if it cannot be loaded, because every later step writes to it);
//  3. validate the required task fields;
//  4. fetch the lead and its embedded contacts and companies from amo;
//  5. generate the document via Generate and copy the resulting target and
//     URLs from the task into the history record.
//
// Every failure is appended to the history record by checkErrTask, and the
// history record is saved once more when the function returns.
func (w *Worker) DoTask(task *model.WorkerTask) bool {
	logger := w.logger.With(zap.String("_id", task.ID))

	// Reset the idle timer: the worker is alive as long as it gets tasks.
	w.lifeTimer.Stop()
	w.lifeTimer.Reset(WorkerLifetime)

	// Find the history record the task is attached to. Without it there is
	// nowhere to record results or errors, and continuing would dereference
	// a nil pointer further down.
	history, err := w.dal.History.GetByWorkerTaskID(w.ctx, task.ID)
	if err != nil {
		logger.Error("get history", zap.Error(err))
		return false
	}
	if history == nil {
		logger.Error("get history", zap.Error(errors.New("empty history data")))
		return false
	}

	defer func() {
		if updErr := w.dal.History.UpdateByID(w.ctx, history); updErr != nil {
			logger.Error("cannot update history", zap.Error(updErr))
		}
	}()

	// Make sure all required fields are present; the first missing one wins.
	var badData error
	switch {
	case task.LeadId <= 0:
		badData = errors.New("got bad lead_id")
	case task.UserID == "":
		badData = errors.New("got bad user_id")
	case task.Source.File == "":
		badData = errors.New("got bad source.file")
	case task.Source.StorageID == "":
		badData = errors.New("got bad source.storage_id")
	case task.Source.StorageType == "":
		badData = errors.New("got bad source.storage_type")
	case task.Target.StorageID == "":
		badData = errors.New("got bad target.storage_id")
	case task.Target.StorageType == "":
		badData = errors.New("got bad target.storage_type")
	}
	if badData != nil {
		return w.checkErrTask(logger, history, "bad data", badData)
	}

	// amo client
	amoData, err := w.dal.Amo.GetByID(w.ctx, task.AmoID)
	if err != nil {
		return w.checkErrTask(logger, history, "cannot get amo data", err)
	}

	amoClient, err := w.amo.NewClient(w.ctx, amoData.Referer, amoData.Token(), "")
	if err != nil {
		return w.checkErrTask(logger, history, "cannot create amo client", err)
	}

	lead, err := DebounceWrapper(amoClient.GetLeadById)(strconv.FormatInt(task.LeadId, 10))
	if err != nil {
		return w.checkErrTask(logger, history, "cannot get lead", err)
	}
	if lead == nil {
		return w.checkErrTask(logger, history, "cannot get lead", errors.New("empty lead data"))
	}

	dataTemplate := map[string]any{}

	// Lead fields.
	for k, v := range templategen.AmoLeadFieldsToRuMap(lead) {
		dataTemplate[k] = v
	}

	// Contact details.
	contacts := make([]amo.Contact, 0, len(lead.Embedded.Contacts))
	for _, data := range lead.Embedded.Contacts {
		contact, err := DebounceWrapper(amoClient.GetContactById)(strconv.Itoa(data.Id))
		if err != nil {
			return w.checkErrTask(logger, history, "cannot get contact", err)
		}
		if contact == nil {
			return w.checkErrTask(logger, history, "cannot get contact", errors.New("empty contact data"))
		}
		contacts = append(contacts, *contact)
	}
	dataTemplate["Контакты"] = templategen.AmoContactsFieldsToRuMap(contacts)

	// Company details.
	companies := make([]amo.Company, 0, len(lead.Embedded.Companies))
	for _, data := range lead.Embedded.Companies {
		company, err := DebounceWrapper(amoClient.GetCompanyById)(strconv.Itoa(data.Id))
		if err != nil {
			return w.checkErrTask(logger, history, "cannot get company", err)
		}
		if company == nil {
			return w.checkErrTask(logger, history, "cannot get company", errors.New("empty company data"))
		}
		companies = append(companies, *company)
	}
	dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies)

	err = w.Generate(task, lead.Name, dataTemplate)

	// The target and URLs are copied even when generation failed, so the
	// history record reflects whatever Generate managed to set on the task.
	history.Target.File = task.Target.File
	history.Target.StorageID = task.Target.StorageID
	history.Target.StorageType = task.Target.StorageType
	history.DownloadUrl = task.DownloadUrl
	history.PublicUrl = task.PublicUrl

	return w.checkErrTask(logger, history, "cannot generate", err)
}
// checkErrTask reports whether err is nil.
//
// When err is non-nil it logs "task failed: <msg>", appends the error text to
// history.Errors, saves the history record and returns false. A failure to
// save the history is logged but does not change the result. When history is
// nil the error is only logged, since there is no record to write to.
func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg string, err error) bool {
	if err == nil {
		return true
	}

	logger.Error("task failed: "+msg, zap.Error(err))

	if history == nil {
		logger.Error("cannot update history", zap.Error(errors.New("empty history data")))
		return false
	}

	// append handles a nil or empty slice, so no length check is needed.
	history.Errors = append(history.Errors, err.Error())

	if updErr := w.dal.History.UpdateByID(w.ctx, history); updErr != nil {
		logger.Error("cannot update history", zap.Error(updErr))
	}

	return false
}
// Generate renders the document for task and uploads it to the disk named in
// task.Source (gdisk, yadisk or penadisk).
//
// On success it fills task.Target.File and task.DownloadUrl; for yadisk and
// penadisk it also publishes the file and fills task.PublicUrl. Errors from
// the storage layer are returned wrapped with the step that failed. An
// unrecognized task.Source.StorageType is reported as an error instead of
// being treated as success.
//
// @TODO: once tests are written, upload to the task.Target disk instead!!!
func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
	switch task.Source.StorageType {
	case "gdisk":
		gdiskData, err := w.dal.GDisk.GetByID(w.ctx, task.Source.StorageID)
		if err != nil {
			return fmt.Errorf("get gdisk data: %w", err)
		}

		client, err := w.gDisk.NewClient(w.ctx, gdiskData.Token())
		if err != nil {
			return fmt.Errorf("create gdisk client: %w", err)
		}

		task.Target.File, task.DownloadUrl, err = templategen.GDiskGenerateDoc(task.Source.File, name,
			task.UserID, gdiskData.SaveFolderID, client, data)
		if err != nil {
			return fmt.Errorf("generate gdisk document: %w", err)
		}

	case "yadisk":
		yaDiskData, err := w.dal.YaDisk.GetByID(w.ctx, task.Source.StorageID)
		if err != nil {
			return fmt.Errorf("get yadisk data: %w", err)
		}

		client, err := w.yaDisk.NewClient(w.ctx, yaDiskData.Token(), "")
		if err != nil {
			return fmt.Errorf("create yadisk client: %w", err)
		}

		task.Target.File, task.DownloadUrl, err = templategen.YaDiskGenerateDoc(task.Source.File, name,
			task.UserID,
			yaDiskData.SaveFolder, client, data)
		if err != nil {
			return fmt.Errorf("generate yadisk document: %w", err)
		}

		if err = client.PublishResource(task.Target.File); err != nil {
			return fmt.Errorf("publish yadisk resource: %w", err)
		}

		// The public URL only becomes available after publishing, so the
		// resource is re-read.
		res, err := client.GetResources(task.Target.File, 0, 0)
		if err != nil {
			return fmt.Errorf("get yadisk resource: %w", err)
		}
		task.PublicUrl = res.PublicUrl

	case "penadisk":
		penadiskData, err := w.dal.PenaDisk.GetByUserID(w.ctx, task.UserID)
		if err != nil {
			return fmt.Errorf("get penadisk data: %w", err)
		}

		client := penadisk.NewClient(task.UserID)

		task.Target.File, task.DownloadUrl, err = templategen.PenaDiskGenerateDocBytes(task.Source.File, name,
			task.UserID, penadiskData.SaveFolder, client, data)
		if err != nil {
			return fmt.Errorf("generate penadisk document: %w", err)
		}

		res, err := client.PublishResources(task.Target.File)
		if err != nil {
			return fmt.Errorf("publish penadisk resource: %w", err)
		}
		task.PublicUrl = res

	default:
		return fmt.Errorf("unsupported source storage type %q", task.Source.StorageType)
	}

	return nil
}
// Stop closes the task channel, which ends the task-processing loop started
// by Start once the buffered tasks are drained, and then stops the life timer.
// Sending on Tube after Stop, or calling Stop twice, panics because the
// channel is already closed.
func (w *Worker) Stop() {
	tube := w.Tube
	close(tube)

	timer := w.lifeTimer
	timer.Stop()
}
// DebounceWrapper sleeps for DebounceDuration and then returns a function of
// the same type as function that forwards its arguments to function and
// returns its results.
//
// The pause happens once, when DebounceWrapper itself is called, not on each
// call of the returned function; the intended usage is
// DebounceWrapper(f)(args...), which delays that single call.
//
// T must be a function type; any other type makes the call panic. Variadic
// functions are supported.
func DebounceWrapper[T any](function T) T {
	time.Sleep(DebounceDuration) // debounce

	target := reflect.ValueOf(function)
	fnType := reflect.TypeOf(function)

	forward := func(in []reflect.Value) []reflect.Value {
		// For a variadic function MakeFunc hands over the variadic arguments
		// already packed into a slice as the last element, which is exactly
		// what CallSlice expects; Call would try to re-pack them and panic.
		if fnType.IsVariadic() {
			return target.CallSlice(in)
		}
		return target.Call(in)
	}

	return reflect.MakeFunc(fnType, forward).Interface().(T)
}