separation of server and benchmark

This commit is contained in:
Pavel 2024-07-21 23:03:14 +03:00
parent b9d1a165e2
commit 1e9237a7e8
2 changed files with 287 additions and 313 deletions

@ -2,207 +2,14 @@ package benchmarks
import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"log"
"net/http"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"sync"
"testing"
"time"
)
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
panic(err)
}
collection := client.Database("testdb").Collection("testcollection")
collection.Drop(ctx)
for i := 0; i < 1000; i++ {
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
if err != nil {
panic(err)
}
}
return client, collection
}
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App {
app := fiber.New()
controller := &AccountController{
accountRepo: repo,
logger: logger,
watchWorker: watchWorker,
}
app.Get("/account-pipe/:userID", controller.AccountPipe)
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
return app
}
type AccountController struct {
accountRepo *AccountRepository
logger *zap.Logger
watchWorker *workers.WatchWorker
}
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
cancelCtx, cancel := context.WithCancel(ctx.Context())
go func(ctx context.Context) {
defer close(accountCh)
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
}
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
cancel()
return
}
}
}
})
return nil
}
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
//cancelCtx, cancel := context.WithCancel(ctx.Context())
receiver.watchWorker.AddChannel(userID, accountCh)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
//cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
//receiver.watchWorker.DropChannel(userID)
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
//receiver.watchWorker.DropChannel(userID)
//cancel()
return
}
}
}
})
return nil
}
type AccountRepository struct {
mongoDB *mongo.Collection
logger *zap.Logger
}
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
return err
}
defer changeStream.Close(ctx)
for {
select {
case <-ctx.Done():
//receiver.logger.Info("Context canceled, thread is closed now")
return nil
default:
if changeStream.Next(ctx) {
var changeEvent struct {
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
//receiver.logger.Error("error decoding change event", zap.Error(err))
continue
}
select {
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return nil
}
}
}
}
}
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
@ -215,82 +22,41 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin
//PASS
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 130.316s
func BenchmarkAccountPipe(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
if err != nil {
log.Fatalf("failed to init zap logger: %v", err)
}
client, collection := initDB(ctx)
defer client.Disconnect(ctx)
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("DROPED")
return
default:
for i := 0; i < 100; i++ {
_, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
if err != nil {
logger.Error("failed update", zap.Error(err))
}
}
time.Sleep(1 * time.Second)
}
}
}()
watchWorker := workers.NewWatchWorker(collection, logger)
repo := &AccountRepository{mongoDB: collection, logger: logger}
app := initSrv(repo, logger, watchWorker)
go func() {
if err := app.Listen(":3000"); err != nil {
b.Fatal(err)
}
}()
time.Sleep(5 * time.Second)
for _, n := range []int{10, 20, 30, 40, 50} {
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(n)
for j := 0; j < n; j++ {
go func(j int) {
defer wg.Done()
userID := fmt.Sprintf("user%d", j)
//fmt.Println(userID)
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
require.NoError(b, err)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
count := 0
for scanner.Scan() {
if count == 25 {
break
}
count++
//line := scanner.Text()
//fmt.Println("Received:", line)
}
time.Sleep(2 * time.Second)
}(j)
}
wg.Wait()
time.Sleep(1 * time.Second)
}
})
}
if err := app.Shutdown(); err != nil {
b.Fatalf("error return srv: %s", err.Error())
}
}
//func BenchmarkAccountPipe(b *testing.B) {
// for _, n := range []int{100} {
// b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
// var wg sync.WaitGroup
// for i := 0; i < b.N; i++ {
// wg.Add(n)
// for j := 0; j < n; j++ {
// go func(j int) {
// defer wg.Done()
// userID := fmt.Sprintf("user%d", j)
// //fmt.Println(userID)
// resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
// require.NoError(b, err)
// defer resp.Body.Close()
// scanner := bufio.NewScanner(resp.Body)
// count := 0
// for scanner.Scan() {
// if count == 25 {
// break
// }
// count++
// //line := scanner.Text()
// //fmt.Println("Received:", line)
// }
// time.Sleep(2 * time.Second)
// }(j)
// time.Sleep(time.Second)
// }
// wg.Wait()
// fmt.Println("WAITER OK")
// time.Sleep(1 * time.Second)
// }
// })
// }
//}
//goos: windows
//goarch: amd64
@ -304,46 +70,7 @@ func BenchmarkAccountPipe(b *testing.B) {
//PASS
func BenchmarkAccountPipeWithWorker(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
if err != nil {
log.Fatalf("failed to init zap logger: %v", err)
}
client, collection := initDB(ctx)
defer client.Disconnect(ctx)
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("DROPED")
return
default:
for i := 0; i < 100; i++ {
_, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
if err != nil {
logger.Error("failed update", zap.Error(err))
}
}
time.Sleep(1 * time.Second)
}
}
}()
watchWorker := workers.NewWatchWorker(collection, logger)
repo := &AccountRepository{mongoDB: collection, logger: logger}
app := initSrv(repo, logger, watchWorker)
go func() {
if err := app.Listen(":3000"); err != nil {
b.Fatal(err)
}
}()
time.Sleep(5 * time.Second)
for _, n := range []int{10, 20, 30, 40, 50} {
for _, n := range []int{500} {
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
@ -368,14 +95,11 @@ func BenchmarkAccountPipeWithWorker(b *testing.B) {
}
time.Sleep(2 * time.Second)
}(j)
time.Sleep(time.Second)
}
wg.Wait()
time.Sleep(1 * time.Second)
}
})
}
if err := app.Shutdown(); err != nil {
b.Fatalf("error return srv: %s", err.Error())
}
}

