// Package benchmarks contains load benchmarks for the account server-sent
// events (SSE) endpoints backed by MongoDB change streams.
package benchmarks

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"testing"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/stretchr/testify/require"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
)

// initDB connects to the local three-node replica set, recreates
// testdb.testcollection and seeds it with 1000 documents whose userId is
// "user0" … "user999". It panics on any failure because it is only used as
// benchmark setup, where there is nothing sensible to recover to.
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
	clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")

	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		panic(err)
	}

	collection := client.Database("testdb").Collection("testcollection")
	// Start from a clean collection so every run sees the same data set.
	if err := collection.Drop(ctx); err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
		if err != nil {
			panic(err)
		}
	}

	return client, collection
}

// initSrv builds a fiber application exposing both SSE endpoints:
//
//   - /account-pipe/:userID    — one change stream per connection (GET only)
//   - /account-pipe-wc/:userID — events fanned out by the shared watch worker
//
// The second route is registered with Use, which in fiber matches by path
// prefix and is not restricted to a single HTTP method.
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App {
	app := fiber.New()

	controller := &AccountController{
		accountRepo: repo,
		logger:      logger,
		watchWorker: watchWorker,
	}

	app.Get("/account-pipe/:userID", controller.AccountPipe)
	app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)

	return app
}

// AccountController holds the dependencies of the SSE handlers.
type AccountController struct {
	accountRepo *AccountRepository
	logger      *zap.Logger
	watchWorker *workers.WatchWorker
}

// AccountPipe streams updates of the account identified by the userID route
// parameter as server-sent events. Every connection opens its own change
// stream through the repository; the stream is torn down as soon as the
// response writer stops, whatever the reason.
func (c *AccountController) AccountPipe(ctx *fiber.Ctx) error {
	// Fiber reuses the buffers behind ctx values once the handler returns,
	// and userID is read by a goroutine that outlives the handler, so take
	// a private copy.
	userID := string([]byte(ctx.Params("userID")))

	ctx.Set(fiber.HeaderContentType, "text/event-stream")
	ctx.Set("Cache-Control", "no-cache")
	ctx.Set("Connection", "keep-alive")
	ctx.Set("Transfer-Encoding", "chunked")

	accountCh := make(chan map[string]interface{})
	cancelCtx, cancel := context.WithCancel(ctx.Context())

	go func() {
		// The repository is the only sender, so it owns closing the channel.
		defer close(accountCh)
		err := c.accountRepo.AccountPipe(cancelCtx, userID, accountCh)
		// Errors that surface after cancellation are a consequence of the
		// client going away, not a failure worth reporting.
		if err != nil && cancelCtx.Err() == nil {
			c.logger.Error("error in account pipe repo method", zap.Error(err), zap.String("userID", userID))
		}
	}()

	ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
		// Release the derived context (and stop the change stream) on every
		// exit path, not only when a flush fails.
		defer cancel()
		c.streamEvents(w, accountCh)
	})

	return nil
}

// AccountPipeWC streams updates of the account identified by the userID route
// parameter as server-sent events, receiving them from the shared watch
// worker instead of opening a change stream per connection.
func (c *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
	// Copy the parameter: it is handed to the watch worker, which may keep
	// it after the handler has returned and fiber has reused its buffers.
	userID := string([]byte(ctx.Params("userID")))

	ctx.Set(fiber.HeaderContentType, "text/event-stream")
	ctx.Set("Cache-Control", "no-cache")
	ctx.Set("Connection", "keep-alive")
	ctx.Set("Transfer-Encoding", "chunked")

	accountCh := make(chan map[string]interface{})
	c.watchWorker.AddChannel(userID, accountCh)

	// NOTE(review): the channel is never unregistered from the watch worker
	// when the client disconnects; the original code had the matching
	// DropChannel(userID) calls commented out. Confirm whether the worker
	// tolerates abandoned channels before relying on this handler.
	ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
		c.streamEvents(w, accountCh)
	})

	return nil
}

// streamEvents writes every value received from accountCh to w as an SSE
// "data:" frame and emits a ping frame every five seconds. It returns when
// accountCh is closed or when writing to the client fails.
func (c *AccountController) streamEvents(w *bufio.Writer, accountCh <-chan map[string]interface{}) {
	pingTicker := time.NewTicker(5 * time.Second)
	defer pingTicker.Stop()

	for {
		select {
		case accountData, ok := <-accountCh:
			if !ok {
				return
			}
			accountJSON, err := json.Marshal(accountData)
			if err != nil {
				c.logger.Error("error marshal account JSON", zap.Error(err))
				continue
			}
			// Write errors are sticky on bufio.Writer and are reported by
			// the Flush below.
			fmt.Fprintf(w, "data: %s\n\n", accountJSON)
		case <-pingTicker.C:
			fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
		}

		if err := w.Flush(); err != nil {
			// The client is gone; stop streaming.
			return
		}
	}
}

