This commit is contained in:
Pasha 2024-11-21 16:14:42 +03:00
parent 78248cec4e
commit 5af9ea1ada
3 changed files with 12 additions and 22 deletions

@ -11,7 +11,6 @@ import (
"penahub.gitlab.yandexcloud.net/external/trashlog.git/app" "penahub.gitlab.yandexcloud.net/external/trashlog.git/app"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog" "penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"syscall" "syscall"
"time" "time"
@ -107,8 +106,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
return err return err
} }
watchWorker := workers.NewWatchWorker(mongoDB.Collection("accounts"), logger)
clients := initialize.NewClients(initialize.ClientsDeps{ clients := initialize.NewClients(initialize.ClientsDeps{
Logger: logger, Logger: logger,
AuthURL: &config.Service.AuthMicroservice.URL, AuthURL: &config.Service.AuthMicroservice.URL,
@ -153,7 +150,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
Repositories: repositories, Repositories: repositories,
Clients: clients, Clients: clients,
MiddleWare: middleWare, MiddleWare: middleWare,
WatchWorker: watchWorker,
}) })
serverHTTP := server.NewServer(server.ServerConfig{ serverHTTP := server.NewServer(server.ServerConfig{
@ -172,8 +168,6 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
serverGRPC.Register(rpcControllers) serverGRPC.Register(rpcControllers)
go watchWorker.Run(ctx)
go func() { go func() {
if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil { if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil {
logger.Error("Server startup error", zap.Error(err)) logger.Error("Server startup error", zap.Error(err))

@ -14,7 +14,6 @@ import (
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
) )
type RpcControllersDeps struct { type RpcControllersDeps struct {
@ -52,7 +51,6 @@ type HttpControllersDeps struct {
Logger *zap.Logger Logger *zap.Logger
Repositories *Repositories Repositories *Repositories
Clients *Clients Clients *Clients
WatchWorker *workers.WatchWorker
} }
type HttpController struct { type HttpController struct {
@ -71,7 +69,6 @@ func NewHttpControllers(deps HttpControllersDeps) *HttpController {
Logger: deps.Logger, Logger: deps.Logger,
Encrypt: deps.Encrypt, Encrypt: deps.Encrypt,
AuthClient: deps.Clients.AuthClient, AuthClient: deps.Clients.AuthClient,
WatchWorker: deps.WatchWorker,
}), }),
CartController: cart.NewCartController(cart.Deps{ CartController: cart.NewCartController(cart.Deps{
MiddleWare: deps.MiddleWare, MiddleWare: deps.MiddleWare,

@ -2,9 +2,9 @@ package account
import ( import (
"bufio" "bufio"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gofiber/fiber/v2"
"go.uber.org/zap" "go.uber.org/zap"
"math" "math"
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw" "penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
@ -14,7 +14,6 @@ import (
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"strconv" "strconv"
"time" "time"
) )
@ -25,7 +24,6 @@ type Deps struct {
Logger *zap.Logger Logger *zap.Logger
Encrypt *qutils.Encrypt Encrypt *qutils.Encrypt
AuthClient *client.AuthClient AuthClient *client.AuthClient
WatchWorker *workers.WatchWorker
} }
type AccountController struct { type AccountController struct {
@ -34,7 +32,6 @@ type AccountController struct {
logger *zap.Logger logger *zap.Logger
encrypt *qutils.Encrypt encrypt *qutils.Encrypt
authClient *client.AuthClient authClient *client.AuthClient
watchWorker *workers.WatchWorker
} }
func NewAccountController(deps Deps) *AccountController { func NewAccountController(deps Deps) *AccountController {
@ -44,7 +41,6 @@ func NewAccountController(deps Deps) *AccountController {
logger: deps.Logger, logger: deps.Logger,
encrypt: deps.Encrypt, encrypt: deps.Encrypt,
authClient: deps.AuthClient, authClient: deps.AuthClient,
watchWorker: deps.WatchWorker,
} }
} }
@ -264,9 +260,14 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
ctx.Set("Transfer-Encoding", "chunked") ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{}) accountCh := make(chan map[string]interface{})
//cancelCtx, cancel := context.WithCancel(ctx.Context()) cancelCtx, cancel := context.WithCancel(ctx.Context())
receiver.watchWorker.AddChannel(userID, accountCh) go func(ctx context.Context) {
defer close(accountCh)
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
receiver.logger.Error("error in account pipe repo method", zap.Error(err))
}
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second) pingTicker := time.NewTicker(5 * time.Second)
@ -279,23 +280,21 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
} }
accountJSON, err := json.Marshal(accountData) accountJSON, err := json.Marshal(accountData)
if err != nil { if err != nil {
receiver.logger.Error("error marshal account JSON", zap.Error(err)) //receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue continue
} }
fmt.Fprintf(w, "data: %s\n\n", accountJSON) fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
receiver.logger.Error("error flushing", zap.Error(err)) receiver.logger.Error("error flushing", zap.Error(err))
//cancel() cancel()
//receiver.logger.Info("Close connection Account Pipe sse") receiver.logger.Info("Close connection Account Pipe sse")
//receiver.watchWorker.DropChannel(userID)
return return
} }
case <-pingTicker.C: case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
//receiver.watchWorker.DropChannel(userID) cancel()
//cancel()
return return
} }
} }