added watcher worker

This commit is contained in:
Pavel 2024-07-21 12:33:43 +03:00
parent 77d259aa4f
commit 26167e30f3
6 changed files with 117 additions and 13 deletions

4
.gitignore vendored

@ -7,3 +7,7 @@ main
/versionrecover.bolt /versionrecover.bolt
/recover.bolt /recover.bolt
/tests/benchmarks/cpu.prof
/tests/benchmarks/mem.prof
/tests/benchmarks/block.prof
/tests/benchmarks/benchmarks.test.exe

@ -11,6 +11,7 @@ import (
"penahub.gitlab.yandexcloud.net/external/trashlog.git/app" "penahub.gitlab.yandexcloud.net/external/trashlog.git/app"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog" "penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"syscall" "syscall"
"time" "time"
@ -106,6 +107,8 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
return err return err
} }
watchWorker := workers.NewWatchWorker(mongoDB.Collection("accounts"), logger)
clients := initialize.NewClients(initialize.ClientsDeps{ clients := initialize.NewClients(initialize.ClientsDeps{
Logger: logger, Logger: logger,
AuthURL: &config.Service.AuthMicroservice.URL, AuthURL: &config.Service.AuthMicroservice.URL,
@ -150,6 +153,7 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
Repositories: repositories, Repositories: repositories,
Clients: clients, Clients: clients,
MiddleWare: middleWare, MiddleWare: middleWare,
WatchWorker: watchWorker,
}) })
serverHTTP := server.NewServer(server.ServerConfig{ serverHTTP := server.NewServer(server.ServerConfig{
@ -168,6 +172,8 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
serverGRPC.Register(rpcControllers) serverGRPC.Register(rpcControllers)
go watchWorker.Run(ctx)
go func() { go func() {
if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil { if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil {
logger.Error("Server startup error", zap.Error(err)) logger.Error("Server startup error", zap.Error(err))

@ -14,13 +14,14 @@ import (
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/history"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http/wallet"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
) )
type RpcControllersDeps struct { type RpcControllersDeps struct {
Logger *zap.Logger Logger *zap.Logger
Services *Services Services *Services
Repositories *Repositories Repositories *Repositories
HLogger hlog.Logger HLogger hlog.Logger
} }
type RpcControllers struct { type RpcControllers struct {
@ -51,6 +52,7 @@ type HttpControllersDeps struct {
Logger *zap.Logger Logger *zap.Logger
Repositories *Repositories Repositories *Repositories
Clients *Clients Clients *Clients
WatchWorker *workers.WatchWorker
} }
type HttpController struct { type HttpController struct {
@ -69,6 +71,7 @@ func NewHttpControllers(deps HttpControllersDeps) *HttpController {
Logger: deps.Logger, Logger: deps.Logger,
Encrypt: deps.Encrypt, Encrypt: deps.Encrypt,
AuthClient: deps.Clients.AuthClient, AuthClient: deps.Clients.AuthClient,
WatchWorker: deps.WatchWorker,
}), }),
CartController: cart.NewCartController(cart.Deps{ CartController: cart.NewCartController(cart.Deps{
MiddleWare: deps.MiddleWare, MiddleWare: deps.MiddleWare,

@ -2,7 +2,6 @@ package account
import ( import (
"bufio" "bufio"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
@ -15,6 +14,7 @@ import (
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"strconv" "strconv"
"time" "time"
) )
@ -25,6 +25,7 @@ type Deps struct {
Logger *zap.Logger Logger *zap.Logger
Encrypt *qutils.Encrypt Encrypt *qutils.Encrypt
AuthClient *client.AuthClient AuthClient *client.AuthClient
WatchWorker *workers.WatchWorker
} }
type AccountController struct { type AccountController struct {
@ -33,6 +34,7 @@ type AccountController struct {
logger *zap.Logger logger *zap.Logger
encrypt *qutils.Encrypt encrypt *qutils.Encrypt
authClient *client.AuthClient authClient *client.AuthClient
watchWorker *workers.WatchWorker
} }
func NewAccountController(deps Deps) *AccountController { func NewAccountController(deps Deps) *AccountController {
@ -42,6 +44,7 @@ func NewAccountController(deps Deps) *AccountController {
logger: deps.Logger, logger: deps.Logger,
encrypt: deps.Encrypt, encrypt: deps.Encrypt,
authClient: deps.AuthClient, authClient: deps.AuthClient,
watchWorker: deps.WatchWorker,
} }
} }
@ -261,14 +264,9 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
ctx.Set("Transfer-Encoding", "chunked") ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{}) accountCh := make(chan map[string]interface{})
cancelCtx, cancel := context.WithCancel(ctx.Context()) //cancelCtx, cancel := context.WithCancel(ctx.Context())
go func(ctx context.Context) { receiver.watchWorker.AddChannel(userID, accountCh)
defer close(accountCh)
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
receiver.logger.Error("error in account pipe repo method", zap.Error(err))
}
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second) pingTicker := time.NewTicker(5 * time.Second)
@ -287,15 +285,17 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
fmt.Fprintf(w, "data: %s\n\n", accountJSON) fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
receiver.logger.Error("error flushing", zap.Error(err)) receiver.logger.Error("error flushing", zap.Error(err))
cancel() //cancel()
receiver.logger.Info("Close connection Account Pipe sse") //receiver.logger.Info("Close connection Account Pipe sse")
receiver.watchWorker.DropChannel(userID)
return return
} }
case <-pingTicker.C: case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil { if err := w.Flush(); err != nil {
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
cancel() receiver.watchWorker.DropChannel(userID)
//cancel()
return return
} }
} }

@ -1 +1,92 @@
package workers package workers
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"sync"
)
type WatchWorker struct {
accountChannels map[string]chan map[string]interface{}
mu sync.Mutex
mongoDB *mongo.Collection
logger *zap.Logger
}
func NewWatchWorker(mongoDB *mongo.Collection, logger *zap.Logger) *WatchWorker {
return &WatchWorker{
accountChannels: make(map[string]chan map[string]interface{}),
mongoDB: mongoDB,
logger: logger,
}
}
func (w *WatchWorker) Run(ctx context.Context) {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update"}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := w.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
w.logger.Error("error start change stream", zap.Error(err))
return
}
defer changeStream.Close(ctx)
for {
if changeStream.Next(ctx) {
var changeEvent struct {
FullDocument struct {
UserID string `bson:"userId"`
} `bson:"fullDocument"`
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
w.logger.Error("error decode change event", zap.Error(err))
continue
}
w.mu.Lock()
ch, ok := w.accountChannels[changeEvent.FullDocument.UserID]
w.mu.Unlock()
if !ok {
fmt.Println("userID", changeEvent.FullDocument.UserID)
fmt.Println("fields", changeEvent.UpdateDescription.UpdatedFields)
}
if ok {
select {
case ch <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return
}
}
}
select {
case <-ctx.Done():
return
default:
fmt.Println("default select case")
}
}
}
func (w *WatchWorker) AddChannel(userID string, ch chan map[string]interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
w.accountChannels[userID] = ch
}
func (w *WatchWorker) DropChannel(userID string) {
w.mu.Lock()
defer w.mu.Unlock()
fmt.Println("drop chan, userID:", userID)
delete(w.accountChannels, userID)
}

@ -13,7 +13,7 @@ import (
func TestAccountPipe(t *testing.T) { func TestAccountPipe(t *testing.T) {
jwtUtil := helpers.InitializeJWT() jwtUtil := helpers.InitializeJWT()
token, tokenErr := jwtUtil.Create("807f1f77bcf81cd799439077") token, tokenErr := jwtUtil.Create("64e53ed187392e122e5d3d50")
if !assert.NoError(t, tokenErr) { if !assert.NoError(t, tokenErr) {
return return
} }