// AccountRepository gives access to the accounts collection.
type AccountRepository struct {
	mongoDB *mongo.Collection
	logger  *zap.Logger
}

// AccountPipe watches update events for the document whose userId equals
// userID and sends the updated fields of each event to accountCh.
//
// It blocks until ctx is cancelled, in which case it returns nil, or until
// the change stream fails, in which case the stream error is returned. The
// caller owns accountCh and is responsible for closing it.
func (r *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.M{"operationType": "update", "fullDocument.userId": userID}}},
	}

	opts := options.ChangeStream()
	opts.SetFullDocument(options.UpdateLookup)

	changeStream, err := r.mongoDB.Watch(ctx, pipeline, opts)
	if err != nil {
		return fmt.Errorf("watching account %q: %w", userID, err)
	}
	defer func() {
		// ctx is normally cancelled by the time we get here, so close the
		// cursor with a fresh, bounded context; otherwise the server-side
		// cursor would linger until it times out.
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = changeStream.Close(closeCtx) // best effort: nothing left to do on failure
	}()

	// Next blocks until an event arrives, the stream fails or ctx expires,
	// and returns false in the latter two cases.
	for changeStream.Next(ctx) {
		var changeEvent struct {
			UpdateDescription struct {
				UpdatedFields map[string]interface{} `bson:"updatedFields"`
			} `bson:"updateDescription"`
		}

		if err := changeStream.Decode(&changeEvent); err != nil {
			r.logger.Error("error decoding change event", zap.Error(err))
			continue
		}

		select {
		case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
		case <-ctx.Done():
			return nil
		}
	}

	if ctx.Err() != nil {
		// Cancellation is the normal way to stop the pipe.
		return nil
	}
	// Returning here (instead of looping again) prevents a busy loop when
	// the stream is broken for a reason other than cancellation.
	return changeStream.Err()
}

// BenchmarkAccountPipe measures the per-connection change stream endpoint
// (/account-pipe/:userID) with 10 to 50 concurrent SSE clients. A background
// goroutine updates the first 100 users once per second so that clients
// receive events; each client reads 26 lines from the stream and then keeps
// the connection open for two more seconds.
//
// It requires a MongoDB replica set on localhost:27017-27019 and binds to
// port 3000.
//
// Results recorded by the original author:
//
//	goos: windows
//	goarch: amd64
//	pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//	cpu: AMD Ryzen 5 5600H with Radeon Graphics
//	BenchmarkAccountPipe/COUNT_10-12 1 18041336300 ns/op 13851296 B/op 161900 allocs/op
//	BenchmarkAccountPipe/COUNT_20-12 1 18034383800 ns/op 14816216 B/op 173328 allocs/op
//	BenchmarkAccountPipe/COUNT_30-12 1 21409782300 ns/op 17761944 B/op 211032 allocs/op
//	BenchmarkAccountPipe/COUNT_40-12 1 18021639700 ns/op 19411096 B/op 231789 allocs/op
//	BenchmarkAccountPipe/COUNT_50-12 1 19064359600 ns/op 20592840 B/op 245590 allocs/op
//	PASS
//	ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 130.316s
func BenchmarkAccountPipe(b *testing.B) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
	if err != nil {
		log.Fatalf("failed to init zap logger: %v", err)
	}

	client, collection := initDB(ctx)
	defer client.Disconnect(ctx)

	// Generate a steady stream of updates for the users the clients watch.
	go func() {
		for {
			select {
			case <-ctx.Done():
				fmt.Println("DROPED")
				return
			default:
				for i := 0; i < 100; i++ {
					_, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
						bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
					if err != nil {
						logger.Error("failed update", zap.Error(err))
					}
				}
				time.Sleep(1 * time.Second)
			}
		}
	}()

	watchWorker := workers.NewWatchWorker(collection, logger)
	repo := &AccountRepository{mongoDB: collection, logger: logger}
	app := initSrv(repo, logger, watchWorker)

	// b.Fatal must only be called from the benchmark goroutine, so the
	// listener reports its result through a channel instead.
	listenErr := make(chan error, 1)
	go func() {
		listenErr <- app.Listen(":3000")
	}()

	// Give the server time to start; fail fast if it could not bind.
	select {
	case err := <-listenErr:
		b.Fatalf("server stopped before the benchmark started: %v", err)
	case <-time.After(5 * time.Second):
	}

	// A bounded client keeps a stalled stream from hanging the benchmark.
	httpClient := &http.Client{Timeout: 2 * time.Minute}

	// consume opens the SSE stream for userID, reads 26 lines from it and
	// then holds the connection open for two more seconds.
	consume := func(userID string) error {
		resp, err := httpClient.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			return fmt.Errorf("unexpected status %q for %s", resp.Status, userID)
		}

		scanner := bufio.NewScanner(resp.Body)
		count := 0
		for scanner.Scan() {
			if count == 25 {
				break
			}
			count++
		}
		if err := scanner.Err(); err != nil {
			return fmt.Errorf("reading stream for %s: %w", userID, err)
		}

		time.Sleep(2 * time.Second)
		return nil
	}

	for _, n := range []int{10, 20, 30, 40, 50} {
		b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
			var wg sync.WaitGroup
			for i := 0; i < b.N; i++ {
				// Buffered so that no client blocks on reporting its result.
				errs := make(chan error, n)

				wg.Add(n)
				for j := 0; j < n; j++ {
					go func(j int) {
						defer wg.Done()
						errs <- consume(fmt.Sprintf("user%d", j))
					}(j)
				}
				wg.Wait()
				close(errs)

				// Assert on the benchmark goroutine: require calls FailNow,
				// which is not allowed from the client goroutines.
				for err := range errs {
					require.NoError(b, err)
				}

				time.Sleep(1 * time.Second)
			}
		})
	}

	if err := app.Shutdown(); err != nil {
		b.Fatalf("error return srv: %s", err.Error())
	}
}

