add closer
This commit is contained in:
parent
978d4a025f
commit
9a6e596071
@ -9,9 +9,11 @@ import (
|
|||||||
"codeword/internal/services"
|
"codeword/internal/services"
|
||||||
"codeword/internal/worker/purge_worker"
|
"codeword/internal/worker/purge_worker"
|
||||||
"codeword/internal/worker/recovery_worker"
|
"codeword/internal/worker/recovery_worker"
|
||||||
|
"codeword/pkg/closer"
|
||||||
"context"
|
"context"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"errors"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
||||||
@ -26,6 +28,8 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
shutdownGroup := closer.NewCloserGroup()
|
||||||
|
|
||||||
mdb, err := initialize.MongoDB(ctx, cfg)
|
mdb, err := initialize.MongoDB(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to initialize MongoDB", zap.Error(err))
|
logger.Error("Failed to initialize MongoDB", zap.Error(err))
|
||||||
@ -70,7 +74,7 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
|||||||
Mongo: mdb.Collection("codeword"),
|
Mongo: mdb.Collection("codeword"),
|
||||||
})
|
})
|
||||||
|
|
||||||
purgeWC := purge_worker.NewRecoveryWC(purge_worker.Deps{
|
purgeWC := purge_worker.NewPurgeWC(purge_worker.Deps{
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Mongo: mdb.Collection("codeword"),
|
Mongo: mdb.Collection("codeword"),
|
||||||
})
|
})
|
||||||
@ -87,44 +91,28 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
|||||||
go func() {
|
go func() {
|
||||||
if err := server.Start(cfg.HTTPHost + ":" + cfg.HTTPPort); err != nil {
|
if err := server.Start(cfg.HTTPHost + ":" + cfg.HTTPPort); err != nil {
|
||||||
logger.Error("Server startup error", zap.Error(err))
|
logger.Error("Server startup error", zap.Error(err))
|
||||||
|
cancel()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
shutdownGroup.Add(closer.CloserFunc(server.Shutdown))
|
||||||
|
shutdownGroup.Add(closer.CloserFunc(mdb.Client().Disconnect))
|
||||||
|
shutdownGroup.Add(closer.CloserFunc(recoveryWC.Stop))
|
||||||
|
shutdownGroup.Add(closer.CloserFunc(purgeWC.Stop))
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|
||||||
if err := shutdownApp(ctx, server, mdb, logger); err != nil {
|
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
return err
|
defer timeoutCancel()
|
||||||
}
|
if err := shutdownGroup.Call(timeoutCtx); err != nil {
|
||||||
logger.Info("The application has stopped")
|
if errors.Is(err, context.DeadlineExceeded) {
|
||||||
return nil
|
logger.Error("Shutdown timed out", zap.Error(err))
|
||||||
}
|
} else {
|
||||||
|
logger.Error("Failed to shutdown services gracefully", zap.Error(err))
|
||||||
// TODO возможно стоит вынести в отдельные файлы или отказаться от разделения на отдельные методы
|
}
|
||||||
|
|
||||||
func shutdownApp(ctx context.Context, server *httpserver.Server, mdb *mongo.Database, logger *zap.Logger) error {
|
|
||||||
if err := shutdownHTTPServer(ctx, server, logger); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := shutdownMongoDB(ctx, mdb, logger); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func shutdownHTTPServer(ctx context.Context, server *httpserver.Server, logger *zap.Logger) error {
|
|
||||||
if err := server.Shutdown(ctx); err != nil {
|
|
||||||
logger.Error("Error stopping HTTP server", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func shutdownMongoDB(ctx context.Context, mdb *mongo.Database, logger *zap.Logger) error {
|
|
||||||
if err := mdb.Client().Disconnect(ctx); err != nil {
|
|
||||||
logger.Error("Error when closing MongoDB connection", zap.Error(err))
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Info("Application has stopped")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,7 @@ type PurgeWorker struct {
|
|||||||
mongo *mongo.Collection
|
mongo *mongo.Collection
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRecoveryWC(deps Deps) *PurgeWorker {
|
func NewPurgeWC(deps Deps) *PurgeWorker {
|
||||||
return &PurgeWorker{
|
return &PurgeWorker{
|
||||||
logger: deps.Logger,
|
logger: deps.Logger,
|
||||||
mongo: deps.Mongo,
|
mongo: deps.Mongo,
|
||||||
@ -54,3 +54,7 @@ func (wc *PurgeWorker) processTasks(ctx context.Context) {
|
|||||||
wc.logger.Info("Deleted documents", zap.Int64("count", result.DeletedCount))
|
wc.logger.Info("Deleted documents", zap.Int64("count", result.DeletedCount))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wc *PurgeWorker) Stop(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -114,3 +114,7 @@ func (wc *RecoveryWorker) sendRecoveryTask(ctx context.Context, task models.Reco
|
|||||||
//wc.logger.Info("Recovery email sent and restore request updated successfully", zap.String("email", task.Email))
|
//wc.logger.Info("Recovery email sent and restore request updated successfully", zap.String("email", task.Email))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wc *RecoveryWorker) Stop(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
37
pkg/closer/closer.go
Normal file
37
pkg/closer/closer.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package closer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Closer interface {
|
||||||
|
Close(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloserFunc func(ctx context.Context) error
|
||||||
|
|
||||||
|
func (cf CloserFunc) Close(ctx context.Context) error {
|
||||||
|
return cf(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloserGroup struct {
|
||||||
|
closers []Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCloserGroup() *CloserGroup {
|
||||||
|
return &CloserGroup{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cg *CloserGroup) Add(c Closer) {
|
||||||
|
cg.closers = append(cg.closers, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cg *CloserGroup) Call(ctx context.Context) error {
|
||||||
|
var closeErr error
|
||||||
|
for i := len(cg.closers) - 1; i >= 0; i-- {
|
||||||
|
if err := cg.closers[i].Close(ctx); err != nil && closeErr == nil {
|
||||||
|
closeErr = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return closeErr
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user