diff --git a/.dockerignore b/.dockerignore
index 828c31b..4312676 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1 +1,4 @@
 templategenworker_linux
+.idea
+go.work
+go.work.sum
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 828c31b..4312676 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
 templategenworker_linux
+.idea
+go.work
+go.work.sum
\ No newline at end of file
diff --git a/.golangci.yaml b/.golangci.yaml
new file mode 100644
index 0000000..170cf07
--- /dev/null
+++ b/.golangci.yaml
@@ -0,0 +1,167 @@
+run:
+  timeout: 5m
+  skip-files:
+    - \.pb\.go$
+    - \.pb\.validate\.go$
+    - \.pb\.gw\.go$
+    - \.gen\.go$
+  skip-dirs:
+    - mocks
+    - proto
+
+linters:
+  disable-all: true
+  enable:
+    - asasalint
+    - asciicheck
+    - bidichk
+    - bodyclose
+    - containedctx
+    # - depguard
+    - dogsled
+    - dupword
+    - durationcheck
+    - errcheck
+    - errchkjson
+    - exportloopref
+    # - goconst Temporarily disabled; it complains about dal\postgres\user.go [159, 18]
+    - gocritic
+    - godot
+    - gofmt
+    - gci
+    - goprintffuncname
+    - gosec
+    - gosimple
+    - govet
+    - importas
+    - ineffassign
+    - misspell
+    - nakedret
+    - nilerr
+    - noctx
+    - nolintlint
+    - nosprintfhostport
+    - prealloc
+    - predeclared
+    - revive
+    - rowserrcheck
+    - staticcheck
+    - stylecheck
+    - thelper
+    - typecheck
+    - unconvert
+    - unparam
+    - unused
+    - usestdlibvars
+    - whitespace
+
+linters-settings:
+  errcheck:
+    exclude-functions:
+      - (io.Closer).Close
+  govet:
+    check-shadowing: true
+  gci:
+    custom-order: false
+    section-separators:
+      - newLine
+    sections:
+      - standard # Standard section: captures all standard packages.
+      - default # Default section: contains all imports that could not be matched to another section type.
+      - blank # Blank section: contains all blank imports. This section is not present unless explicitly enabled.
+      - dot # Dot section: contains all dot imports. This section is not present unless explicitly enabled.
+  importas:
+    no-unaliased: true
+    alias:
+      # Foundation libraries
+      - pkg: git.sbercloud.tech/products/paas/shared/foundation/management-server
+        alias: mgmtserver
+  maligned:
+    suggest-new: true
+  goconst:
+    min-len: 2
+    min-occurrences: 2
+  lll:
+    line-length: 140
+  revive:
+    rules:
+      # The following rules are recommended https://github.com/mgechev/revive#recommended-configuration
+      - name: blank-imports
+      - name: context-as-argument
+      - name: context-keys-type
+      - name: dot-imports
+      - name: error-return
+      - name: error-strings
+      - name: error-naming
+      # - name: exported
+      - name: if-return
+      - name: increment-decrement
+      - name: var-naming
+      - name: var-declaration
+      # - name: package-comments
+      - name: range
+      - name: receiver-naming
+      - name: time-naming
+      - name: unexported-return
+      - name: indent-error-flow
+      - name: errorf
+      - name: empty-block
+      - name: superfluous-else
+      - name: unused-parameter
+      - name: unreachable-code
+      - name: redefines-builtin-id
+      #
+      # Rules in addition to the recommended configuration above.
+      #
+      - name: bool-literal-in-expr
+      - name: constant-logical-expr
+  gosec:
+    excludes:
+      - G304 # Potential file inclusion via variable
+      - G307 # Deferring unsafe method "Close" on type "\*os.File"
+      - G107 # Potential HTTP request made with variable url
+      - G108 # Profiling endpoint is automatically exposed on /debug/pprof
+  gocritic:
+    enabled-tags:
+      - diagnostic
+      - experimental
+      - performance
+    disabled-checks:
+      - appendAssign
+      - dupImport # https://github.com/go-critic/go-critic/issues/845
+      - evalOrder
+      - ifElseChain
+      - octalLiteral
+      - regexpSimplify
+      - sloppyReassign
+      - truncateCmp
+      - typeDefFirst
+      - unnamedResult
+      - unnecessaryDefer
+      - whyNoLint
+      - wrapperFunc
+      - rangeValCopy
+      - hugeParam
+
+issues:
+  fix: true
+  exclude-rules:
+    - text: "at least one file in a package should have a package comment"
+      linters:
+        - stylecheck
+    - text: "should have a package comment, unless it's in another file for this package"
+      linters:
+        - golint
+    - text: "should have comment or be unexported"
+      linters:
+        - golint
+    - path: _test\.go
+      linters:
+        - gosec
+        - dupl
+  exclude-use-default: false
+
+output:
+  # colored-line-number|line-number|json|tab|checkstyle, default is "colored-line-number"
+  format: colored-line-number
+  print-linter-name: true
diff --git a/getenv.go b/getenv.go
index 2ce92b0..9726a8a 100644
--- a/getenv.go
+++ b/getenv.go
@@ -11,8 +11,7 @@ import (
 func getEnv(mask interface{}) Env {
 	r := reflect.ValueOf(mask)
 
-	var argTypeRV reflect.Value
-	argTypeRV = reflect.New(r.Type())
+	argTypeRV := reflect.New(r.Type())
 
 	for i := 0; i < r.NumField(); i++ {
 		v := r.Type().Field(i).Tag.Get("env")
@@ -30,7 +29,7 @@ func getEnv(mask interface{}) Env {
 		case "string":
 			argTypeRV.Elem().Field(i).SetString(val)
 		case "bool":
-			if strings.ToLower(val) == "true" {
+			if strings.EqualFold(val, "true") {
 				argTypeRV.Elem().Field(i).SetBool(true)
 			} else {
 				argTypeRV.Elem().Field(i).SetBool(false)
@@ -42,7 +41,7 @@ func getEnv(mask interface{}) Env {
 			}
 			argTypeRV.Elem().Field(i).SetUint(num)
 		default:
-			panic(errors.New("Something strange happend: " + t))
+			panic(errors.New("Something strange happened: " + t))
 		}
 	}
diff --git a/main.go b/main.go
index f3aeb2b..ee40f00 100644
--- a/main.go
+++ b/main.go
@@ -3,24 +3,24 @@ package main
 import (
 	"context"
 	"fmt"
-	"go.uber.org/zap"
-	"go.uber.org/zap/zapcore"
-	"log"
 	"os"
 	"os/signal"
+	"syscall"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 	"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
 	"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
-	GDisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
-	YaDisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
 	"penahub.gitlab.yandexcloud.net/backend/templategenworker/worker"
-	"syscall"
 )
 
 type Env struct {
 	Domain             string `env:"DOMAIN" default:"tempgen.pena.digital"`
 	LogFile            string `env:"LOG_FILE" default:"./tmp/logs.log"`
-	MongoUrl           string `env:"MONGO_URL" default:"mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=penahub-rs"`
-	DbName             string `env:"DB_NAME" default:"templategen"`
+	MongoURL           string `env:"MONGO_URL" default:"mongodb://mongo1:30001,mongo2:30002,mongo3:30003/?replicaSet=penahub-rs"`
+	DBName             string `env:"DB_NAME" default:"templategen"`
 	YaDiskClientID     string `env:"YADISK_CLIENT_ID" default:"94482c181e5148c096ae6ad3b2a981ea"`
 	YaDiskClientSecret string `env:"YADISK_CLIENT_SECRET" default:"7dc4f541c3f64f4a9078e59d7494d222"`
 	GDiskCredentials   string `env:"GDISK_CREDENTIALS" default:"./static/gdisk-credentials.json"`
@@ -52,14 +52,15 @@ func main() {
 
 	// Start Data Access Layer
-	mongoDal, err := dal.InitMongoDAL(ctx, opts.MongoUrl, opts.DbName, logger)
+	mongoDal, err := dal.InitMongoDAL(ctx, opts.MongoURL, opts.DBName, logger)
 	defer mongoDal.Disconnect()
 	if err != nil {
-		logger.Fatal("ErrorConnectToDAL", zap.Error(err))
+		logger.Error("ErrorConnectToDAL", zap.Error(err))
+		return
 	}
 
 	// Yandex Disk
-	yaDisk := YaDisk.NewClientApp(
+	yaDisk := yadisk.NewClientApp(
 		opts.YaDiskClientID,
 		opts.YaDiskClientSecret,
 		fmt.Sprintf("https://%v/yadisk", opts.Domain),
@@ -67,9 +68,10 @@
 	)
 
 	// Google Drive
-	gDisk, err := GDisk.NewClientApp(opts.GDiskCredentials)
+	gDisk, err := gdisk.NewClientApp(opts.GDiskCredentials)
 	if err != nil {
-		log.Fatal("ErrorCreateGoogleDriveClientApp:", err)
+		logger.Error("ErrorCreateGoogleDriveClientApp:", zap.Error(err))
+		return
 	}
 
 	// Amo
@@ -79,10 +81,23 @@ func main() {
 		fmt.Sprintf("https://%v/amo", opts.Domain))
 
 	// Start supervisor worker
-	supervisor := worker.NewSuperVisor(ctx, mongoDal, yaDisk, gDisk, amoApp, logger)
+	supervisorDeps := worker.SuperVisorDeps{
+		Dal:    mongoDal,
+		YaDisk: yaDisk,
+		GDisk:  gDisk,
+		Amo:    amoApp,
+		Logger: logger,
+	}
+
+	supervisor, err := worker.NewSuperVisor(supervisorDeps)
+	if err != nil {
+		logger.Error("ErrorCreateSuperVisor", zap.Error(err))
+		return
+	}
+
 	defer supervisor.Stop()
-	supervisor.Start()
+	supervisor.Start(ctx)
 
 	// Graceful Shutdown
 	interrupt := make(chan os.Signal, 1)
 
@@ -90,11 +105,12 @@ func main() {
 	killSignal := <-interrupt
 	switch killSignal {
 	case os.Interrupt:
-		logger.Fatal("AppInterrupted")
+		logger.Error("AppInterrupted")
+		return
 	case syscall.SIGTERM:
-		logger.Fatal("AppTerminated")
+		logger.Error("AppTerminated")
+		return
 	}
 
 	defer cancel()
-
 }
diff --git a/worker/pool.go b/worker/pool.go
index f108134..3aaf314 100644
--- a/worker/pool.go
+++ b/worker/pool.go
@@ -13,23 +13,23 @@ func NewPool() *Pool {
 	return &Pool{workers: map[string]*Worker{}, mux: sync.RWMutex{}}
 }
 
-func (p *Pool) Set(amoId string, worker *Worker) {
+func (p *Pool) Set(amoID string, worker *Worker) {
 	p.mux.Lock()
-	p.workers[amoId] = worker
+	p.workers[amoID] = worker
 	p.mux.Unlock()
 }
 
-func (p *Pool) Get(amoId string) *Worker {
+func (p *Pool) Get(amoID string) *Worker {
 	p.mux.Lock()
-	worker := p.workers[amoId]
+	worker := p.workers[amoID]
 	p.mux.Unlock()
 	return worker
 }
 
-// Unset - stops the worker and removes it from the pool
-func (p *Pool) Unset(amoId string) {
+// Unset - stops the worker and removes it from the pool.
+func (p *Pool) Unset(amoID string) {
 	p.mux.Lock()
-	p.workers[amoId].Stop()
-	delete(p.workers, amoId)
+	p.workers[amoID].Stop()
+	delete(p.workers, amoID)
 	p.mux.Unlock()
 }
diff --git a/worker/supervisor.go b/worker/supervisor.go
index cdc8f18..e150eb9 100644
--- a/worker/supervisor.go
+++ b/worker/supervisor.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
+
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo/options"
 	"go.uber.org/zap"
@@ -11,17 +13,23 @@ import (
 	"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
 	"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
 	"penahub.gitlab.yandexcloud.net/backend/templategen/dal/mongos"
-	gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
-	yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
-	"time"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
 )
 
 const (
 	SuperVisorTick = 2 * time.Minute
 )
 
+type SuperVisorDeps struct {
+	Dal    *dal.MongoDAL
+	YaDisk *yadisk.ClientApp
+	GDisk  *gdisk.ClientApp
+	Amo    *amo.ClientApp
+	Logger *zap.Logger
+}
+
 type SuperVisor struct {
-	ctx            context.Context
 	dal            *dal.MongoDAL
 	yaDisk         *yadisk.ClientApp
 	gDisk          *gdisk.ClientApp
@@ -33,29 +41,47 @@ type SuperVisor struct {
 	interrupter    chan bool
 }
 
-func NewSuperVisor(ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *SuperVisor {
+func NewSuperVisor(deps SuperVisorDeps) (*SuperVisor, error) {
+	if deps.Dal == nil {
+		return nil, errors.New("nil SuperVisorDeps.Dal")
+	}
+
+	if deps.YaDisk == nil {
+		return nil, errors.New("nil SuperVisorDeps.YaDisk")
+	}
+
+	if deps.GDisk == nil {
+		return nil, errors.New("nil SuperVisorDeps.GDisk")
+	}
+
+	if deps.Amo == nil {
+		return nil, errors.New("nil SuperVisorDeps.Amo")
+	}
+
+	if deps.Logger == nil {
+		return nil, errors.New("nil SuperVisorDeps.Logger")
+	}
+
 	return &SuperVisor{
-		ctx:            ctx,
-		dal:            dal,
-		yaDisk:         yaDisk,
-		gDisk:          gDisk,
-		amo:            amo,
-		logger:         logger,
+		dal:            deps.Dal,
+		yaDisk:         deps.YaDisk,
+		gDisk:          deps.GDisk,
+		amo:            deps.Amo,
+		logger:         deps.Logger,
 		pool:           NewPool(),
 		stopWorker:     make(chan string),
 		workerTaskChan: make(chan model.WorkerTask),
 		interrupter:    make(chan bool),
-	}
+	}, nil
 }
 
-func (sv *SuperVisor) Start() {
+func (sv *SuperVisor) Start(ctx context.Context) {
 	// Check tasks with status new, pending and processing; if they are not overdue, send them to work
 	tasks, err := sv.dal.WorkerTask.GetByFilter(
-		sv.ctx,
+		ctx,
 		&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusNew},
-		options.Find().SetSort(bson.D{{"created_at", 1}}),
+		options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
 	)
-
 	if err != nil {
 		sv.logger.Error("GetByFilter", zap.Error(err))
 	}
@@ -64,11 +90,11 @@
 		sv.logger.Info(fmt.Sprintf("got %v tasks with status=new", len(tasks)))
 	}
 
-	for _, task := range tasks {
-		if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-			sv.SetTaskTimeout(&task)
+	for index := range tasks {
+		if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+			sv.SetTaskTimeout(ctx, &tasks[index])
 		} else {
-			go sv.SetTask(task)
+			go sv.SetTask(ctx, tasks[index])
 		}
 	}
 
@@ -76,11 +102,10 @@
 	time.Sleep(time.Second * 5)
 
 	tasks, err = sv.dal.WorkerTask.GetByFilter(
-		sv.ctx,
+		ctx,
 		&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
-		options.Find().SetSort(bson.D{{"created_at", 1}}),
+		options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
 	)
-
 	if err != nil {
 		sv.logger.Error("GetByFilter", zap.Error(err))
 	}
@@ -89,11 +114,11 @@
 		sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
 	}
 
-	for _, task := range tasks {
-		if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-			sv.SetTaskTimeout(&task)
+	for index := range tasks {
+		if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+			sv.SetTaskTimeout(ctx, &tasks[index])
 		} else {
-			go sv.SetTask(task)
+			go sv.SetTask(ctx, tasks[index])
 		}
 	}
 
@@ -101,11 +126,10 @@
 	time.Sleep(time.Second * 5)
 
 	tasks, err = sv.dal.WorkerTask.GetByFilter(
-		sv.ctx,
+		ctx,
 		&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
-		options.Find().SetSort(bson.D{{"created_at", 1}}),
+		options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
 	)
-
 	if err != nil {
 		sv.logger.Error("GetByFilter", zap.Error(err))
 	}
@@ -114,11 +138,11 @@
 		sv.logger.Info(fmt.Sprintf("got %v tasks with status=pending", len(tasks)))
 	}
 
-	for _, task := range tasks {
-		if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-			sv.SetTaskTimeout(&task)
+	for index := range tasks {
+		if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+			sv.SetTaskTimeout(ctx, &tasks[index])
 		} else {
-			go sv.SetTask(task)
+			go sv.SetTask(ctx, tasks[index])
 		}
 	}
 
@@ -128,23 +152,25 @@
 	// Start the goroutine that receives new tasks
 	go func() {
 		workerTaskChan := make(chan model.WorkerTask)
-		go sv.dal.WorkerTask.Listen(sv.ctx, model.WorkerTaskStatusNew, workerTaskChan)
+		go sv.dal.WorkerTask.Listen(ctx, model.WorkerTaskStatusNew, workerTaskChan)
 		for task := range workerTaskChan {
-			go sv.SetTask(task)
+			go sv.SetTask(ctx, task)
 		}
 	}()
 
 	// Start the goroutine that checks tasks for status timeouts and is responsible for shutting down the supervisor
 	go func() {
+		ticker := time.NewTicker(SuperVisorTick)
+		defer ticker.Stop()
+
 		for {
 			// Check tasks in processing and set the timeout status if they are overdue
 			tasks, err = sv.dal.WorkerTask.GetByFilter(
-				sv.ctx,
+				ctx,
 				&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
-				options.Find().SetSort(bson.D{{"created_at", 1}}),
+				options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
 			)
-
 			if err != nil {
 				sv.logger.Error("GetByFilter", zap.Error(err))
 			}
@@ -153,19 +179,18 @@
 				sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
 			}
 
-			for _, task := range tasks {
-				if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-					sv.SetTaskTimeout(&task)
+			for index := range tasks {
+				if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+					sv.SetTaskTimeout(ctx, &tasks[index])
 				}
 			}
 
 			// Check tasks in pending and set the timeout status if they are overdue
 			tasks, err = sv.dal.WorkerTask.GetByFilter(
-				sv.ctx,
+				ctx,
 				&mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
-				options.Find().SetSort(bson.D{{"created_at", 1}}),
+				options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
 			)
-
 			if err != nil {
 				sv.logger.Error("GetByFilter", zap.Error(err))
 			}
@@ -174,17 +199,17 @@
 				sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
 			}
 
-			for _, task := range tasks {
-				if task.UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-					sv.SetTaskTimeout(&task)
+			for i := range tasks {
+				if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+					sv.SetTaskTimeout(ctx, &tasks[i])
 				}
 			}
 
 			select {
-			case <-sv.ctx.Done():
+			case <-ctx.Done():
 				sv.Stop()
 				return
-			case <-time.Tick(SuperVisorTick):
+			case <-ticker.C:
 				continue
 			case <-sv.interrupter:
 				return
@@ -196,10 +221,10 @@
 	go func() {
 		for {
 			select {
-			case workerAmoId := <-sv.stopWorker:
-				sv.pool.Unset(workerAmoId)
+			case workerAmoID := <-sv.stopWorker:
+				sv.pool.Unset(workerAmoID)
 				continue
-			case <-sv.ctx.Done():
+			case <-ctx.Done():
 				sv.Stop()
 				return
 			case <-sv.interrupter:
@@ -209,11 +234,12 @@
 	}()
 }
 
-// SetTask - sets the task status to pending and sends it to a worker. If the worker does not exist, it creates one
-func (sv *SuperVisor) SetTask(task model.WorkerTask) {
+// SetTask - sets the task status to pending and sends it to a worker. If the worker does not exist, it creates one.
+func (sv *SuperVisor) SetTask(ctx context.Context, task model.WorkerTask) {
+	var err error
+
 	// Set the pending status
-	err := sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusPending)
-	if err != nil {
+	if err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusPending); err != nil {
 		sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
 	}
 
@@ -223,22 +249,20 @@
 		sv.logger.Error(err.Error(), zap.String("_id", task.ID))
 
 		var historyFound []model.History
-		historyFound, err = sv.dal.History.GetByFilter(sv.ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
+		historyFound, err = sv.dal.History.GetByFilter(ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
 		if err != nil {
 			sv.logger.Error("cannot get history", zap.Error(err), zap.String("_id", task.ID))
-		} else {
-			if len(historyFound) > 0 {
-				historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
-				err = sv.dal.History.UpdateByID(sv.ctx, &historyFound[0])
+		} else if len(historyFound) > 0 {
+			historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
+			err = sv.dal.History.UpdateByID(ctx, &historyFound[0])
 
-				if err != nil {
-					sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
-				}
+			if err != nil {
+				sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
 			}
 		}
 
-		err = sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusFailed)
+		err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusFailed)
 		if err != nil {
 			sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
 		}
 
@@ -248,45 +272,56 @@
 	// Create workers
 	worker := sv.pool.Get(task.AmoID)
 	if worker == nil {
-		worker = NewWorker(task.AmoID, sv.ctx, sv.dal, sv.yaDisk, sv.gDisk, sv.amo, sv.logger)
+		workerDeps := WorkerDeps{
+			AmoID:  task.AmoID,
+			Dal:    sv.dal,
+			YaDisk: sv.yaDisk,
+			GDisk:  sv.gDisk,
+			Amo:    sv.amo,
+			Logger: sv.logger,
+		}
+
+		worker, err = NewWorker(workerDeps)
+		if err != nil {
+			sv.logger.Error("FailedCreateWorker", zap.Error(err))
+			return
+		}
+
 		sv.pool.Set(task.AmoID, worker)
-		worker.Start(sv.stopWorker)
+		worker.Start(ctx, sv.stopWorker)
 	}
 
 	sv.pool.Get(task.AmoID).Tube <- &task
 }
 
-func (sv *SuperVisor) SetTaskTimeout(task *model.WorkerTask) {
+func (sv *SuperVisor) SetTaskTimeout(ctx context.Context, task *model.WorkerTask) {
 	sv.logger.Info("task timeout", zap.String("_id", task.ID))
 
-	historyFound, err := sv.dal.History.GetByFilter(sv.ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
-
+	historyFound, err := sv.dal.History.GetByFilter(ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
 	if err != nil {
 		sv.logger.Error("cannot get history", zap.Error(err), zap.String("_id", task.ID))
-	} else {
-		if len(historyFound) > 0 {
-			err = errors.New("task timeout")
-			if len(historyFound[0].Errors) > 0 {
-				historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
-			} else {
-				historyFound[0].Errors = []string{err.Error()}
-			}
+	}
 
-			err = sv.dal.History.UpdateByID(sv.ctx, &historyFound[0])
-			if err != nil {
-				sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
-			}
+	if len(historyFound) > 0 {
+		err = errors.New("task timeout")
+		if len(historyFound[0].Errors) > 0 {
+			historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
+		} else {
+			historyFound[0].Errors = []string{err.Error()}
+		}
+
+		if err = sv.dal.History.UpdateByID(ctx, &historyFound[0]); err != nil {
+			sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
 		}
 	}
 
-	err = sv.dal.WorkerTask.UpdateStatus(sv.ctx, task.ID, model.WorkerTaskStatusTimeout)
-	if err != nil {
+	if err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusTimeout); err != nil {
 		sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
 	}
 }
 
 func (sv *SuperVisor) Stop() {
 	for _, worker := range sv.pool.workers {
-		sv.pool.Unset(worker.AmoId)
+		sv.pool.Unset(worker.AmoID)
 	}
 	close(sv.workerTaskChan)
 	sv.interrupter <- true
diff --git a/worker/worker.go b/worker/worker.go
index d3b4dff..fdaa437 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -3,18 +3,19 @@ package worker
 import (
 	"context"
 	"errors"
-	"go.uber.org/zap"
-	"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
-	"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
-	"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
-	gdisk "penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
-	"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
-	"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
-	yadisk "penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
 	"fmt"
 	"reflect"
 	"strconv"
 	"time"
+
+	"go.uber.org/zap"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/amo"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/dal"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/dal/model"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/gdisk"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/penadisk"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/templategen"
+	"penahub.gitlab.yandexcloud.net/backend/templategen/yadisk"
 )
 
 const (
@@ -24,10 +25,18 @@ const (
 	TaskTimeout = 5 * time.Minute
 )
 
+type WorkerDeps struct {
+	AmoID  string
+	Dal    *dal.MongoDAL
+	YaDisk *yadisk.ClientApp
+	GDisk  *gdisk.ClientApp
+	Amo    *amo.ClientApp
+	Logger *zap.Logger
+}
+
 type Worker struct {
-	AmoId     string
+	AmoID     string
 	Tube      chan *model.WorkerTask // Buffered channel with size CacheSize
-	ctx       context.Context
 	lifeTimer *time.Timer
 	dal       *dal.MongoDAL
 	yaDisk    *yadisk.ClientApp
@@ -36,142 +45,190 @@ type Worker struct {
 	logger    *zap.Logger
 }
 
-func NewWorker(amoId string, ctx context.Context, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
+func NewWorker(deps WorkerDeps) (*Worker, error) {
+	if deps.AmoID == "" {
+		return nil, errors.New("empty WorkerDeps.AmoID")
+	}
+
+	if deps.Dal == nil {
+		return nil, errors.New("nil WorkerDeps.Dal")
+	}
+
+	if deps.YaDisk == nil {
+		return nil, errors.New("nil WorkerDeps.YaDisk")
+	}
+
+	if deps.GDisk == nil {
+		return nil, errors.New("nil WorkerDeps.GDisk")
+	}
+
+	if deps.Amo == nil {
+		return nil, errors.New("nil WorkerDeps.Amo")
+	}
+
+	if deps.Logger == nil {
+		return nil, errors.New("nil WorkerDeps.Logger")
+	}
+
 	return &Worker{
-		AmoId:     amoId,
+		AmoID:     deps.AmoID,
 		Tube:      make(chan *model.WorkerTask, CacheSize),
-		ctx:       ctx,
 		lifeTimer: time.NewTimer(WorkerLifetime),
-		dal:       dal,
-		yaDisk:    yaDisk,
-		gDisk:     gDisk,
-		amo:       amo,
-		logger:    logger.With(zap.String("amo_id", amoId))}
+		dal:       deps.Dal,
+		yaDisk:    deps.YaDisk,
+		gDisk:     deps.GDisk,
+		amo:       deps.Amo,
+		logger:    deps.Logger.With(zap.String("amo_id", deps.AmoID)),
+	}, nil
 }
 
-func (w *Worker) Start(stopWorker chan string) {
+func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
 	w.logger.Info("worker started")
 	go func() {
 		select {
-		case <-w.ctx.Done():
+		case <-ctx.Done():
 			w.logger.Info("worker stopped - context done")
-			stopWorker <- w.AmoId
+			stopWorker <- w.AmoID
 			return
 		case <-w.lifeTimer.C:
 			w.logger.Info("worker stopped - timeout")
-			stopWorker <- w.AmoId
+			stopWorker <- w.AmoID
 			return
 		}
-
 	}()
 
 	go func() {
 		for task := range w.Tube {
-			w.logger.Info("new task", zap.String("_id", task.ID))
 			if task == nil {
 				w.logger.Error("worker got empty task")
 				continue
 			}
 
-			err := w.dal.WorkerTask.UpdateStatus(w.ctx, task.ID, model.WorkerTaskStatusProcessing)
-			if err != nil {
+			w.logger.Info("new task", zap.String("_id", task.ID))
+
+			var err error
+
+			if err = w.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusProcessing); err != nil {
 				w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
 			}
 
 			task.Status = model.WorkerTaskStatusDone
-			if !w.DoTask(task) {
+			if !w.DoTask(ctx, task) {
 				task.Status = model.WorkerTaskStatusFailed
 				w.logger.Info("task failed", zap.String("_id", task.ID))
 			} else {
 				w.logger.Info("task done", zap.String("_id", task.ID))
 			}
 
-			err = w.dal.WorkerTask.UpdateByID(w.ctx, task)
-
-			if err != nil {
+			if err = w.dal.WorkerTask.UpdateByID(ctx, task); err != nil {
 				w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
 			}
 		}
 	}()
 }
 
-func (w *Worker) DoTask(task *model.WorkerTask) bool {
-	var err error
+func (w *Worker) DoTask(ctx context.Context, task *model.WorkerTask) bool {
 	logger := w.logger.With(zap.String("_id", task.ID))
 
 	// reset the timer
 	w.lifeTimer.Stop()
 	w.lifeTimer.Reset(WorkerLifetime)
 
+	var err error
+
 	// find the history the task is attached to
-	history, err := w.dal.History.GetByWorkerTaskID(w.ctx, task.ID)
+	history, err := w.dal.History.GetByWorkerTaskID(ctx, task.ID)
 	if err != nil {
 		logger.Error("get history", zap.Error(err))
 	}
 
 	defer func() {
-		err = w.dal.History.UpdateByID(w.ctx, history)
+		err = w.dal.History.UpdateByID(ctx, history)
 		if err != nil {
 			w.logger.Error("cannot update history", zap.Error(err))
 		}
 	}()
 
 	// check that all fields are in place
-	if task.LeadId <= 0 {
+	if task.LeadID <= 0 {
 		err = errors.New("got bad lead_id")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
-	if task.UserID == "" {
-		err = errors.New("got bad user_id")
-		return w.checkErrTask(logger, history, "bad data", err)
+	if task.PenaID == "" {
+		err = errors.New("got bad pena_id")
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	if task.Source.File == "" {
 		err = errors.New("got bad source.file")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	if task.Source.StorageID == "" {
 		err = errors.New("got bad source.storage_id")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	if task.Source.StorageType == "" {
 		err = errors.New("got bad source.storage_type")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	if task.Target.StorageID == "" {
 		err = errors.New("got bad target.storage_id")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	if task.Target.StorageType == "" {
 		err = errors.New("got bad target.storage_type")
-		return w.checkErrTask(logger, history, "bad data", err)
+		return w.checkErrTask(ctx, logger, history, "bad data", err)
 	}
 
 	// amo client
-	amoData, err := w.dal.Amo.GetByID(w.ctx, task.AmoID)
+	amoData, err := w.dal.Amo.GetByID(ctx, task.AmoID)
 	if err != nil {
-		return w.checkErrTask(logger, history, "cannot get amo data", err)
+		return w.checkErrTask(ctx, logger, history, "cannot get amo data", err)
 	}
 
-	amoClient, err := w.amo.NewClient(w.ctx, amoData.Referer, amoData.Token(), "")
-
-	if err != nil {
-		return w.checkErrTask(logger, history, "cannot create amo client", err)
+	// Check privileges
+	if len(amoData.Privileges) == 0 {
+		return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("user has no tariff"))
 	}
 
-	lead, err := DebounceWrapper(amoClient.GetLeadById)(strconv.FormatInt(task.LeadId, 10))
+	// By the number of remaining generations
+	var privilegeAmount int64
+
+	privilege, privilegeCountExists := amoData.Privileges[model.PrivilegeTemplateCount]
+	if privilegeCountExists {
+		privilegeAmount = privilege.Amount
+
+		if privilegeAmount < 1 {
+			return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("tariff ended - not enough count"))
+		}
+	}
+
+	// By date
+	privilege, privilegeUnlimTimeExists := amoData.Privileges[model.PrivilegeTemplateUnlimTime]
+	if privilegeUnlimTimeExists {
+		if privilege.CreatedAt.AddDate(0, 0, int(privilege.Amount)).After(time.Now()) {
+			return w.checkErrTask(ctx, logger, history, "check privileges", errors.New("tariff ended - by time"))
+		}
+	}
+
+	amoClient, err := w.amo.NewClient(ctx, amoData.Referer, amoData.Token(), "")
 	if err != nil {
-		return w.checkErrTask(logger, history, "cannot get lead", err)
+		return w.checkErrTask(ctx, logger, history, "cannot create amo client", err)
+	}
+
+	lead, err := DebounceWrapper(amoClient.GetLeadByID)(ctx, strconv.FormatInt(task.LeadID, 10))
+	if err != nil {
+		return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
 	}
 
 	if lead == nil {
 		err = errors.New("empty lead data")
-		return w.checkErrTask(logger, history, "cannot get lead", err)
+		return w.checkErrTask(ctx, logger, history, "cannot get lead", err)
 	}
 
 	dataTemplate := map[string]interface{}{}
@@ -184,11 +241,12 @@ func (w *Worker) DoTask(task *model.WorkerTask) bool {
 	// Add contact info
 	contacts := []amo.Contact{}
 	for _, data := range lead.Embedded.Contacts {
-		contact, err := DebounceWrapper(amoClient.GetContactById)(strconv.Itoa(data.Id))
+		var contact *amo.Contact
+		contact, err = DebounceWrapper(amoClient.GetContactByID)(ctx, strconv.Itoa(data.ID))
 		if err == nil {
 			contacts = append(contacts, *contact)
 		} else {
-			return w.checkErrTask(logger, history, "cannot get contact", err)
+			return w.checkErrTask(ctx, logger, history, "cannot get contact", err)
 		}
 	}
 
@@ -197,29 +255,42 @@ func (w *Worker) DoTask(task *model.WorkerTask) bool {
 	// Add company info
 	companies := []amo.Company{}
 	for _, data := range lead.Embedded.Companies {
-		company, err := DebounceWrapper(amoClient.GetCompanyById)(strconv.Itoa(data.Id))
+		var company *amo.Company
+		company, err = DebounceWrapper(amoClient.GetCompanyByID)(ctx, strconv.Itoa(data.ID))
 		if err == nil {
 			companies = append(companies, *company)
 		} else {
-			return w.checkErrTask(logger, history, "cannot get company", err)
+			return w.checkErrTask(ctx, logger, history, "cannot get company", err)
 		}
 	}
 
 	dataTemplate["Компании"] = templategen.AmoCompaniesFieldsToRuMap(companies)
 
-	err = w.Generate(task, lead.Name, dataTemplate)
+	err = w.Generate(ctx, task, lead.Name, dataTemplate)
+
+	if err != nil {
+		return w.checkErrTask(ctx, logger, history, "cannot generate", err)
+	}
+
+	if privilegeCountExists {
+		err = w.dal.Amo.UpdateAmountPrivilege(ctx, amoData.ID, model.PrivilegeTemplateCount, privilegeAmount-1)
+	}
+
+	if err != nil {
+		return w.checkErrTask(ctx, logger, history, "cannot update amo.privilege.amount", err)
+	}
 
 	history.Target.File = task.Target.File
 	history.Target.StorageID = task.Target.StorageID
 	history.Target.StorageType = task.Target.StorageType
-	history.DownloadUrl = task.DownloadUrl
-	history.PublicUrl = task.PublicUrl
+	history.DownloadURL = task.DownloadURL
+	history.PublicURL = task.PublicURL
 
-	return w.checkErrTask(logger, history, "cannot generate", err)
+	return true
 }
 
-// checkErrTask - if an error is found, logs it, appends it to the history and returns false
-func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg string, err error) bool {
+// checkErrTask - if an error is found, logs it, appends it to the history and returns false.
+func (w *Worker) checkErrTask(ctx context.Context, logger *zap.Logger, history *model.History, msg string, err error) bool {
 	if err != nil {
 		logger.Error("task failed: "+msg, zap.Error(err))
 		if len(history.Errors) > 0 {
@@ -228,8 +299,7 @@ func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg st
 			history.Errors = []string{err.Error()}
 		}
 
-		err = w.dal.History.UpdateByID(w.ctx, history)
-		if err != nil {
+		if err = w.dal.History.UpdateByID(ctx, history); err != nil {
 			logger.Error("cannot update history", zap.Error(err))
 		}
 		return false
@@ -239,28 +309,28 @@ func (w *Worker) checkErrTask(logger *zap.Logger, history *model.History, msg st
 }
 
 // Generate - generates the file and sends it to the disk specified in task.source
-// @TODO: after the tests are written, make it send to the task.target disk!!!
-func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
+// TODO: !!!ATTENTION!!! After the tests are written, make it send to the task.target disk!!!
+func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name string, data any) error {
 	switch task.Source.StorageType {
 	case "gdisk":
-		gdiskData, err := w.dal.GDisk.GetByID(w.ctx, task.Source.StorageID)
+		gdiskData, err := w.dal.GDisk.GetByID(ctx, task.Source.StorageID)
 		if err != nil {
 			return err
 		}
 
-		client, err := w.gDisk.NewClient(w.ctx, gdiskData.Token())
+		client, err := w.gDisk.NewClient(ctx, gdiskData.Token())
 		if err != nil {
 			return err
 		}
 
-		task.Target.File, task.DownloadUrl, err = templategen.GDiskGenerateDoc(task.Source.File, name,
-			task.UserID, gdiskData.SaveFolderID, client, data)
+		task.Target.File, task.DownloadURL, err = templategen.GDiskGenerateDoc(task.Source.File, name,
+			gdiskData.SaveFolderID, client, data)
 		if err != nil {
 			return err
 		}
 	case "yadisk":
-		yaDiskData, err := w.dal.YaDisk.GetByID(w.ctx, task.Source.StorageID)
+		yaDiskData, err := w.dal.YaDisk.GetByID(ctx, task.Source.StorageID)
 		if err != nil {
 			return err
 		}
@@ -269,50 +339,42 @@ func (w *Worker) Generate(task *model.WorkerTask, name string, data any) error {
 			return fmt.Errorf("no such yadisk")
 		}
 
-		client, err := w.yaDisk.NewClient(w.ctx, yaDiskData.Token(), "")
-
+		client, err := w.yaDisk.NewClient(ctx, yaDiskData.Token(), "")
 		if err != nil {
 			return err
 		}
 
-		task.Target.File, task.DownloadUrl, err = templategen.YaDiskGenerateDoc(task.Source.File, name,
-			task.UserID,
+		task.Target.File, task.DownloadURL, err = templategen.YaDiskGenerateDoc(ctx, task.Source.File, name,
 			yaDiskData.SaveFolder, client, data)
-
 		if err != nil {
 			return err
 		}
 
-		err = client.PublishResource(task.Target.File)
+		if err = client.PublishResource(ctx, task.Target.File); err != nil {
+			return err
+		}
+
+		res, err := client.GetResources(ctx, task.Target.File, 0, 0)
 		if err != nil {
 			return err
 		}
 
-		res, err := client.GetResources(task.Target.File, 0, 0)
-		if err != nil {
-			return err
-		}
-
-		task.PublicUrl = res.PublicUrl
+		task.PublicURL = res.PublicURL
 	case "penadisk":
-		penadiskData, err := w.dal.PenaDisk.GetByUserID(w.ctx, task.UserID)
-		fmt.Println("ORAORA1", penadiskData, err)
+		penadiskData, err := w.dal.PenaDisk.GetByPenaID(ctx, task.PenaID)
 		if err != nil {
 			return err
 		}
 
-		client := penadisk.NewClient(task.UserID)
+		client := penadisk.NewClient(task.PenaID)
 
-		task.Target.File, task.DownloadUrl, err = templategen.PenaDiskGenerateDocBytes(task.Source.File, name,
-			task.UserID, penadiskData.SaveFolder, client, data)
-
-		fmt.Println("ORAORA2", task.Target.File, task.DownloadUrl)
+		task.Target.File, task.DownloadURL, err = templategen.PenaDiskGenerateDocBytes(ctx, task.Source.File, name,
			penadiskData.SaveFolder, client, data)
 		if err != nil {
 			return err
 		}
 
-		task.PublicUrl = task.DownloadUrl
+		task.PublicURL = task.DownloadURL
 	}
 
 	return nil
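Below is a minimal usage sketch, not part of the diff above, illustrating the constructor contract this change introduces: NewSuperVisor and NewWorker now take a deps struct and return an error when a required dependency is missing, instead of being built from positional arguments. The test name and file placement are assumptions; only NewSuperVisor, NewWorker, SuperVisorDeps and WorkerDeps come from the diff.

package worker

import "testing"

// Hypothetical sketch: both constructors should fail fast on empty deps
// rather than panicking later on a nil field.
func TestConstructorsRequireDeps(t *testing.T) {
	if _, err := NewSuperVisor(SuperVisorDeps{}); err == nil {
		t.Fatal("expected an error for empty SuperVisorDeps")
	}

	if _, err := NewWorker(WorkerDeps{}); err == nil {
		t.Fatal("expected an error for empty WorkerDeps")
	}
}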