// Results recorded by the original author for the watch-worker variant
// (/account-pipe-wc/:userID). The benchmark itself is kept below, disabled.
//
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//cpu: AMD Ryzen 5 5600H with Radeon Graphics
//BenchmarkAccountPipeWithWorker/COUNT_10-12 1 68031591200 ns/op 45935456 B/op 545790 allocs/op
//BenchmarkAccountPipeWithWorker/COUNT_20-12 1 68020876100 ns/op 46873480 B/op 557504 allocs/op
//BenchmarkAccountPipeWithWorker/COUNT_30-12 1 68048437700 ns/op 47456704 B/op 562015 allocs/op
//BenchmarkAccountPipeWithWorker/COUNT_40-12 1 68025191100 ns/op 47200848 B/op 556732 allocs/op
//BenchmarkAccountPipeWithWorker/COUNT_50-12 1 68030922700 ns/op 48254768 B/op 567123 allocs/op
//PASS

//func BenchmarkAccountPipeWithWorker(b *testing.B) {
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
//	if err != nil {
//		log.Fatalf("failed to init zap logger: %v", err)
//	}
//	client, collection := initDB(ctx)
//	defer client.Disconnect(ctx)
//
//	go func() {
//		for {
//			select {
//			case <-ctx.Done():
//				fmt.Println("DROPED")
//				return
//			default:
//				for i := 0; i < 100; i++ {
//					_, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
//						bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
//					if err != nil {
//						logger.Error("failed update", zap.Error(err))
//					}
//				}
//				time.Sleep(1 * time.Second)
//			}
//		}
//	}()
//
//	watchWorker := workers.NewWatchWorker(collection, logger)
//	repo := &AccountRepository{mongoDB: collection, logger: logger}
//	app := initSrv(repo, logger, watchWorker)
//
//	go func() {
//		if err := app.Listen(":3000"); err != nil {
//			b.Fatal(err)
//		}
//	}()
//	time.Sleep(5 * time.Second)
//
//	for _, n := range []int{10, 20, 30, 40, 50} {
//		b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
//			var wg sync.WaitGroup
//			for i := 0; i < b.N; i++ {
//				wg.Add(n)
//				for j := 0; j < n; j++ {
//					go func(j int) {
//						defer wg.Done()
//						userID := fmt.Sprintf("user%d", j)
//						//fmt.Println(userID)
//						resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe-wc/%s", userID))
//						require.NoError(b, err)
//						defer resp.Body.Close()
//						scanner := bufio.NewScanner(resp.Body)
//						count := 0
//						for scanner.Scan() {
//							if count == 25 {
//								break
//							}
//							count++
//							//line := scanner.Text()
//							//fmt.Println("Received:", line)
//						}
//						time.Sleep(2 * time.Second)
//					}(j)
//				}
//				wg.Wait()
//				time.Sleep(1 * time.Second)
//			}
//		})
//	}
//
//	if err := app.Shutdown(); err != nil {
//		b.Fatalf("error return srv: %s", err.Error())
//	}
//}