diff --git a/.gitignore b/.gitignore index b656595..a102894 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,7 @@ main /versionrecover.bolt /recover.bolt +/tests/benchmarks/cpu.prof +/tests/benchmarks/mem.prof +/tests/benchmarks/block.prof +/tests/benchmarks/benchmarks.test.exe diff --git a/internal/app/app.go b/internal/app/app.go index 9a9104b..7f1b86e 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -11,6 +11,7 @@ import ( "penahub.gitlab.yandexcloud.net/external/trashlog.git/app" "penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" + "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" "syscall" "time" @@ -106,6 +107,8 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) return err } + watchWorker := workers.NewWatchWorker(mongoDB.Collection("accounts"), logger) + clients := initialize.NewClients(initialize.ClientsDeps{ Logger: logger, AuthURL: &config.Service.AuthMicroservice.URL, @@ -150,6 +153,7 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) Repositories: repositories, Clients: clients, MiddleWare: middleWare, + WatchWorker: watchWorker, }) serverHTTP := server.NewServer(server.ServerConfig{ @@ -168,6 +172,8 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) serverGRPC.Register(rpcControllers) + go watchWorker.Run(ctx) + go func() { if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil { logger.Error("Server startup error", zap.Error(err)) diff --git a/internal/initialize/controllers.go b/internal/initialize/controllers.go index c68ed58..2d3f8f1 100644 --- a/internal/initialize/controllers.go +++ b/internal/initialize/controllers.go @@ -14,13 +14,14 @@ import ( "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" + "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" ) type RpcControllersDeps struct { Logger *zap.Logger Services *Services Repositories *Repositories - HLogger hlog.Logger + HLogger hlog.Logger } type RpcControllers struct { @@ -51,6 +52,7 @@ type HttpControllersDeps struct { Logger *zap.Logger Repositories *Repositories Clients *Clients + WatchWorker *workers.WatchWorker } type HttpController struct { @@ -69,6 +71,7 @@ func NewHttpControllers(deps HttpControllersDeps) *HttpController { Logger: deps.Logger, Encrypt: deps.Encrypt, AuthClient: deps.Clients.AuthClient, + WatchWorker: deps.WatchWorker, }), CartController: cart.NewCartController(cart.Deps{ MiddleWare: deps.MiddleWare, diff --git a/internal/interface/controller/http/account/controllers.go b/internal/interface/controller/http/account/controllers.go index 3fd5b9a..18a2cce 100644 --- a/internal/interface/controller/http/account/controllers.go +++ b/internal/interface/controller/http/account/controllers.go @@ -2,7 +2,6 @@ package account import ( "bufio" - "context" "encoding/json" "fmt" "github.com/gofiber/fiber/v2" @@ -15,6 +14,7 @@ import ( "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" + "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" "strconv" "time" ) @@ -25,6 +25,7 @@ type Deps struct { Logger *zap.Logger Encrypt *qutils.Encrypt AuthClient *client.AuthClient + WatchWorker *workers.WatchWorker } type AccountController struct { @@ -33,6 +34,7 @@ type AccountController struct { logger *zap.Logger encrypt *qutils.Encrypt authClient *client.AuthClient + watchWorker *workers.WatchWorker } func NewAccountController(deps Deps) *AccountController { @@ -42,6 +44,7 @@ func NewAccountController(deps Deps) *AccountController { logger: deps.Logger, encrypt: deps.Encrypt, authClient: deps.AuthClient, + watchWorker: deps.WatchWorker, } } @@ -261,14 +264,9 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { ctx.Set("Transfer-Encoding", "chunked") accountCh := make(chan map[string]interface{}) - cancelCtx, cancel := context.WithCancel(ctx.Context()) + //cancelCtx, cancel := context.WithCancel(ctx.Context()) - go func(ctx context.Context) { - defer close(accountCh) - if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil { - receiver.logger.Error("error in account pipe repo method", zap.Error(err)) - } - }(cancelCtx) + receiver.watchWorker.AddChannel(userID, accountCh) ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { pingTicker := time.NewTicker(5 * time.Second) @@ -287,15 +285,17 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { fmt.Fprintf(w, "data: %s\n\n", accountJSON) if err := w.Flush(); err != nil { receiver.logger.Error("error flushing", zap.Error(err)) - cancel() - receiver.logger.Info("Close connection Account Pipe sse") + //cancel() + //receiver.logger.Info("Close connection Account Pipe sse") + receiver.watchWorker.DropChannel(userID) return } case <-pingTicker.C: fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) if err := w.Flush(); err != nil { receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) - cancel() + receiver.watchWorker.DropChannel(userID) + //cancel() return } } diff --git a/internal/workers/mongo_watcher.go b/internal/workers/mongo_watcher.go index 1a3e3f2..5e7bb51 100644 --- a/internal/workers/mongo_watcher.go +++ b/internal/workers/mongo_watcher.go @@ -1 +1,92 @@ package workers + +import ( + "context" + "fmt" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" + "sync" +) + +type WatchWorker struct { + accountChannels map[string]chan map[string]interface{} + mu sync.Mutex + mongoDB *mongo.Collection + logger *zap.Logger +} + +func NewWatchWorker(mongoDB *mongo.Collection, logger *zap.Logger) *WatchWorker { + return &WatchWorker{ + accountChannels: make(map[string]chan map[string]interface{}), + mongoDB: mongoDB, + logger: logger, + } +} + +func (w *WatchWorker) Run(ctx context.Context) { + pipeline := mongo.Pipeline{ + {{"$match", bson.M{"operationType": "update"}}}, + } + opts := options.ChangeStream() + opts.SetFullDocument(options.UpdateLookup) + changeStream, err := w.mongoDB.Watch(ctx, pipeline, opts) + if err != nil { + w.logger.Error("error start change stream", zap.Error(err)) + return + } + defer changeStream.Close(ctx) + + for { + if changeStream.Next(ctx) { + var changeEvent struct { + FullDocument struct { + UserID string `bson:"userId"` + } `bson:"fullDocument"` + UpdateDescription struct { + UpdatedFields map[string]interface{} `bson:"updatedFields"` + } `bson:"updateDescription"` + } + if err := changeStream.Decode(&changeEvent); err != nil { + w.logger.Error("error decode change event", zap.Error(err)) + continue + } + + w.mu.Lock() + ch, ok := w.accountChannels[changeEvent.FullDocument.UserID] + w.mu.Unlock() + if !ok { + fmt.Println("userID", changeEvent.FullDocument.UserID) + fmt.Println("fields", changeEvent.UpdateDescription.UpdatedFields) + } + if ok { + select { + case ch <- changeEvent.UpdateDescription.UpdatedFields: + case <-ctx.Done(): + return + } + } + } + + select { + case <-ctx.Done(): + return + default: + fmt.Println("default select case") + } + } +} + +func (w *WatchWorker) AddChannel(userID string, ch chan map[string]interface{}) { + w.mu.Lock() + defer w.mu.Unlock() + w.accountChannels[userID] = ch +} + +func (w *WatchWorker) DropChannel(userID string) { + w.mu.Lock() + defer w.mu.Unlock() + fmt.Println("drop chan, userID:", userID) + delete(w.accountChannels, userID) +} diff --git a/tests/e2e/accountPipe_test.go b/tests/e2e/accountPipe_test.go index 0cb2f4a..9b8e065 100644 --- a/tests/e2e/accountPipe_test.go +++ b/tests/e2e/accountPipe_test.go @@ -13,7 +13,7 @@ import ( func TestAccountPipe(t *testing.T) { jwtUtil := helpers.InitializeJWT() - token, tokenErr := jwtUtil.Create("807f1f77bcf81cd799439077") + token, tokenErr := jwtUtil.Create("64e53ed187392e122e5d3d50") if !assert.NoError(t, tokenErr) { return }