diff --git a/tests/benchmarks/sse_watcher_test.go b/tests/benchmarks/sse_watcher_test.go index c14d37d..0c3dd75 100644 --- a/tests/benchmarks/sse_watcher_test.go +++ b/tests/benchmarks/sse_watcher_test.go @@ -2,207 +2,14 @@ package benchmarks import ( "bufio" - "context" - "encoding/json" "fmt" - "github.com/gofiber/fiber/v2" "github.com/stretchr/testify/require" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.uber.org/zap" - "log" "net/http" - "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" "sync" "testing" "time" ) -func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) { - clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false") - client, err := mongo.Connect(context.Background(), clientOptions) - if err != nil { - panic(err) - } - - collection := client.Database("testdb").Collection("testcollection") - collection.Drop(ctx) - for i := 0; i < 1000; i++ { - _, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"}) - if err != nil { - panic(err) - } - } - - return client, collection -} - -func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App { - app := fiber.New() - controller := &AccountController{ - accountRepo: repo, - logger: logger, - watchWorker: watchWorker, - } - app.Get("/account-pipe/:userID", controller.AccountPipe) - app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC) - return app -} - -type AccountController struct { - accountRepo *AccountRepository - logger *zap.Logger - watchWorker *workers.WatchWorker -} - -func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { - userID := ctx.Params("userID") - - ctx.Set(fiber.HeaderContentType, "text/event-stream") - ctx.Set("Cache-Control", "no-cache") - ctx.Set("Connection", "keep-alive") - ctx.Set("Transfer-Encoding", "chunked") - - accountCh := make(chan map[string]interface{}) - cancelCtx, cancel := context.WithCancel(ctx.Context()) - - go func(ctx context.Context) { - defer close(accountCh) - if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil { - //receiver.logger.Error("error in account pipe repo method", zap.Error(err)) - } - }(cancelCtx) - - ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { - pingTicker := time.NewTicker(5 * time.Second) - defer pingTicker.Stop() - for { - select { - case accountData, ok := <-accountCh: - if !ok { - return - } - accountJSON, err := json.Marshal(accountData) - if err != nil { - //receiver.logger.Error("error marshal account JSON", zap.Error(err)) - continue - } - fmt.Fprintf(w, "data: %s\n\n", accountJSON) - if err := w.Flush(); err != nil { - //receiver.logger.Error("error flushing", zap.Error(err)) - cancel() - //receiver.logger.Info("Close connection Account Pipe sse") - return - } - case <-pingTicker.C: - fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) - if err := w.Flush(); err != nil { - //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) - cancel() - return - } - } - } - }) - - return nil -} - -func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error { - userID := ctx.Params("userID") - - ctx.Set(fiber.HeaderContentType, "text/event-stream") - ctx.Set("Cache-Control", "no-cache") - ctx.Set("Connection", "keep-alive") - ctx.Set("Transfer-Encoding", "chunked") - - accountCh := make(chan map[string]interface{}) - //cancelCtx, cancel := context.WithCancel(ctx.Context()) - - receiver.watchWorker.AddChannel(userID, accountCh) - - ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { - pingTicker := time.NewTicker(5 * time.Second) - defer pingTicker.Stop() - for { - select { - case accountData, ok := <-accountCh: - if !ok { - return - } - accountJSON, err := json.Marshal(accountData) - if err != nil { - receiver.logger.Error("error marshal account JSON", zap.Error(err)) - continue - } - fmt.Fprintf(w, "data: %s\n\n", accountJSON) - if err := w.Flush(); err != nil { - //receiver.logger.Error("error flushing", zap.Error(err)) - //cancel() - //receiver.logger.Info("Close connection Account Pipe sse") - //receiver.watchWorker.DropChannel(userID) - return - } - case <-pingTicker.C: - fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) - if err := w.Flush(); err != nil { - //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) - //receiver.watchWorker.DropChannel(userID) - //cancel() - return - } - } - } - }) - - return nil -} - -type AccountRepository struct { - mongoDB *mongo.Collection - logger *zap.Logger -} - -func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error { - pipeline := mongo.Pipeline{ - {{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}}, - } - - opts := options.ChangeStream() - opts.SetFullDocument(options.UpdateLookup) - changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts) - if err != nil { - return err - } - defer changeStream.Close(ctx) - - for { - select { - case <-ctx.Done(): - //receiver.logger.Info("Context canceled, thread is closed now") - return nil - default: - if changeStream.Next(ctx) { - var changeEvent struct { - UpdateDescription struct { - UpdatedFields map[string]interface{} `bson:"updatedFields"` - } `bson:"updateDescription"` - } - if err := changeStream.Decode(&changeEvent); err != nil { - //receiver.logger.Error("error decoding change event", zap.Error(err)) - continue - } - select { - case accountCh <- changeEvent.UpdateDescription.UpdatedFields: - case <-ctx.Done(): - return nil - } - } - } - } -} - //goos: windows //goarch: amd64 //pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks @@ -215,82 +22,41 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin //PASS //ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 130.316s -func BenchmarkAccountPipe(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel)) - if err != nil { - log.Fatalf("failed to init zap logger: %v", err) - } - client, collection := initDB(ctx) - defer client.Disconnect(ctx) - - go func() { - for { - select { - case <-ctx.Done(): - fmt.Println("DROPED") - return - default: - for i := 0; i < 100; i++ { - _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, - bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) - if err != nil { - logger.Error("failed update", zap.Error(err)) - } - } - time.Sleep(1 * time.Second) - } - } - }() - - watchWorker := workers.NewWatchWorker(collection, logger) - repo := &AccountRepository{mongoDB: collection, logger: logger} - app := initSrv(repo, logger, watchWorker) - - go func() { - if err := app.Listen(":3000"); err != nil { - b.Fatal(err) - } - }() - time.Sleep(5 * time.Second) - - for _, n := range []int{10, 20, 30, 40, 50} { - b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) { - var wg sync.WaitGroup - for i := 0; i < b.N; i++ { - wg.Add(n) - for j := 0; j < n; j++ { - go func(j int) { - defer wg.Done() - userID := fmt.Sprintf("user%d", j) - //fmt.Println(userID) - resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID)) - require.NoError(b, err) - defer resp.Body.Close() - scanner := bufio.NewScanner(resp.Body) - count := 0 - for scanner.Scan() { - if count == 25 { - break - } - count++ - //line := scanner.Text() - //fmt.Println("Received:", line) - } - time.Sleep(2 * time.Second) - }(j) - } - wg.Wait() - time.Sleep(1 * time.Second) - } - }) - } - - if err := app.Shutdown(); err != nil { - b.Fatalf("error return srv: %s", err.Error()) - } -} +//func BenchmarkAccountPipe(b *testing.B) { +// for _, n := range []int{100} { +// b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) { +// var wg sync.WaitGroup +// for i := 0; i < b.N; i++ { +// wg.Add(n) +// for j := 0; j < n; j++ { +// go func(j int) { +// defer wg.Done() +// userID := fmt.Sprintf("user%d", j) +// //fmt.Println(userID) +// resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID)) +// require.NoError(b, err) +// defer resp.Body.Close() +// scanner := bufio.NewScanner(resp.Body) +// count := 0 +// for scanner.Scan() { +// if count == 25 { +// break +// } +// count++ +// //line := scanner.Text() +// //fmt.Println("Received:", line) +// } +// time.Sleep(2 * time.Second) +// }(j) +// time.Sleep(time.Second) +// } +// wg.Wait() +// fmt.Println("WAITER OK") +// time.Sleep(1 * time.Second) +// } +// }) +// } +//} //goos: windows //goarch: amd64 @@ -304,46 +70,7 @@ func BenchmarkAccountPipe(b *testing.B) { //PASS func BenchmarkAccountPipeWithWorker(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel)) - if err != nil { - log.Fatalf("failed to init zap logger: %v", err) - } - client, collection := initDB(ctx) - defer client.Disconnect(ctx) - - go func() { - for { - select { - case <-ctx.Done(): - fmt.Println("DROPED") - return - default: - for i := 0; i < 100; i++ { - _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, - bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) - if err != nil { - logger.Error("failed update", zap.Error(err)) - } - } - time.Sleep(1 * time.Second) - } - } - }() - - watchWorker := workers.NewWatchWorker(collection, logger) - repo := &AccountRepository{mongoDB: collection, logger: logger} - app := initSrv(repo, logger, watchWorker) - - go func() { - if err := app.Listen(":3000"); err != nil { - b.Fatal(err) - } - }() - time.Sleep(5 * time.Second) - - for _, n := range []int{10, 20, 30, 40, 50} { + for _, n := range []int{500} { b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) { var wg sync.WaitGroup for i := 0; i < b.N; i++ { @@ -368,14 +95,11 @@ func BenchmarkAccountPipeWithWorker(b *testing.B) { } time.Sleep(2 * time.Second) }(j) + time.Sleep(time.Second) } wg.Wait() time.Sleep(1 * time.Second) } }) } - - if err := app.Shutdown(); err != nil { - b.Fatalf("error return srv: %s", err.Error()) - } } diff --git a/tests/server/main.go b/tests/server/main.go new file mode 100644 index 0000000..527d74a --- /dev/null +++ b/tests/server/main.go @@ -0,0 +1,250 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "github.com/gofiber/fiber/v2" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" + "log" + "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers" + "time" +) + +func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) { + clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false") + client, err := mongo.Connect(context.Background(), clientOptions) + if err != nil { + panic(err) + } + + collection := client.Database("testdb").Collection("testcollection") + collection.Drop(ctx) + for i := 0; i < 1000; i++ { + _, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"}) + if err != nil { + panic(err) + } + } + + return client, collection +} + +func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App { + app := fiber.New() + controller := &AccountController{ + accountRepo: repo, + logger: logger, + watchWorker: watchWorker, + } + app.Get("/account-pipe/:userID", controller.AccountPipe) + app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC) + return app +} + +type AccountController struct { + accountRepo *AccountRepository + logger *zap.Logger + watchWorker *workers.WatchWorker +} + +func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { + userID := ctx.Params("userID") + + ctx.Set(fiber.HeaderContentType, "text/event-stream") + ctx.Set("Cache-Control", "no-cache") + ctx.Set("Connection", "keep-alive") + ctx.Set("Transfer-Encoding", "chunked") + + accountCh := make(chan map[string]interface{}) + cancelCtx, cancel := context.WithCancel(ctx.Context()) + + go func(ctx context.Context) { + defer close(accountCh) + if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil { + //receiver.logger.Error("error in account pipe repo method", zap.Error(err)) + } + }(cancelCtx) + + ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { + pingTicker := time.NewTicker(5 * time.Second) + defer pingTicker.Stop() + for { + select { + case accountData, ok := <-accountCh: + if !ok { + return + } + accountJSON, err := json.Marshal(accountData) + if err != nil { + //receiver.logger.Error("error marshal account JSON", zap.Error(err)) + continue + } + fmt.Fprintf(w, "data: %s\n\n", accountJSON) + if err := w.Flush(); err != nil { + //receiver.logger.Error("error flushing", zap.Error(err)) + cancel() + //receiver.logger.Info("Close connection Account Pipe sse") + return + } + case <-pingTicker.C: + fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) + if err := w.Flush(); err != nil { + //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) + cancel() + return + } + } + } + }) + + return nil +} + +func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error { + userID := ctx.Params("userID") + + ctx.Set(fiber.HeaderContentType, "text/event-stream") + ctx.Set("Cache-Control", "no-cache") + ctx.Set("Connection", "keep-alive") + ctx.Set("Transfer-Encoding", "chunked") + + accountCh := make(chan map[string]interface{}) + //cancelCtx, cancel := context.WithCancel(ctx.Context()) + + receiver.watchWorker.AddChannel(userID, accountCh) + + ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { + pingTicker := time.NewTicker(5 * time.Second) + defer pingTicker.Stop() + for { + select { + case accountData, ok := <-accountCh: + if !ok { + return + } + accountJSON, err := json.Marshal(accountData) + if err != nil { + receiver.logger.Error("error marshal account JSON", zap.Error(err)) + continue + } + fmt.Fprintf(w, "data: %s\n\n", accountJSON) + if err := w.Flush(); err != nil { + //receiver.logger.Error("error flushing", zap.Error(err)) + //cancel() + //receiver.logger.Info("Close connection Account Pipe sse") + //receiver.watchWorker.DropChannel(userID) + return + } + case <-pingTicker.C: + fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) + if err := w.Flush(); err != nil { + //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) + //receiver.watchWorker.DropChannel(userID) + //cancel() + return + } + } + } + }) + + return nil +} + +type AccountRepository struct { + mongoDB *mongo.Collection + logger *zap.Logger +} + +func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error { + pipeline := mongo.Pipeline{ + {{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}}, + } + + opts := options.ChangeStream() + opts.SetFullDocument(options.UpdateLookup) + changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts) + if err != nil { + return err + } + defer changeStream.Close(ctx) + + for { + select { + case <-ctx.Done(): + //receiver.logger.Info("Context canceled, thread is closed now") + return nil + default: + if changeStream.Next(ctx) { + var changeEvent struct { + UpdateDescription struct { + UpdatedFields map[string]interface{} `bson:"updatedFields"` + } `bson:"updateDescription"` + } + if err := changeStream.Decode(&changeEvent); err != nil { + //receiver.logger.Error("error decoding change event", zap.Error(err)) + continue + } + select { + case accountCh <- changeEvent.UpdateDescription.UpdatedFields: + case <-ctx.Done(): + return nil + } + } + } + } +} + +func main() { + ctx := context.Background() + + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf(err.Error()) + } + defer logger.Sync() + + client, collection := initDB(ctx) + defer func() { + if err := client.Disconnect(ctx); err != nil { + logger.Fatal("Error close mongo", zap.Error(err)) + } + }() + + repo := &AccountRepository{ + mongoDB: collection, + logger: logger, + } + + watchWorker := workers.NewWatchWorker(collection, logger) + go watchWorker.Run(ctx) + + //go func() { + // for { + // select { + // case <-ctx.Done(): + // fmt.Println("stop update") + // return + // default: + // for i := 0; i < 800; i++ { + // _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, + // bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) + // if err != nil { + // logger.Error("error update", zap.Error(err)) + // } + // } + // time.Sleep(1 * time.Second) + // } + // } + //}() + + app := initSrv(repo, logger, watchWorker) + + if err := app.Listen(":3000"); err != nil { + logger.Fatal("Error server", zap.Error(err)) + } +}