diff --git a/internal/app/app.go b/internal/app/app.go index 7f1b86e..9a9104b 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -11,7 +11,6 @@ import ( "penahub.gitlab.yandexcloud.net/external/trashlog.git/app" "penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" - "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" "syscall" "time" @@ -107,8 +106,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) return err } - watchWorker := workers.NewWatchWorker(mongoDB.Collection("accounts"), logger) - clients := initialize.NewClients(initialize.ClientsDeps{ Logger: logger, AuthURL: &config.Service.AuthMicroservice.URL, @@ -153,7 +150,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) Repositories: repositories, Clients: clients, MiddleWare: middleWare, - WatchWorker: watchWorker, }) serverHTTP := server.NewServer(server.ServerConfig{ @@ -172,8 +168,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) serverGRPC.Register(rpcControllers) - go watchWorker.Run(ctx) - go func() { if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil { logger.Error("Server startup error", zap.Error(err)) diff --git a/internal/initialize/controllers.go b/internal/initialize/controllers.go index 2d3f8f1..a53e108 100644 --- a/internal/initialize/controllers.go +++ b/internal/initialize/controllers.go @@ -14,7 +14,6 @@ import ( "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" - "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" ) type RpcControllersDeps struct { @@ -52,7 +51,6 @@ type HttpControllersDeps struct { Logger *zap.Logger Repositories *Repositories Clients *Clients - WatchWorker *workers.WatchWorker } type HttpController struct { @@ -71,7 +69,6 @@ func NewHttpControllers(deps HttpControllersDeps) *HttpController { Logger: deps.Logger, Encrypt: deps.Encrypt, AuthClient: deps.Clients.AuthClient, - WatchWorker: deps.WatchWorker, }), CartController: cart.NewCartController(cart.Deps{ MiddleWare: deps.MiddleWare, diff --git a/internal/interface/controller/http/account/controllers.go b/internal/interface/controller/http/account/controllers.go index d5b6572..9433287 100644 --- a/internal/interface/controller/http/account/controllers.go +++ b/internal/interface/controller/http/account/controllers.go @@ -2,9 +2,9 @@ package account import ( "bufio" + "context" "encoding/json" "fmt" - "github.com/gofiber/fiber/v2" "go.uber.org/zap" "math" "penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw" @@ -14,7 +14,6 @@ import ( "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" - "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" "strconv" "time" ) @@ -25,7 +24,6 @@ type Deps struct { Logger *zap.Logger Encrypt *qutils.Encrypt AuthClient *client.AuthClient - WatchWorker *workers.WatchWorker } type AccountController struct { @@ -34,7 +32,6 @@ type AccountController struct { logger *zap.Logger encrypt *qutils.Encrypt authClient *client.AuthClient - watchWorker *workers.WatchWorker } func NewAccountController(deps Deps) *AccountController { @@ -44,7 +41,6 @@ func NewAccountController(deps Deps) *AccountController { logger: deps.Logger, encrypt: deps.Encrypt, authClient: deps.AuthClient, - watchWorker: deps.WatchWorker, } } @@ -264,9 +260,14 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { ctx.Set("Transfer-Encoding", "chunked") accountCh := make(chan map[string]interface{}) - //cancelCtx, cancel := context.WithCancel(ctx.Context()) + cancelCtx, cancel := context.WithCancel(ctx.Context()) - receiver.watchWorker.AddChannel(userID, accountCh) + go func(ctx context.Context) { + defer close(accountCh) + if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil { + receiver.logger.Error("error in account pipe repo method", zap.Error(err)) + } + }(cancelCtx) ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { pingTicker := time.NewTicker(5 * time.Second) @@ -279,23 +280,21 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { } accountJSON, err := json.Marshal(accountData) if err != nil { - receiver.logger.Error("error marshal account JSON", zap.Error(err)) + //receiver.logger.Error("error marshal account JSON", zap.Error(err)) continue } fmt.Fprintf(w, "data: %s\n\n", accountJSON) if err := w.Flush(); err != nil { receiver.logger.Error("error flushing", zap.Error(err)) - //cancel() - //receiver.logger.Info("Close connection Account Pipe sse") - //receiver.watchWorker.DropChannel(userID) + cancel() + receiver.logger.Info("Close connection Account Pipe sse") return } case <-pingTicker.C: fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) if err := w.Flush(); err != nil { receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) - //receiver.watchWorker.DropChannel(userID) - //cancel() + cancel() return } }