Performed refactoring

Danil Solovyov 2023-07-14 04:00:30 +05:00
parent 4617bf4378
commit ce62454820
8 changed files with 346 additions and 173 deletions

3 .gitignore vendored

@ -1 +1,4 @@
templategenworker_linux
.idea
go.work
go.work.sum

167 .golangci.yaml Normal file

@ -0,0 +1,167 @@
run:
timeout: 5m
skip-files:
- \.pb\.go$
- \.pb\.validate\.go$
- \.pb\.gw\.go$
- \.gen\.go$
skip-dirs:
- mocks
- proto
linters:
disable-all: true
enable:
- asasalint
- asciicheck
- bidichk
- bodyclose
- containedctx
# - depguard
- dogsled
- dupword
- durationcheck
- errcheck
- errchkjson
- exportloopref
# - goconst Temporarily disabled: complains about dal\postgres\user.go [159, 18]
- gocritic
- godot
- gofmt
- gci
- goprintffuncname
- gosec
- gosimple
- govet
- importas
- ineffassign
- misspell
- nakedret
- nilerr
- noctx
- nolintlint
- nosprintfhostport
- prealloc
- predeclared
- revive
- rowserrcheck
- staticcheck
- stylecheck
- thelper
- typecheck
- unconvert
- unparam
- unused
- usestdlibvars
- whitespace
linters-settings:
errcheck:
exclude-functions:
- (io.Closer).Close
govet:
check-shadowing: true
gci:
custom-order: false
section-separators:
- newLine
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- blank # Blank section: contains all blank imports. This section is not present unless explicitly enabled.
- dot # Dot section: contains all dot imports. This section is not present unless explicitly enabled.
importas:
no-unaliased: true
alias:
# Foundation libraries
- pkg: git.sbercloud.tech/products/paas/shared/foundation/management-server
alias: mgmtserver
maligned:
suggest-new: true
goconst:
min-len: 2
min-occurrences: 2
lll:
line-length: 140
revive:
rules:
# The following rules are recommended https://github.com/mgechev/revive#recommended-configuration
- name: blank-imports
- name: context-as-argument
- name: context-keys-type
- name: dot-imports
- name: error-return
- name: error-strings
- name: error-naming
# - name: exported
- name: if-return
- name: increment-decrement
- name: var-naming
- name: var-declaration
# - name: package-comments
- name: range
- name: receiver-naming
- name: time-naming
- name: unexported-return
- name: indent-error-flow
- name: errorf
- name: empty-block
- name: superfluous-else
- name: unused-parameter
- name: unreachable-code
- name: redefines-builtin-id
#
# Rules in addition to the recommended configuration above.
#
- name: bool-literal-in-expr
- name: constant-logical-expr
gosec:
excludes:
- G304 # Potential file inclusion via variable
- G307 # Deferring unsafe method "Close" on type "*os.File"
- G107 # Potential HTTP request made with variable url
- G108 # Profiling endpoint is automatically exposed on /debug/pprof
gocritic:
enabled-tags:
- diagnostic
- experimental
- performance
disabled-checks:
- appendAssign
- dupImport # https://github.com/go-critic/go-critic/issues/845
- evalOrder
- ifElseChain
- octalLiteral
- regexpSimplify
- sloppyReassign
- truncateCmp
- typeDefFirst
- unnamedResult
- unnecessaryDefer
- whyNoLint
- wrapperFunc
- rangeValCopy
- hugeParam
issues:
fix: true
exclude-rules:
- text: "at least one file in a package should have a package comment"
linters:
- stylecheck
- text: "should have a package comment, unless it's in another file for this package"
linters:
- golint
- text: "should have comment or be unexported"
linters:
- golint
- path: _test\.go
linters:
- gosec
- dupl
exclude-use-default: false
output:
# colored-line-number|line-number|json|tab|checkstyle, default is "colored-line-number"
format: colored-line-number
print-linter-name: true
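For reference, the gci sections above group imports as standard library first, then third-party, separated by a blank line; this is the same reordering applied to main.go and worker.go below. A minimal sketch of a file that passes (zap is already a dependency of this project; the function itself is illustrative):

package example

import (
	"fmt" // standard section: stdlib packages come first
	"os"

	"go.uber.org/zap" // default section: third-party imports after a blank line
)

func demo() {
	logger, _ := zap.NewProduction()
	logger.Info(fmt.Sprintf("pid %d", os.Getpid()))
}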

@ -11,8 +11,7 @@ import (
func getEnv(mask interface{}) Env {
r := reflect.ValueOf(mask)
var argTypeRV reflect.Value
argTypeRV = reflect.New(r.Type())
argTypeRV := reflect.New(r.Type())
for i := 0; i < r.NumField(); i++ {
v := r.Type().Field(i).Tag.Get("env")
@ -30,7 +29,7 @@ func getEnv(mask interface{}) Env {
case "string":
argTypeRV.Elem().Field(i).SetString(val)
case "bool":
if strings.ToLower(val) == "true" {
if strings.EqualFold(val, "true") {
argTypeRV.Elem().Field(i).SetBool(true)
} else {
argTypeRV.Elem().Field(i).SetBool(false)
@ -42,7 +41,7 @@ func getEnv(mask interface{}) Env {
}
argTypeRV.Elem().Field(i).SetUint(num)
default:
panic(errors.New("Something strange happend: " + t))
panic(errors.New("Something strange happened: " + t))
}
}
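For context, getEnv fills a struct from environment variables by reflecting over `env` tags. A self-contained sketch of the same technique; the names fillFromEnv and cfg are hypothetical, not taken from this codebase:

package example

import (
	"os"
	"reflect"
	"strings"
)

type cfg struct {
	Domain string `env:"DOMAIN" default:"tempgen.pena.digital"`
	Debug  bool   `env:"DEBUG" default:"false"`
}

// fillFromEnv walks dst's fields, reads each `env` tag, and fills the field
// from the environment, falling back to the `default` tag when unset.
func fillFromEnv(dst interface{}) {
	v := reflect.ValueOf(dst).Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		val, ok := os.LookupEnv(t.Field(i).Tag.Get("env"))
		if !ok {
			val = t.Field(i).Tag.Get("default")
		}
		switch t.Field(i).Type.Kind() {
		case reflect.String:
			v.Field(i).SetString(val)
		case reflect.Bool:
			// mirrors the strings.EqualFold change in the hunk above
			v.Field(i).SetBool(strings.EqualFold(val, "true"))
		}
	}
}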

39 main.go

@ -3,24 +3,24 @@ package main
import (
"context"
"fmt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"os"
"os/signal"
"syscall"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
GDisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
YaDisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"penahub.gitlab.yandexcloud.net/backend/templategenworker/worker"
"syscall"
)
type Env struct {
Domain string `env:"DOMAIN" default:"tempgen.pena.digital"`
LogFile string `env:"LOG_FILE" default:"./tmp/logs.log"`
MongoUrl string `env:"MONGO_URL" default:"mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=penahub-rs"`
DbName string `env:"DB_NAME" default:"templategen"`
MongoURL string `env:"MONGO_URL" default:"mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=penahub-rs"`
DBName string `env:"DB_NAME" default:"templategen"`
YaDiskClientID string `env:"YADISK_CLIENT_ID" default:"94482c181e5148c096ae6ad3b2a981ea"`
YaDiskClientSecret string `env:"YADISK_CLIENT_SECRET" default:"7dc4f541c3f64f4a9078e59d7494d222"`
GDiskCredentials string `env:"GDISK_CREDENTIALS" default:"./static/gdisk-credentials.json"`
@ -52,14 +52,15 @@ func main() {
// Start Data Access Layer
mongoDal, err := dal.InitMongoDAL(ctx, opts.MongoUrl, opts.DbName, logger)
mongoDal, err := dal.InitMongoDAL(ctx, opts.MongoURL, opts.DBName, logger)
defer mongoDal.Disconnect()
if err != nil {
logger.Fatal("ErrorConnectToDAL", zap.Error(err))
logger.Error("ErrorConnectToDAL", zap.Error(err))
return
}
// Yandex Disk
yaDisk := YaDisk.NewClientApp(
yaDisk := yadisk.NewClientApp(
opts.YaDiskClientID,
opts.YaDiskClientSecret,
fmt.Sprintf("https://%v/yadisk", opts.Domain),
@ -67,9 +68,10 @@ func main() {
)
// Google Drive
gDisk, err := GDisk.NewClientApp(opts.GDiskCredentials)
gDisk, err := gdisk.NewClientApp(opts.GDiskCredentials)
if err != nil {
log.Fatal("ErrorCreateGoogleDriveClientApp:", err)
logger.Error("ErrorCreateGoogleDriveClientApp:", zap.Error(err))
return
}
// Amo
@ -79,10 +81,10 @@ func main() {
fmt.Sprintf("https://%v/amo", opts.Domain))
// Start supervisor worker
supervisor := worker.NewSuperVisor(ctx, mongoDal, yaDisk, gDisk, amoApp, logger)
supervisor := worker.NewSuperVisor(mongoDal, yaDisk, gDisk, amoApp, logger)
defer supervisor.Stop()
supervisor.Start()
supervisor.Start(ctx)
// Graceful Shutdown
interrupt := make(chan os.Signal, 1)
@ -90,11 +92,12 @@ func main() {
killSignal := <-interrupt
switch killSignal {
case os.Interrupt:
logger.Fatal("AppInterrupted")
logger.Error("AppInterrupted")
return
case syscall.SIGTERM:
logger.Fatal("AppTerminated")
logger.Error("AppTerminated")
return
}
defer cancel()
}
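The logger.Fatal to logger.Error changes above are behavioral, not cosmetic: zap's Logger.Fatal logs and then calls os.Exit(1), which skips deferred calls such as mongoDal.Disconnect() and supervisor.Stop(). A minimal sketch of the distinction:

package example

import "go.uber.org/zap"

func run(logger *zap.Logger, connect func() error) {
	defer logger.Info("cleanup ran") // never reached if the process exits via Fatal
	if err := connect(); err != nil {
		// Fatal would log and call os.Exit(1), bypassing the defer above;
		// Error plus an explicit return lets deferred cleanup run.
		logger.Error("ErrorConnectToDAL", zap.Error(err))
		return
	}
}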

@ -13,23 +13,23 @@ func NewPool() *Pool {
return &Pool{workers: map[string]*Worker{}, mux: sync.RWMutex{}}
}
func (p *Pool) Set(amoId string, worker *Worker) {
func (p *Pool) Set(amoID string, worker *Worker) {
p.mux.Lock()
p.workers[amoId] = worker
p.workers[amoID] = worker
p.mux.Unlock()
}
func (p *Pool) Get(amoId string) *Worker {
func (p *Pool) Get(amoID string) *Worker {
p.mux.Lock()
worker := p.workers[amoId]
worker := p.workers[amoID]
p.mux.Unlock()
return worker
}
// Unset - stops the worker and removes it from the pool
func (p *Pool) Unset(amoId string) {
// Unset - stops the worker and removes it from the pool.
func (p *Pool) Unset(amoID string) {
p.mux.Lock()
p.workers[amoId].Stop()
delete(p.workers, amoId)
p.workers[amoID].Stop()
delete(p.workers, amoID)
p.mux.Unlock()
}
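Pool is a mutex-guarded map keyed by amo account ID. Below is a sketch of the same pattern with a generic value type; the RLock in get is an optional refinement (Get above takes the write lock for a read, which is correct but serializes readers):

package example

import "sync"

type pool[T any] struct {
	mu sync.RWMutex
	m  map[string]T
}

func newPool[T any]() *pool[T] {
	return &pool[T]{m: map[string]T{}}
}

func (p *pool[T]) set(key string, v T) {
	p.mu.Lock()
	p.m[key] = v
	p.mu.Unlock()
}

func (p *pool[T]) get(key string) T {
	p.mu.RLock() // readers can share the lock
	v := p.m[key]
	p.mu.RUnlock()
	return v
}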

@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
@ -11,9 +13,8 @@ import (
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/mongos"
gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"time"
"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
)
const (
@ -21,7 +22,6 @@ const (
)
type SuperVisor struct {
ctx context.Context
dal *dal.MongoDAL
yaDisk *yadisk.ClientApp
gDisk *gdisk.ClientApp
@ -33,9 +33,8 @@ type SuperVisor struct {
interrupter chan bool
}
func NewSuperVisor(ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *SuperVisor {
func NewSuperVisor(dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *SuperVisor {
return &SuperVisor{
ctx: ctx,
dal: dal,
yaDisk: yaDisk,
gDisk: gDisk,
@ -48,12 +47,12 @@ func NewSuperVisor(ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.Client
}
}
func (sv *SuperVisor) Start() {
func (sv *SuperVisor) Start(ctx context.Context) {
// Check tasks with status new, pending, and processing; dispatch any that have not timed out
tasks, err := sv.dal.WorkerTask.GetByFilter(
sv.ctx,
ctx,
&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusNew},
options.Find().SetSort(bson.D{{"created_at", 1}}),
options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
)
if err != nil {
@ -64,11 +63,11 @@ func (sv *SuperVisor) Start() {
sv.logger.Info(fmt.Sprintf("got %v tasks with status=new", len(tasks)))
}
for _, task := range tasks {
if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(&task)
for i := range tasks {
if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(ctx, &tasks[i])
} else {
go sv.SetTask(task)
go sv.SetTask(ctx, tasks[i])
}
}
@ -76,9 +75,9 @@ func (sv *SuperVisor) Start() {
time.Sleep(time.Second * 5)
tasks, err = sv.dal.WorkerTask.GetByFilter(
sv.ctx,
ctx,
&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
options.Find().SetSort(bson.D{{"created_at", 1}}),
options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
)
if err != nil {
@ -89,11 +88,11 @@ func (sv *SuperVisor) Start() {
sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
}
for _, task := range tasks {
if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(&task)
for i := range tasks {
if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(ctx, &tasks[i])
} else {
go sv.SetTask(task)
go sv.SetTask(ctx, tasks[i])
}
}
@ -101,9 +100,9 @@ func (sv *SuperVisor) Start() {
time.Sleep(time.Second * 5)
tasks, err = sv.dal.WorkerTask.GetByFilter(
sv.ctx,
ctx,
&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
options.Find().SetSort(bson.D{{"created_at", 1}}),
options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
)
if err != nil {
@ -114,11 +113,11 @@ func (sv *SuperVisor) Start() {
sv.logger.Info(fmt.Sprintf("got %v tasks with status=pending", len(tasks)))
}
for _, task := range tasks {
if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(&task)
for i := range tasks {
if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(ctx, &tasks[i])
} else {
go sv.SetTask(task)
go sv.SetTask(ctx, tasks[i])
}
}
@ -128,21 +127,24 @@ func (sv *SuperVisor) Start() {
// Start the goroutine that receives new tasks
go func() {
workerTaskChan := make(chan model.WorkerTask)
go sv.dal.WorkerTask.Listen(sv.ctx, model.WorkerTaskStatusNew, workerTaskChan)
go sv.dal.WorkerTask.Listen(ctx, model.WorkerTaskStatusNew, workerTaskChan)
for task := range workerTaskChan {
go sv.SetTask(task)
go sv.SetTask(ctx, task)
}
}()
// Start the goroutine that checks tasks for status timeouts and is responsible for shutting down the supervisor
go func() {
ticker := time.NewTicker(SuperVisorTick)
defer ticker.Stop()
for {
// Check tasks in processing and set status to timeout when overdue
tasks, err = sv.dal.WorkerTask.GetByFilter(
sv.ctx,
ctx,
&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
options.Find().SetSort(bson.D{{"created_at", 1}}),
options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
)
if err != nil {
@ -153,17 +155,17 @@ func (sv *SuperVisor) Start() {
sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
}
for _, task := range tasks {
if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(&task)
for i := range tasks {
if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(ctx, &tasks[i])
}
}
// Check tasks in pending and set status to timeout when overdue
tasks, err = sv.dal.WorkerTask.GetByFilter(
sv.ctx,
ctx,
&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
options.Find().SetSort(bson.D{{"created_at", 1}}),
options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
)
if err != nil {
@ -174,17 +176,17 @@ func (sv *SuperVisor) Start() {
sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
}
for _, task := range tasks {
if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(&task)
for i := range tasks {
if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
sv.SetTaskTimeout(ctx, &tasks[i])
}
}
select {
case <-sv.ctx.Done():
case <-ctx.Done():
sv.Stop()
return
case <-time.Tick(SuperVisorTick):
case <-ticker.C:
continue
case <-sv.interrupter:
return
@ -196,10 +198,10 @@ func (sv *SuperVisor) Start() {
go func() {
for {
select {
case workerAmoId := <-sv.stopWorker:
sv.pool.Unset(workerAmoId)
case workerAmoID := <-sv.stopWorker:
sv.pool.Unset(workerAmoID)
continue
case <-sv.ctx.Done():
case <-ctx.Done():
sv.Stop()
return
case <-sv.interrupter:
@ -209,10 +211,10 @@ func (sv *SuperVisor) Start() {
}()
}
// SetTask - sets the task status to pending and sends it to a worker. If the worker does not exist, creates it
func (sv *SuperVisor) SetTask(task model.WorkerTask) {
// SetTask - sets the task status to pending and sends it to a worker. If the worker does not exist, creates it.
func (sv *SuperVisor) SetTask(ctx context.Context, task model.WorkerTask) {
// Set the pending status
err := sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusPending)
err := sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusPending)
if err != nil {
sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
}
@ -223,22 +225,20 @@ func (sv *SuperVisor) SetTask(task model.WorkerTask) {
sv.logger.Error(err.Error(), zap.String("_id", task.ID))
var historyFound []model.History
historyFound, err = sv.dal.History.GetByFilter(sv.ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
historyFound, err = sv.dal.History.GetByFilter(ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
if err != nil {
sv.logger.Error("cannot get history", zap.Error(err), zap.String("_id", task.ID))
} else {
if len(historyFound) > 0 {
historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
err = sv.dal.History.UpdateByID(sv.ctx, &historyFound[0])
} else if len(historyFound) > 0 {
historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
err = sv.dal.History.UpdateByID(ctx, &historyFound[0])
if err != nil {
sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
}
if err != nil {
sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
}
}
err = sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusFailed)
err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusFailed)
if err != nil {
sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
}
@ -248,37 +248,35 @@ func (sv *SuperVisor) SetTask(task model.WorkerTask) {
// Create workers
worker := sv.pool.Get(task.AmoID)
if worker == nil {
worker = NewWorker(task.AmoID, sv.ctx, sv.dal, sv.yaDisk, sv.gDisk, sv.amo, sv.logger)
worker = NewWorker(task.AmoID, sv.dal, sv.yaDisk, sv.gDisk, sv.amo, sv.logger)
sv.pool.Set(task.AmoID, worker)
worker.Start(sv.stopWorker)
worker.Start(ctx, sv.stopWorker)
}
sv.pool.Get(task.AmoID).Tube <- &task
}
func (sv *SuperVisor) SetTaskTimeout(task *model.WorkerTask) {
func (sv *SuperVisor) SetTaskTimeout(ctx context.Context, task *model.WorkerTask) {
sv.logger.Info("task timeout", zap.String("_id", task.ID))
historyFound, err := sv.dal.History.GetByFilter(sv.ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
historyFound, err := sv.dal.History.GetByFilter(ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
if err != nil {
sv.logger.Error("cannot get history", zap.Error(err), zap.String("_id", task.ID))
} else {
if len(historyFound) > 0 {
err = errors.New("task timeout")
if len(historyFound[0].Errors) > 0 {
historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
} else {
historyFound[0].Errors = []string{err.Error()}
}
} else if len(historyFound) > 0 {
err = errors.New("task timeout")
if len(historyFound[0].Errors) > 0 {
historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
} else {
historyFound[0].Errors = []string{err.Error()}
}
err = sv.dal.History.UpdateByID(sv.ctx, &historyFound[0])
if err != nil {
sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
}
err = sv.dal.History.UpdateByID(ctx, &historyFound[0])
if err != nil {
sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
}
}
err = sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusTimeout)
err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusTimeout)
if err != nil {
sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
}
@ -286,7 +284,7 @@ func (sv *SuperVisor) SetTaskTimeout(task *model.WorkerTask) {
func (sv *SuperVisor) Stop() {
for _, worker := range sv.pool.workers {
sv.pool.Unset(worker.AmoId)
sv.pool.Unset(worker.AmoID)
}
close(sv.workerTaskChan)
sv.interrupter <- true
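The for i := range tasks rewrites above fix implicit memory aliasing (gosec G601): before Go 1.22, a range loop reuses a single task variable, so &task points at the same address on every iteration. Indexing into the slice takes a stable pointer to the element itself. A minimal demonstration:

package example

import "fmt"

func demo() {
	tasks := []string{"a", "b", "c"}

	var aliased []*string
	for _, task := range tasks {
		aliased = append(aliased, &task) // pre-Go 1.22: all three alias one variable
	}
	fmt.Println(*aliased[0], *aliased[1], *aliased[2]) // "c c c" before Go 1.22

	var stable []*string
	for i := range tasks {
		stable = append(stable, &tasks[i]) // points into the backing array
	}
	fmt.Println(*stable[0], *stable[1], *stable[2]) // always "a b c"
}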

@ -3,18 +3,19 @@ package worker
import (
"context"
"errors"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
"fmt"
"reflect"
"strconv"
"time"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
)
const (
@ -25,9 +26,8 @@ const (
)
type Worker struct {
AmoId string
AmoID string
Tube chan *model.WorkerTask // Buffered channel with capacity CacheSize
ctx context.Context
lifeTimer *time.Timer
dal *dal.MongoDAL
yaDisk *yadisk.ClientApp
@ -36,57 +36,56 @@ type Worker struct {
logger *zap.Logger
}
func NewWorker(amoId string, ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
func NewWorker(amoID string, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
return &Worker{
AmoId: amoId,
AmoID: amoID,
Tube: make(chan *model.WorkerTask, CacheSize),
ctx: ctx,
lifeTimer: time.NewTimer(WorkerLifetime),
dal: dal,
yaDisk: yaDisk,
gDisk: gDisk,
amo: amo,
logger: logger.With(zap.String("amo_id", amoId))}
logger: logger.With(zap.String("amo_id", amoID))}
}
func (w *Worker) Start(stopWorker chan string) {
func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
w.logger.Info("worker started")
go func() {
select {
case <-w.ctx.Done():
case <-ctx.Done():
w.logger.Info("worker stopped - context done")
stopWorker <- w.AmoId
stopWorker <- w.AmoID
return
case <-w.lifeTimer.C:
w.logger.Info("worker stopped - timeout")
stopWorker <- w.AmoId
stopWorker <- w.AmoID
return
}
}()
go func() {
for task := range w.Tube {
w.logger.Info("new task", zap.String("_id", task.ID))
if task == nil {
w.logger.Error("worker got empty task")
continue
}
err := w.dal.WorkerTask.UpdateStatus(w.ctx, task.ID, model.WorkerTaskStatusProcessing)
w.logger.Info("new task", zap.String("_id", task.ID))
err := w.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusProcessing)
if err != nil {
w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
}
task.Status = model.WorkerTaskStatusDone
if !w.DoTask(task) {
if !w.DoTask(ctx, task) {
task.Status = model.WorkerTaskStatusFailed
w.logger.Info("task failed", zap.String("_id", task.ID))
} else {
w.logger.Info("task done", zap.String("_id", task.ID))
}
err = w.dal.WorkerTask.UpdateByID(w.ctx, task)
err = w.dal.WorkerTask.UpdateByID(ctx, task)
if err != nil {
w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
@ -95,7 +94,7 @@ func (w *Worker) Start(stopWorker chan string) {
}()
}
func (w *Worker) DoTask(task *model.WorkerTask) bool {
func (w *Worker) DoTask(ctx context.Context, task *model.WorkerTask) bool {
var err error
logger := w.logger.With(zap.String("_id", task.ID))
@ -104,74 +103,74 @@ func (w *Worker) DoTask(task *model.WorkerTask) bool {
w.lifeTimer.Reset(WorkerLifetime)
// find the history record this task is bound to
history, err := w.dal.History.GetByWorkerTaskID(w.ctx, task.ID)
history, err := w.dal.History.GetByWorkerTaskID(ctx, task.ID)
if err != nil {
logger.Error("get history", zap.Error(err))
}
defer func() {
err = w.dal.History.UpdateByID(w.ctx, history)
err = w.dal.History.UpdateByID(ctx, history)
if err != nil {
w.logger.Error("cannot update history", zap.Error(err))
}
}()
// check that all required fields are present
if task.LeadId <= 0 {
if task.LeadID <= 0 {
err = errors.New("got bad lead_id")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.UserID == "" {
err = errors.New("got bad user_id")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.Source.File == "" {
err = errors.New("got bad source.file")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.Source.StorageID == "" {
err = errors.New("got bad source.storage_id")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.Source.StorageType == "" {
err = errors.New("got bad source.storage_type")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.Target.StorageID == "" {
err = errors.New("got bad target.storage_id")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
if task.Target.StorageType == "" {
err = errors.New("got bad target.storage_type")
return w.checkErrTask(logger, history, "bad data", err)
return w.checkErrTask(ctx, logger, history, "bad data", err)
}
// amo client
amoData, err := w.dal.Amo.GetByID(w.ctx, task.AmoID)
amoData, err := w.dal.Amo.GetByID(ctx, task.AmoID)
if err != nil {
return w.checkErrTask(logger, history, "cannot get amo data", err)
return w.checkErrTask(ctx, logger, history, "cannot get amo data", err)
}
amoClient, err := w.amo.NewClient(w.ctx, amoData.Referer, amoData.Token(), "")
amoClient, err := w.amo.NewClient(ctx, amoData.Referer, amoData.Token(), "")
if err != nil {
return w.checkErrTask(logger, history, "cannot create amo client", err)
return w.checkErrTask(ctx, logger, history, "cannot create amo client", err)
}
lead, err := DebounceWrapper(amoClient.GetLeadById)(strconv.FormatInt(task.LeadId, 10))
lead, err := DebounceWrapper(amoClient.GetLeadByID)(ctx, strconv.FormatInt(task.LeadID, 10))
if err != nil {
return w.checkErrTask(logger, history, "cannot get lead", err)
return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
}
if lead == nil {
err = errors.New("empty lead data")
return w.checkErrTask(logger, history, "cannot get lead", err)
return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
}
dataTemplate := map[string]interface{}{}
@ -184,11 +183,12 @@ func (w *Worker) DoTask(task *model.WorkerTask) bool {
// Add contact info
contacts := []amo.Contact{}
for _, data := range lead.Embedded.Contacts {
contact, err := DebounceWrapper(amoClient.GetContactById)(strconv.Itoa(data.Id))
var contact *amo.Contact
contact, err = DebounceWrapper(amoClient.GetContactByID)(ctx, strconv.Itoa(data.ID))
if err == nil {
contacts = append(contacts, *contact)
} else {
return w.checkErrTask(logger, history, "cannot get contact", err)
return w.checkErrTask(ctx, logger, history, "cannot get contact", err)
}
}
@ -197,29 +197,30 @@ func (w *Worker) DoTask(task *model.WorkerTask) bool {
// Add company info
companies := []amo.Company{}
for _, data := range lead.Embedded.Companies {
company, err := DebounceWrapper(amoClient.GetCompanyById)(strconv.Itoa(data.Id))
var company *amo.Company
company, err = DebounceWrapper(amoClient.GetCompanyByID)(ctx, strconv.Itoa(data.ID))
if err == nil {
companies = append(companies, *company)
} else {
return w.checkErrTask(logger, history, "cannot get company", err)
return w.checkErrTask(ctx, logger, history, "cannot get company", err)
}
}
dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies)
err = w.Generate(task, lead.Name, dataTemplate)
err = w.Generate(ctx, task, lead.Name, dataTemplate)
history.Target.File = task.Target.File
history.Target.StorageID = task.Target.StorageID
history.Target.StorageType = task.Target.StorageType
history.DownloadUrl = task.DownloadUrl
history.PublicUrl = task.PublicUrl
history.DownloadURL = task.DownloadURL
history.PublicURL = task.PublicURL
return w.checkErrTask(logger, history, "cannot generate", err)
return w.checkErrTask(ctx, logger, history, "cannot generate", err)
}
// checkErrTask - if an error is found, logs it, appends it to the history, and returns false
func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg string, err error) bool {
// checkErrTask - if an error is found, logs it, appends it to the history, and returns false.
func (w *Worker) checkErrTask(ctx context.Context, logger *zap.Logger, history *model.History, msg string, err error) bool {
if err != nil {
logger.Error("task failed: "+msg, zap.Error(err))
if len(history.Errors) > 0 {
@ -228,7 +229,7 @@ func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg st
history.Errors = []string{err.Error()}
}
err = w.dal.History.UpdateByID(w.ctx, history)
err = w.dal.History.UpdateByID(ctx, history)
if err != nil {
logger.Error("cannot update history", zap.Error(err))
}
@ -239,28 +240,28 @@ func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg st
}
// Generate - generates the file and sends it to the disk specified in task.source
// @TODO: after writing tests, make it send to the task.target disk!!!
func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
// TODO: !!!ATTENTION!!! After writing tests, make it send to the task.target disk!!!
func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name string, data any) error {
switch task.Source.StorageType {
case "gdisk":
gdiskData, err := w.dal.GDisk.GetByID(w.ctx, task.Source.StorageID)
gdiskData, err := w.dal.GDisk.GetByID(ctx, task.Source.StorageID)
if err != nil {
return err
}
client, err := w.gDisk.NewClient(w.ctx, gdiskData.Token())
client, err := w.gDisk.NewClient(ctx, gdiskData.Token())
if err != nil {
return err
}
task.Target.File, task.DownloadUrl, err = templategen.GDiskGenerateDoc(task.Source.File, name,
task.UserID, gdiskData.SaveFolderID, client, data)
task.Target.File, task.DownloadURL, err = templategen.GDiskGenerateDoc(task.Source.File, name,
gdiskData.SaveFolderID, client, data)
if err != nil {
return err
}
case "yadisk":
yaDiskData, err := w.dal.YaDisk.GetByID(w.ctx, task.Source.StorageID)
yaDiskData, err := w.dal.YaDisk.GetByID(ctx, task.Source.StorageID)
if err != nil {
return err
}
@ -269,50 +270,49 @@ func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
return fmt.Errorf("no such yadisk")
}
client, err := w.yaDisk.NewClient(w.ctx, yaDiskData.Token(), "")
client, err := w.yaDisk.NewClient(ctx, yaDiskData.Token(), "")
if err != nil {
return err
}
task.Target.File, task.DownloadUrl, err = templategen.YaDiskGenerateDoc(task.Source.File, name,
task.UserID,
task.Target.File, task.DownloadURL, err = templategen.YaDiskGenerateDoc(ctx, task.Source.File, name,
yaDiskData.SaveFolder, client, data)
if err != nil {
return err
}
err = client.PublishResource(task.Target.File)
err = client.PublishResource(ctx, task.Target.File)
if err != nil {
return err
}
res, err := client.GetResources(task.Target.File, 0, 0)
res, err := client.GetResources(ctx, task.Target.File, 0, 0)
if err != nil {
return err
}
task.PublicUrl = res.PublicUrl
task.PublicURL = res.PublicURL
case "penadisk":
penadiskData, err := w.dal.PenaDisk.GetByUserID(w.ctx, task.UserID)
fmt.Println("ORAORA1", penadiskData, err)
penadiskData, err := w.dal.PenaDisk.GetByUserID(ctx, task.UserID)
fmt.Println("ORAORA1", penadiskData, err)
if err != nil {
return err
}
client := penadisk.NewClient(task.UserID)
task.Target.File, task.DownloadUrl, err = templategen.PenaDiskGenerateDocBytes(task.Source.File, name,
task.UserID, penadiskData.SaveFolder, client, data)
task.Target.File, task.DownloadURL, err = templategen.PenaDiskGenerateDocBytes(ctx, task.Source.File, name,
penadiskData.SaveFolder, client, data)
fmt.Println("ORAORA2", task.Target.File, task.DownloadUrl)
fmt.Println("ORAORA2", task.Target.File, task.DownloadURL)
if err != nil {
return err
}
task.PublicUrl = task.DownloadUrl
task.PublicURL = task.DownloadURL
}
return nil
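The common thread through supervisor.go and worker.go is that context.Context moved from a struct field to the first parameter of each method, matching the revive context-as-argument rule enabled in .golangci.yaml and the general guideline that contexts should not be stored in structs. In sketch form:

package example

import "context"

// Before: the context is captured at construction, so every call is tied
// to the lifetime of whoever built the worker.
type workerBefore struct{ ctx context.Context }

func (w *workerBefore) doTask(id string) error { return process(w.ctx, id) }

// After: each call site passes its own context, so cancellation and
// deadlines follow the call, not the constructor.
type workerAfter struct{}

func (w *workerAfter) doTask(ctx context.Context, id string) error {
	return process(ctx, id)
}

func process(ctx context.Context, id string) error { return ctx.Err() }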