diff --git a/tests/benchmarks/sse_watcher_test.go b/tests/benchmarks/sse_watcher_test.go index 51944c7..5e87f8d 100644 --- a/tests/benchmarks/sse_watcher_test.go +++ b/tests/benchmarks/sse_watcher_test.go @@ -207,13 +207,13 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin //goarch: amd64 //pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks //cpu: AMD Ryzen 5 5600H with Radeon Graphics -//BenchmarkAccountPipe/COUNT_10-12 1 27025944800 ns/op 20398952 B/op 240564 allocs/op -//BenchmarkAccountPipe/COUNT_20-12 1 27026492600 ns/op 22625056 B/op 270549 allocs/op -//BenchmarkAccountPipe/COUNT_30-12 1 27025340100 ns/op 24004512 B/op 287960 allocs/op -//BenchmarkAccountPipe/COUNT_40-12 1 27025848800 ns/op 27199352 B/op 330753 allocs/op -//BenchmarkAccountPipe/COUNT_50-12 1 27903735600 ns/op 28512104 B/op 349569 allocs/op +//BenchmarkAccountPipe/COUNT_10-12 1 18041336300 ns/op 13851296 B/op 161900 allocs/op +//BenchmarkAccountPipe/COUNT_20-12 1 18034383800 ns/op 14816216 B/op 173328 allocs/op +//BenchmarkAccountPipe/COUNT_30-12 1 21409782300 ns/op 17761944 B/op 211032 allocs/op +//BenchmarkAccountPipe/COUNT_40-12 1 18021639700 ns/op 19411096 B/op 231789 allocs/op +//BenchmarkAccountPipe/COUNT_50-12 1 19064359600 ns/op 20592840 B/op 245590 allocs/op //PASS -//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 145.758s +//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 130.316s func BenchmarkAccountPipe(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) @@ -278,11 +278,11 @@ func BenchmarkAccountPipe(b *testing.B) { //line := scanner.Text() //fmt.Println("Received:", line) } - time.Sleep(10 * time.Second) + time.Sleep(2 * time.Second) }(j) } wg.Wait() - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) } }) } @@ -303,79 +303,79 @@ func BenchmarkAccountPipe(b *testing.B) { //BenchmarkAccountPipeWithWorker/COUNT_50-12 1 68030922700 ns/op 48254768 B/op 567123 allocs/op //PASS -func BenchmarkAccountPipeWithWorker(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel)) - if err != nil { - log.Fatalf("failed to init zap logger: %v", err) - } - client, collection := initDB(ctx) - defer client.Disconnect(ctx) - - go func() { - for { - select { - case <-ctx.Done(): - fmt.Println("DROPED") - return - default: - for i := 0; i < 100; i++ { - _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, - bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) - if err != nil { - logger.Error("failed update", zap.Error(err)) - } - } - time.Sleep(1 * time.Second) - } - } - }() - - watchWorker := workers.NewWatchWorker(collection, logger) - repo := &AccountRepository{mongoDB: collection, logger: logger} - app := initSrv(repo, logger, watchWorker) - - go func() { - if err := app.Listen(":3000"); err != nil { - b.Fatal(err) - } - }() - time.Sleep(5 * time.Second) - - for _, n := range []int{10, 20, 30, 40, 50} { - b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) { - var wg sync.WaitGroup - for i := 0; i < b.N; i++ { - wg.Add(n) - for j := 0; j < n; j++ { - go func(j int) { - defer wg.Done() - userID := fmt.Sprintf("user%d", j) - //fmt.Println(userID) - resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe-wc/%s", userID)) - require.NoError(b, err) - defer resp.Body.Close() - scanner := bufio.NewScanner(resp.Body) - count := 0 - for scanner.Scan() { - if count == 25 { - break - } - count++ - //line := scanner.Text() - //fmt.Println("Received:", line) - } - time.Sleep(2 * time.Second) - }(j) - } - wg.Wait() - time.Sleep(1 * time.Second) - } - }) - } - - if err := app.Shutdown(); err != nil { - b.Fatalf("error return srv: %s", err.Error()) - } -} +//func BenchmarkAccountPipeWithWorker(b *testing.B) { +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() +// logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel)) +// if err != nil { +// log.Fatalf("failed to init zap logger: %v", err) +// } +// client, collection := initDB(ctx) +// defer client.Disconnect(ctx) +// +// go func() { +// for { +// select { +// case <-ctx.Done(): +// fmt.Println("DROPED") +// return +// default: +// for i := 0; i < 100; i++ { +// _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, +// bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) +// if err != nil { +// logger.Error("failed update", zap.Error(err)) +// } +// } +// time.Sleep(1 * time.Second) +// } +// } +// }() +// +// watchWorker := workers.NewWatchWorker(collection, logger) +// repo := &AccountRepository{mongoDB: collection, logger: logger} +// app := initSrv(repo, logger, watchWorker) +// +// go func() { +// if err := app.Listen(":3000"); err != nil { +// b.Fatal(err) +// } +// }() +// time.Sleep(5 * time.Second) +// +// for _, n := range []int{10, 20, 30, 40, 50} { +// b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) { +// var wg sync.WaitGroup +// for i := 0; i < b.N; i++ { +// wg.Add(n) +// for j := 0; j < n; j++ { +// go func(j int) { +// defer wg.Done() +// userID := fmt.Sprintf("user%d", j) +// //fmt.Println(userID) +// resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe-wc/%s", userID)) +// require.NoError(b, err) +// defer resp.Body.Close() +// scanner := bufio.NewScanner(resp.Body) +// count := 0 +// for scanner.Scan() { +// if count == 25 { +// break +// } +// count++ +// //line := scanner.Text() +// //fmt.Println("Received:", line) +// } +// time.Sleep(2 * time.Second) +// }(j) +// } +// wg.Wait() +// time.Sleep(1 * time.Second) +// } +// }) +// } +// +// if err := app.Shutdown(); err != nil { +// b.Fatalf("error return srv: %s", err.Error()) +// } +//}