Fix issues in threads

Danil Solovyov 2023-08-11 20:39:43 +05:00
parent 0080968e7e
commit 643c288e51
3 changed files with 136 additions and 61 deletions

main.go (15 changed lines)

@@ -81,7 +81,20 @@ func main() {
         fmt.Sprintf("https://%v/amo", opts.Domain))
     // Start supervisor worker
-    supervisor := worker.NewSuperVisor(mongoDal, yaDisk, gDisk, amoApp, logger)
+    supervisorDeps := worker.SuperVisorDeps{
+        Dal:    mongoDal,
+        YaDisk: yaDisk,
+        GDisk:  gDisk,
+        Amo:    amoApp,
+        Logger: logger,
+    }
+    supervisor, err := worker.NewSuperVisor(supervisorDeps)
+    if err != nil {
+        logger.Error("ErrorCreateSuperVisor", zap.Error(err))
+        return
+    }
+    defer supervisor.Stop()
     supervisor.Start(ctx)

@@ -21,6 +21,14 @@ const (
     SuperVisorTick = 2 * time.Minute
 )
 
+type SuperVisorDeps struct {
+    Dal    *dal.MongoDAL
+    YaDisk *yadisk.ClientApp
+    GDisk  *gdisk.ClientApp
+    Amo    *amo.ClientApp
+    Logger *zap.Logger
+}
+
 type SuperVisor struct {
     dal    *dal.MongoDAL
     yaDisk *yadisk.ClientApp
@@ -33,18 +41,38 @@ type SuperVisor struct {
     interrupter chan bool
 }
 
-func NewSuperVisor(dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *SuperVisor {
+func NewSuperVisor(deps SuperVisorDeps) (*SuperVisor, error) {
+    if deps.Dal == nil {
+        return nil, errors.New("nil SuperVisorDeps.Dal")
+    }
+    if deps.YaDisk == nil {
+        return nil, errors.New("nil SuperVisorDeps.YaDisk")
+    }
+    if deps.GDisk == nil {
+        return nil, errors.New("nil SuperVisorDeps.GDisk")
+    }
+    if deps.Amo == nil {
+        return nil, errors.New("nil SuperVisorDeps.Amo")
+    }
+    if deps.Logger == nil {
+        return nil, errors.New("nil SuperVisorDeps.Logger")
+    }
     return &SuperVisor{
-        dal:    dal,
-        yaDisk: yaDisk,
-        gDisk:  gDisk,
-        amo:    amo,
-        logger: logger,
+        dal:            deps.Dal,
+        yaDisk:         deps.YaDisk,
+        gDisk:          deps.GDisk,
+        amo:            deps.Amo,
+        logger:         deps.Logger,
         pool:           NewPool(),
         stopWorker:     make(chan string),
         workerTaskChan: make(chan model.WorkerTask),
         interrupter:    make(chan bool),
-    }
+    }, nil
 }
 
 func (sv *SuperVisor) Start(ctx context.Context) {
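
Note on the hunk above: the positional-argument constructor is replaced by a dependencies struct that is validated field by field, so a missing collaborator fails fast at startup instead of surfacing later as a nil-pointer panic inside a goroutine. A minimal, self-contained sketch of the same shape; every name below is illustrative, not from this repo:

package main

import (
    "errors"
    "fmt"
    "log"
)

type Store struct{}

// Deps carries everything the service needs; zero-value fields are invalid.
type Deps struct {
    Store  *Store
    Logger *log.Logger
}

type Service struct {
    store  *Store
    logger *log.Logger
}

// NewService rejects incomplete dependencies instead of deferring
// a nil-pointer panic to the first use.
func NewService(deps Deps) (*Service, error) {
    if deps.Store == nil {
        return nil, errors.New("nil Deps.Store")
    }
    if deps.Logger == nil {
        return nil, errors.New("nil Deps.Logger")
    }
    return &Service{store: deps.Store, logger: deps.Logger}, nil
}

func main() {
    // A forgotten field is now a startup error, not a latent crash.
    _, err := NewService(Deps{Store: &Store{}})
    fmt.Println(err) // prints: nil Deps.Logger
}

Returning an error from the constructor keeps the fail-fast decision at the call site, which is what the main.go hunk does with its ErrorCreateSuperVisor log-and-return.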
@@ -54,7 +82,6 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         &mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusNew},
         options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
     )
-
     if err != nil {
         sv.logger.Error("GetByFilter", zap.Error(err))
     }
@@ -63,11 +90,11 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         sv.logger.Info(fmt.Sprintf("got %v tasks with status=new", len(tasks)))
     }
 
-    for i := range tasks {
-        if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-            sv.SetTaskTimeout(ctx, &tasks[i])
+    for index := range tasks {
+        if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+            sv.SetTaskTimeout(ctx, &tasks[index])
         } else {
-            go sv.SetTask(ctx, tasks[i])
+            go sv.SetTask(ctx, tasks[index])
         }
     }
@@ -79,7 +106,6 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         &mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
         options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
     )
-
     if err != nil {
         sv.logger.Error("GetByFilter", zap.Error(err))
     }
@@ -88,11 +114,11 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
     }
 
-    for i := range tasks {
-        if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-            sv.SetTaskTimeout(ctx, &tasks[i])
+    for index := range tasks {
+        if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+            sv.SetTaskTimeout(ctx, &tasks[index])
         } else {
-            go sv.SetTask(ctx, tasks[i])
+            go sv.SetTask(ctx, tasks[index])
         }
     }
@@ -104,7 +130,6 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         &mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
         options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
     )
-
     if err != nil {
         sv.logger.Error("GetByFilter", zap.Error(err))
     }
@@ -113,11 +138,11 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         sv.logger.Info(fmt.Sprintf("got %v tasks with status=pending", len(tasks)))
     }
 
-    for i := range tasks {
-        if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-            sv.SetTaskTimeout(ctx, &tasks[i])
+    for index := range tasks {
+        if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+            sv.SetTaskTimeout(ctx, &tasks[index])
         } else {
-            go sv.SetTask(ctx, tasks[i])
+            go sv.SetTask(ctx, tasks[index])
         }
     }
@@ -146,7 +171,6 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         &mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusProcessing},
         options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
     )
-
     if err != nil {
         sv.logger.Error("GetByFilter", zap.Error(err))
     }
@@ -155,9 +179,9 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         sv.logger.Info(fmt.Sprintf("got %v tasks with status=processing", len(tasks)))
     }
 
-    for i := range tasks {
-        if tasks[i].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
-            sv.SetTaskTimeout(ctx, &tasks[i])
+    for index := range tasks {
+        if tasks[index].UpdatedAt.Add(TaskTimeout).Before(time.Now()) {
+            sv.SetTaskTimeout(ctx, &tasks[index])
         }
     }
@@ -167,7 +191,6 @@ func (sv *SuperVisor) Start(ctx context.Context) {
         &mongos.WorkerStatusFilter{Status: model.WorkerTaskStatusPending},
         options.Find().SetSort(bson.D{{Key: "created_at", Value: 1}}),
     )
-
     if err != nil {
         sv.logger.Error("GetByFilter", zap.Error(err))
     }
@@ -213,9 +236,10 @@ func (sv *SuperVisor) Start(ctx context.Context) {
 
 // SetTask sets the task status to pending and sends it to a worker; if the worker does not exist, it creates one.
 func (sv *SuperVisor) SetTask(ctx context.Context, task model.WorkerTask) {
+    var err error
+
     // Set the pending status
-    err := sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusPending)
-    if err != nil {
+    if err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusPending); err != nil {
         sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
     }
@@ -248,7 +272,21 @@ func (sv *SuperVisor) SetTask(ctx context.Context, task model.WorkerTask) {
     // Create workers
     worker := sv.pool.Get(task.AmoID)
     if worker == nil {
-        worker = NewWorker(task.AmoID, sv.dal, sv.yaDisk, sv.gDisk, sv.amo, sv.logger)
+        workerDeps := WorkerDeps{
+            AmoID:  task.AmoID,
+            Dal:    sv.dal,
+            YaDisk: sv.yaDisk,
+            GDisk:  sv.gDisk,
+            Amo:    sv.amo,
+            Logger: sv.logger,
+        }
+        worker, err = NewWorker(workerDeps)
+        if err != nil {
+            sv.logger.Error("FailedCreateWorker", zap.Error(err))
+            return
+        }
+
         sv.pool.Set(task.AmoID, worker)
         worker.Start(ctx, sv.stopWorker)
     }
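
Note: Start launches this method with `go sv.SetTask(...)`, so several goroutines can reach this get-or-create path at once; whether that is safe depends on the Pool implementation, which this diff does not show. As a sketch only, assuming a map-backed pool, a mutex-guarded get-or-create makes the lookup and the insert atomic:

package main

import (
    "fmt"
    "sync"
)

type Worker struct{ ID string }

// Pool is a hypothetical map-backed pool; the repo's Pool is not shown here.
type Pool struct {
    mu      sync.Mutex
    workers map[string]*Worker
}

func NewPool() *Pool {
    return &Pool{workers: make(map[string]*Worker)}
}

// GetOrCreate returns the worker for id, building it with create if absent.
// The lookup and the insert happen under one lock, so two concurrent
// callers cannot both create a worker for the same id.
func (p *Pool) GetOrCreate(id string, create func() *Worker) *Worker {
    p.mu.Lock()
    defer p.mu.Unlock()
    if w, ok := p.workers[id]; ok {
        return w
    }
    w := create()
    p.workers[id] = w
    return w
}

func main() {
    pool := NewPool()
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            pool.GetOrCreate("amo-1", func() *Worker { return &Worker{ID: "amo-1"} })
        }()
    }
    wg.Wait()
    fmt.Println("workers stored:", len(pool.workers)) // 1
}

Even if the repo's Pool locks internally, Get and Set above are two separate calls, so two goroutines could in principle both miss and both construct a worker for the same AmoID between them; a combined get-or-create closes that window.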
@@ -259,10 +297,11 @@ func (sv *SuperVisor) SetTask(ctx context.Context, task model.WorkerTask) {
 
 func (sv *SuperVisor) SetTaskTimeout(ctx context.Context, task *model.WorkerTask) {
     sv.logger.Info("task timeout", zap.String("_id", task.ID))
     historyFound, err := sv.dal.History.GetByFilter(ctx, &mongos.HistoryFilter{WorkerTaskID: task.ID})
     if err != nil {
         sv.logger.Error("cannot get history", zap.Error(err), zap.String("_id", task.ID))
-    } else if len(historyFound) > 0 {
+    }
+    if len(historyFound) > 0 {
         err = errors.New("task timeout")
         if len(historyFound[0].Errors) > 0 {
             historyFound[0].Errors = append(historyFound[0].Errors, err.Error())
@@ -270,14 +309,12 @@ func (sv *SuperVisor) SetTaskTimeout(ctx context.Context, task *model.WorkerTask
             historyFound[0].Errors = []string{err.Error()}
         }
 
-        err = sv.dal.History.UpdateByID(ctx, &historyFound[0])
-        if err != nil {
+        if err = sv.dal.History.UpdateByID(ctx, &historyFound[0]); err != nil {
             sv.logger.Error("cannot update history", zap.Error(err), zap.String("_id", task.ID))
         }
     }
 
-    err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusTimeout)
-    if err != nil {
+    if err = sv.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusTimeout); err != nil {
         sv.logger.Error("cannot update workerTask status", zap.Error(err), zap.String("_id", task.ID))
     }
 }
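
Note: most edits in this commit fold a separate `err = ...` assignment and its check into Go's if-with-initializer form. The detail that motivates the added `var err error` declarations: `if err := f(); err != nil { ... }` scopes err to the if statement, whereas this code reuses one err variable across several sequential checks, so it declares err once and assigns with plain `=` in the initializer. A small sketch of both forms; step is a stand-in helper, not from this repo:

package main

import (
    "errors"
    "fmt"
)

func step(fail bool) error {
    if fail {
        return errors.New("step failed")
    }
    return nil
}

func main() {
    // Scoped form: this err exists only inside the if statement.
    if err := step(true); err != nil {
        fmt.Println("scoped:", err)
    }

    // Shared form, as in this commit: one err reused by several checks.
    var err error
    if err = step(false); err != nil {
        fmt.Println("first:", err)
    }
    if err = step(true); err != nil {
        fmt.Println("second:", err)
    }
}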

@@ -25,6 +25,15 @@ const (
     TaskTimeout = 5 * time.Minute
 )
 
+type WorkerDeps struct {
+    AmoID  string
+    Dal    *dal.MongoDAL
+    YaDisk *yadisk.ClientApp
+    GDisk  *gdisk.ClientApp
+    Amo    *amo.ClientApp
+    Logger *zap.Logger
+}
+
 type Worker struct {
     AmoID string
     Tube  chan *model.WorkerTask // Buffered channel with capacity CacheSize
@@ -36,16 +45,41 @@ type Worker struct {
     logger *zap.Logger
 }
 
-func NewWorker(amoID string, dal *dal.MongoDAL, yaDisk *yadisk.ClientApp, gDisk *gdisk.ClientApp, amo *amo.ClientApp, logger *zap.Logger) *Worker {
+func NewWorker(deps WorkerDeps) (*Worker, error) {
+    if deps.AmoID == "" {
+        return nil, errors.New("empty WorkerDeps.AmoID")
+    }
+    if deps.Dal == nil {
+        return nil, errors.New("nil WorkerDeps.Dal")
+    }
+    if deps.YaDisk == nil {
+        return nil, errors.New("nil WorkerDeps.YaDisk")
+    }
+    if deps.GDisk == nil {
+        return nil, errors.New("nil WorkerDeps.GDisk")
+    }
+    if deps.Amo == nil {
+        return nil, errors.New("nil WorkerDeps.Amo")
+    }
+    if deps.Logger == nil {
+        return nil, errors.New("nil WorkerDeps.Logger")
+    }
     return &Worker{
-        AmoID:     amoID,
+        AmoID:     deps.AmoID,
         Tube:      make(chan *model.WorkerTask, CacheSize),
         lifeTimer: time.NewTimer(WorkerLifetime),
-        dal:       dal,
-        yaDisk:    yaDisk,
-        gDisk:     gDisk,
-        amo:       amo,
-        logger:    logger.With(zap.String("amo_id", amoID))}
+        dal:       deps.Dal,
+        yaDisk:    deps.YaDisk,
+        gDisk:     deps.GDisk,
+        amo:       deps.Amo,
+        logger:    deps.Logger.With(zap.String("amo_id", deps.AmoID)),
+    }, nil
 }
 
 func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
@@ -72,8 +106,9 @@ func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
             w.logger.Info("new task", zap.String("_id", task.ID))
 
-            err := w.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusProcessing)
-            if err != nil {
+            var err error
+            if err = w.dal.WorkerTask.UpdateStatus(ctx, task.ID, model.WorkerTaskStatusProcessing); err != nil {
                 w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
             }
@@ -85,9 +120,7 @@ func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
                 w.logger.Info("task done", zap.String("_id", task.ID))
             }
 
-            err = w.dal.WorkerTask.UpdateByID(ctx, task)
-            if err != nil {
+            if err = w.dal.WorkerTask.UpdateByID(ctx, task); err != nil {
                 w.logger.Error("cannot update workerTask", zap.String("_id", task.ID))
             }
         }
@@ -95,13 +128,14 @@ func (w *Worker) Start(ctx context.Context, stopWorker chan string) {
 }
 
 func (w *Worker) DoTask(ctx context.Context, task *model.WorkerTask) bool {
+    var err error
     logger := w.logger.With(zap.String("_id", task.ID))
 
     // reset the timer
     w.lifeTimer.Stop()
     w.lifeTimer.Reset(WorkerLifetime)
 
-    var err error
     // find the history the task is attached to
     history, err := w.dal.History.GetByWorkerTaskID(ctx, task.ID)
     if err != nil {
@@ -183,7 +217,6 @@ func (w *Worker) DoTask(ctx context.Context, task *model.WorkerTask) bool {
     }
 
     amoClient, err := w.amo.NewClient(ctx, amoData.Referer, amoData.Token(), "")
-
     if err != nil {
         return w.checkErrTask(ctx, logger, history, "cannot create amo client", err)
     }
@@ -266,8 +299,7 @@ func (w *Worker) checkErrTask(ctx context.Context, logger *zap.Logger, history *
         history.Errors = []string{err.Error()}
     }
 
-    err = w.dal.History.UpdateByID(ctx, history)
-    if err != nil {
+    if err = w.dal.History.UpdateByID(ctx, history); err != nil {
         logger.Error("cannot update history", zap.Error(err))
     }
     return false
@@ -308,21 +340,17 @@ func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name stri
         }
 
         client, err := w.yaDisk.NewClient(ctx, yaDiskData.Token(), "")
-
         if err != nil {
             return err
         }
 
         task.Target.File, task.DownloadURL, err = templategen.YaDiskGenerateDoc(ctx, task.Source.File, name,
             yaDiskData.SaveFolder, client, data)
-
         if err != nil {
             return err
         }
 
-        err = client.PublishResource(ctx, task.Target.File)
-        if err != nil {
+        if err = client.PublishResource(ctx, task.Target.File); err != nil {
             return err
         }
@@ -335,7 +363,6 @@ func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name stri
     case "penadisk":
         penadiskData, err := w.dal.PenaDisk.GetByPenaID(ctx, task.PenaID)
         fmt.Println("ORAORA1", penadiskData, err)
-
         if err != nil {
             return err
         }
@@ -344,8 +371,6 @@ func (w *Worker) Generate(ctx context.Context, task *model.WorkerTask, name stri
         task.Target.File, task.DownloadURL, err = templategen.PenaDiskGenerateDocBytes(ctx, task.Source.File, name,
             penadiskData.SaveFolder, client, data)
-
         fmt.Println("ORAORA2", task.Target.File, task.DownloadURL)
-
         if err != nil {
             return err
         }