250
tests/server/main.go Normal file

@ -0,0 +1,250 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"log"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
"time"
)
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
panic(err)
}
collection := client.Database("testdb").Collection("testcollection")
collection.Drop(ctx)
for i := 0; i < 1000; i++ {
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
if err != nil {
panic(err)
}
}
return client, collection
}
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App {
app := fiber.New()
controller := &AccountController{
accountRepo: repo,
logger: logger,
watchWorker: watchWorker,
}
app.Get("/account-pipe/:userID", controller.AccountPipe)
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
return app
}
type AccountController struct {
accountRepo *AccountRepository
logger *zap.Logger
watchWorker *workers.WatchWorker
}
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
cancelCtx, cancel := context.WithCancel(ctx.Context())
go func(ctx context.Context) {
defer close(accountCh)
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
}
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
cancel()
return
}
}
}
})
return nil
}
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
//cancelCtx, cancel := context.WithCancel(ctx.Context())
receiver.watchWorker.AddChannel(userID, accountCh)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
//cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
//receiver.watchWorker.DropChannel(userID)
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
//receiver.watchWorker.DropChannel(userID)
//cancel()
return
}
}
}
})
return nil
}
type AccountRepository struct {
mongoDB *mongo.Collection
logger *zap.Logger
}
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
return err
}
defer changeStream.Close(ctx)
for {
select {
case <-ctx.Done():
//receiver.logger.Info("Context canceled, thread is closed now")
return nil
default:
if changeStream.Next(ctx) {
var changeEvent struct {
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
//receiver.logger.Error("error decoding change event", zap.Error(err))
continue
}
select {
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return nil
}
}
}
}
}
func main() {
ctx := context.Background()
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf(err.Error())
}
defer logger.Sync()
client, collection := initDB(ctx)
defer func() {
if err := client.Disconnect(ctx); err != nil {
logger.Fatal("Error close mongo", zap.Error(err))
}
}()
repo := &AccountRepository{
mongoDB: collection,
logger: logger,
}
watchWorker := workers.NewWatchWorker(collection, logger)
go watchWorker.Run(ctx)
//go func() {
// for {
// select {
// case <-ctx.Done():
// fmt.Println("stop update")
// return
// default:
// for i := 0; i < 800; i++ {
// _, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
// bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
// if err != nil {
// logger.Error("error update", zap.Error(err))
// }
// }
// time.Sleep(1 * time.Second)
// }
// }
//}()
app := initSrv(repo, logger, watchWorker)
if err := app.Listen(":3000"); err != nil {
logger.Fatal("Error server", zap.Error(err))
}
}