merge dev to update common with logic testSSE

This commit is contained in:
Pasha 2024-11-21 17:14:41 +03:00
commit 531216c6ff
15 changed files with 925 additions and 13 deletions

4
.gitignore vendored

@ -8,3 +8,7 @@ main
/versionrecover.bolt
/recover.bolt
/tests/benchmarks/cpu.prof
/tests/benchmarks/mem.prof
/tests/benchmarks/block.prof
/tests/benchmarks/benchmarks.test.exe

41
cmd/sse_bench/app/app.go Normal file

@ -0,0 +1,41 @@
package app
import (
"context"
"errors"
"fmt"
"gitea.pena/PenaSide/common/mongo"
"gitea.pena/PenaSide/customer/cmd/sse_bench/repository"
"gitea.pena/PenaSide/customer/internal/models"
"gitea.pena/PenaSide/customer/pkg/closer"
"go.uber.org/zap"
"os/signal"
"syscall"
"time"
)
func Run(config *models.Config, logger *zap.Logger) (appErr error) {
defer func() {
if recovered := recover(); recovered != nil {
appErr = errors.New("recovered panic on application run")
logger.Error("recovered panic on application run", zap.Any("recovered", recovered))
}
}()
closer := closer.New()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
Configuration: &config.Database,
Timeout: 10 * time.Second,
})
if err != nil {
return fmt.Errorf("failed connection to db: %w", err)
}
accountRepo := repository.NewAccountRepository(repository.Deps{
Logger: logger,
MongoDB: mongoDB,
})
}

@ -0,0 +1,99 @@
package controller
import (
"bufio"
"encoding/json"
"fmt"
"gitea.pena/PenaSide/customer/cmd/sse_bench/repository"
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
"gitea.pena/PenaSide/customer/internal/interface/client"
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
"github.com/gofiber/fiber/v2"
"go.uber.org/zap"
"time"
)
type Deps struct {
MiddleWare *http.MiddleWare
AccountRepo *repository.AccountRepository
Logger *zap.Logger
WatchWorker *worker.WatchWorker
}
type AccountController struct {
middleWare *http.MiddleWare
accountRepo *repository.AccountRepository
logger *zap.Logger
authClient *client.AuthClient
watchWorker *worker.WatchWorker
}
func NewAccountController(deps Deps) *AccountController {
return &AccountController{
middleWare: deps.MiddleWare,
accountRepo: deps.AccountRepo,
logger: deps.Logger,
watchWorker: deps.WatchWorker,
}
}
func (receiver *AccountController) Register(router fiber.Router) {
router.Get("/account/pipe", receiver.AccountPipe)
}
func (receiver *AccountController) Name() string {
return ""
}
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
userID, ok := receiver.middleWare.ExtractUserID(ctx)
if !ok || userID == "" {
return receiver.middleWare.NoAuth(ctx)
}
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
//cancelCtx, cancel := context.WithCancel(ctx.Context())
receiver.watchWorker.AddChannel(userID, accountCh)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
receiver.logger.Error("error flushing", zap.Error(err))
//cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
//receiver.watchWorker.DropChannel(userID)
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
//receiver.watchWorker.DropChannel(userID)
//cancel()
return
}
}
}
})
return nil
}

1
cmd/sse_bench/main.go Normal file

@ -0,0 +1 @@
package sse_bench

@ -0,0 +1,74 @@
package repository
import (
"context"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"log"
)
type Deps struct {
Logger *zap.Logger
MongoDB *mongo.Collection
}
type AccountRepository struct {
logger *zap.Logger
mongoDB *mongo.Collection
}
func NewAccountRepository(deps Deps) *AccountRepository {
if deps.Logger == nil {
log.Panicln("logger is nil on <NewAccountRepository>")
}
if deps.MongoDB == nil {
log.Panicln("mongodb is nil on <NewAccountRepository>")
}
return &AccountRepository{
logger: deps.Logger,
mongoDB: deps.MongoDB,
}
}
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
return err
}
defer changeStream.Close(ctx)
for {
select {
case <-ctx.Done():
receiver.logger.Info("Context canceled, thread is closed now")
return nil
default:
if changeStream.Next(ctx) {
var changeEvent struct {
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
receiver.logger.Error("error decoding change event", zap.Error(err))
continue
}
select {
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return nil
}
}
}
}
}

@ -0,0 +1,95 @@
package worker
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"sync"
)
type WatchWorker struct {
accountChannels map[string][]chan map[string]interface{}
mu sync.Mutex
mongoDB *mongo.Collection
logger *zap.Logger
}
func NewWatchWorker(mongoDB *mongo.Collection, logger *zap.Logger) *WatchWorker {
return &WatchWorker{
accountChannels: make(map[string][]chan map[string]interface{}),
mongoDB: mongoDB,
logger: logger,
}
}
func (w *WatchWorker) Run(ctx context.Context) {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update"}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := w.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
w.logger.Error("error start change stream", zap.Error(err))
return
}
defer changeStream.Close(ctx)
for {
if changeStream.Next(ctx) {
var changeEvent struct {
FullDocument struct {
UserID string `bson:"userId"`
} `bson:"fullDocument"`
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
w.logger.Error("error decode change event", zap.Error(err))
continue
}
w.mu.Lock()
channels, ok := w.accountChannels[changeEvent.FullDocument.UserID]
w.mu.Unlock()
if !ok {
fmt.Println("userID", changeEvent.FullDocument.UserID)
fmt.Println("fields", changeEvent.UpdateDescription.UpdatedFields)
}
if ok {
for _, ch := range channels {
select {
case ch <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return
}
}
}
}
select {
case <-ctx.Done():
return
default:
fmt.Println("default select case")
}
}
}
func (w *WatchWorker) AddChannel(userID string, ch chan map[string]interface{}) {
w.mu.Lock()
defer w.mu.Unlock()
w.accountChannels[userID] = append(w.accountChannels[userID], ch)
}
// todo подумать как дропать если с одного устройства могут выйти на другом нет а ключ с массивом один
func (w *WatchWorker) DropChannel(userID string) {
w.mu.Lock()
defer w.mu.Unlock()
fmt.Println("drop chan, userID:", userID)
delete(w.accountChannels, userID)
}

2
go.mod

@ -5,6 +5,7 @@ go 1.22.0
require (
gitea.pena/PenaSide/common v0.0.0-20241120141501-1695a0981562
gitea.pena/PenaSide/linters-golang v0.0.0-20241114215743-9a8e7d58cf96
gitea.pena/PenaSide/trashlog v0.0.0-20241107132923-0f7d4d57eb4b
github.com/go-resty/resty/v2 v2.11.0
github.com/gofiber/fiber/v2 v2.52.1
github.com/golang-jwt/jwt/v5 v5.2.0
@ -23,7 +24,6 @@ require (
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
gopkg.in/tucnak/telebot.v2 v2.5.0
penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5
)
require (

4
go.sum

@ -3,6 +3,8 @@ gitea.pena/PenaSide/common v0.0.0-20241120141501-1695a0981562 h1:LobhWlICMcbCCI+
gitea.pena/PenaSide/common v0.0.0-20241120141501-1695a0981562/go.mod h1:l71j3W1yROhOSfjWZ6wcMuzjBR37gu2ZTcXsorEJoiw=
gitea.pena/PenaSide/linters-golang v0.0.0-20241114215743-9a8e7d58cf96 h1:m4EMXEhsA/glI6eJeZnRGUhYPSQdcWj3hzT2IDNlWS0=
gitea.pena/PenaSide/linters-golang v0.0.0-20241114215743-9a8e7d58cf96/go.mod h1:gdd+vOT6up9STkEbxa2qESLIMZFjCmRbkcheFQCVgZU=
gitea.pena/PenaSide/trashlog v0.0.0-20241107132923-0f7d4d57eb4b h1:wVswXBfVDI6xi16o2A0wt8i6nswHkgPdd6U7ak0KUjo=
gitea.pena/PenaSide/trashlog v0.0.0-20241107132923-0f7d4d57eb4b/go.mod h1:LwHJCrPVumS6Rdorrr8NLEi4kpCed6ZVcIYnmEqbIVM=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0=
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
@ -313,5 +315,3 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5 h1:amsK0bkSJxBisk334aFo5ZmVPvN1dBT0Sv5j3V5IsT8=
penahub.gitlab.yandexcloud.net/external/trashlog v0.1.5/go.mod h1:J8kQNEP4bL7ZNKHxuT4tfe6a3FHyovpAPkyytN4qllc=

@ -7,12 +7,12 @@ import (
"gitea.pena/PenaSide/common/encrypt"
"gitea.pena/PenaSide/common/mongo"
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
"gitea.pena/PenaSide/trashlog/app"
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
"github.com/themakers/hlog"
"go.uber.org/zap/zapcore"
tb "gopkg.in/tucnak/telebot.v2"
"os/signal"
"penahub.gitlab.yandexcloud.net/external/trashlog/app"
"penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptrashlog"
"syscall"
"time"

@ -6,15 +6,15 @@ import (
"log"
"time"
mongoWrapper "gitea.pena/PenaSide/common/mongo"
"gitea.pena/PenaSide/customer/internal/errors"
"gitea.pena/PenaSide/customer/internal/fields"
"gitea.pena/PenaSide/customer/internal/models"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
mongoWrapper "gitea.pena/PenaSide/common/mongo"
"gitea.pena/PenaSide/customer/internal/errors"
"gitea.pena/PenaSide/customer/internal/fields"
"gitea.pena/PenaSide/customer/internal/models"
)
type AccountRepositoryDeps struct {

@ -0,0 +1,222 @@
package main
import (
"context"
"fmt"
"gitea.pena/PenaSide/customer/internal/interface/client"
"gitea.pena/PenaSide/customer/internal/models"
"gitea.pena/PenaSide/customer/internal/utils"
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
"github.com/themakers/hlog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"time"
)
var TrashLogHost = "localhost:7113"
var Version = "replenishment"
var Commit = "last"
var BuildTime = time.Now().Unix()
var ModuleLogger = "customer-replenishment"
var AuthServiceUrl = ""
var MongoURL = "mongodb://test:test@localhost:27020/?authSource=admin"
var MongoDatabase = "admin"
var AccountCollection = "accounts"
var HistoryCollection = "history"
func main() {
ctx := context.Background()
logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
if err != nil {
log.Fatalf("failed to init zap logger: %v", err)
}
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, TrashLogHost, Version, Commit, BuildTime)
if err != nil {
panic(err)
}
loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, clickHouseLogger)
}), zap.AddCallerSkip(2))
loggerHlog := hlog.New(loggerForHlog).Module(ModuleLogger)
loggerHlog.With(models.AllFields{})
authClient := client.NewAuthClient(client.AuthClientDeps{
Logger: logger,
URLs: &models.AuthMicroserviceURL{
User: AuthServiceUrl,
},
})
mongoClient, err := connectToMongoDB(ctx, MongoURL)
if err != nil {
logger.Fatal("failed to connect to MongoDB", zap.Error(err))
}
defer func() {
if err := mongoClient.Disconnect(ctx); err != nil {
logger.Error("failed to disconnect MongoDB client", zap.Error(err))
}
}()
err = SetAccountData(ctx, loggerHlog, authClient, mongoClient)
if err != nil {
logger.Error("failed set account data to clickHouse", zap.Error(err))
return
}
err = SetPayCartData(ctx, loggerHlog, mongoClient)
if err != nil {
logger.Error("failed set pay cart data to clickHouse", zap.Error(err))
return
}
time.Sleep(15 * time.Second)
}
func connectToMongoDB(ctx context.Context, mongoURL string) (*mongo.Client, error) {
clientOpts := options.Client().ApplyURI(mongoURL)
client, err := mongo.Connect(ctx, clientOpts)
if err != nil {
return nil, err
}
err = client.Ping(ctx, nil)
if err != nil {
return nil, err
}
return client, nil
}
func SetAccountData(ctx context.Context, logger hlog.Logger, authClient *client.AuthClient, mongoClient *mongo.Client) error {
cl := mongoClient.Database(MongoDatabase).Collection(AccountCollection)
filter := bson.M{"isDeleted": false}
opts := options.Find()
cursor, err := cl.Find(ctx, filter, opts)
if err != nil {
return err
}
defer cursor.Close(ctx)
var accounts []models.Account
for cursor.Next(ctx) {
var account models.Account
if err := cursor.Decode(&account); err != nil {
return err
}
accounts = append(accounts, account)
}
if err := cursor.Err(); err != nil {
return err
}
for _, account := range accounts {
logger.Emit(models.InfoGetAccount{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
})
for _, item := range account.Cart {
logger.Emit(models.InfoAddToCart{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
CtxTariffID: item,
})
}
userData, err := authClient.GetUser(ctx, account.UserID)
if err != nil {
fmt.Printf("error getting user from auth service,scip him, userid:%s, error: %s", account.UserID, err.Error())
continue
}
quiz := ""
if account.From != "" {
quiz = "quiz"
}
logger.Emit(models.InfoCreateAccount{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
KeyFromSource: quiz,
KeyFromID: account.From,
KeyFromPartner: account.Partner,
CtxLogin: userData.Login,
CtxEmail: userData.Email,
CtxPhone: userData.PhoneNumber,
KeyCurrency: account.Wallet.Currency,
})
}
return nil
}
func SetPayCartData(ctx context.Context, logger hlog.Logger, mongoClient *mongo.Client) error {
clHistory := mongoClient.Database(MongoDatabase).Collection(HistoryCollection)
clAccount := mongoClient.Database(MongoDatabase).Collection(AccountCollection)
filter := bson.M{"isDeleted": false}
opts := options.Find()
cursor, err := clHistory.Find(ctx, filter, opts)
if err != nil {
return err
}
defer cursor.Close(ctx)
var histories []models.History
for cursor.Next(ctx) {
var history models.History
if err := cursor.Decode(&history); err != nil {
return err
}
histories = append(histories, history)
}
if err := cursor.Err(); err != nil {
return err
}
for _, history := range histories {
currentAccount, err := findByUserID(ctx, clAccount, history.UserID)
if err != nil {
fmt.Printf("Error getting account by userID:%s", err.Error())
continue
}
logger.Emit(models.InfoPayCart{
CtxUserID: history.UserID,
CtxAccountID: currentAccount.ID,
KeySuccess: uint8(1),
CtxPrice: history.RawDetails.Price,
//CtxTariff: strings.Join(account.Cart, ","),
//CtxDiscount: strings.Join(utils.GetAppliedDiscountsIDs(discountResponse.AppliedDiscounts), ","),
//CtxRowPrice: int64(tariffsAmount),
CtxRowData: utils.MarshalRawDetails(history.RawDetails),
})
}
return nil
}
func findByUserID(ctx context.Context, clAccount *mongo.Collection, userID string) (*models.Account, error) {
filter := bson.M{
"userId": userID,
"isDeleted": false,
}
var account models.Account
err := clAccount.FindOne(ctx, filter).Decode(&account)
if err != nil {
if err == mongo.ErrNoDocuments {
fmt.Printf("No account found for userID: %s", userID)
return nil, nil
}
return nil, err
}
return &account, nil
}

@ -0,0 +1,124 @@
package benchmarks
import (
"bufio"
"fmt"
"github.com/stretchr/testify/require"
"net/http"
"sync"
"testing"
"time"
)
//500
//PS D:\GoProject\pena_pj\customer\tests\benchmarks> go test -bench=BenchmarkAccountPipe -benchmem -timeout 30m
//WAITER OK
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//cpu: AMD Ryzen 5 5600H with Radeon Graphics
//BenchmarkAccountPipe/COUNT_500-12 1 540604425700 ns/op 16440056 B/op 77490 allocs/op
//PASS
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 540.669s
//1000
//PS D:\GoProject\pena_pj\customer\tests\benchmarks> go test -bench=BenchmarkAccountPipe -benchmem -timeout 30m
//WAITER OK
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//cpu: AMD Ryzen 5 5600H with Radeon Graphics
//BenchmarkAccountPipe/COUNT_1000-12 1 1075578283500 ns/op 32757832 B/op 154755 allocs/op
//PASS
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1075.632s
func BenchmarkAccountPipe(b *testing.B) {
for _, n := range []int{1000} {
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(n)
for j := 0; j < n; j++ {
go func(j int) {
defer wg.Done()
userID := fmt.Sprintf("user%d", j)
//fmt.Println(userID)
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
require.NoError(b, err)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
count := 0
for scanner.Scan() {
if count == 25 {
break
}
count++
//line := scanner.Text()
//fmt.Println("Received:", line)
}
time.Sleep(2 * time.Second)
}(j)
time.Sleep(time.Second)
}
wg.Wait()
fmt.Println("WAITER OK")
time.Sleep(1 * time.Second)
}
})
}
}
//500
//PS D:\GoProject\pena_pj\customer\tests\benchmarks> go test -bench=BenchmarkAccountPipeWithWorker -benchmem
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//cpu: AMD Ryzen 5 5600H with Radeon Graphics
//BenchmarkAccountPipeWithWorker/COUNT_500-12 1 571782250200 ns/op 16471520 B/op 77575 allocs/op
//PASS
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 571.868s
//1000
//PS D:\GoProject\pena_pj\customer\tests\benchmarks> go test -bench=BenchmarkAccountPipeWithWorker -benchmem -timeout 30m
//goos: windows
//goarch: amd64
//pkg: penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks
//cpu: AMD Ryzen 5 5600H with Radeon Graphics
//BenchmarkAccountPipeWithWorker/COUNT_1000-12 1 1076407088000 ns/op 34201936 B/op 156101 allocs/op
//PASS
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1076.521s
func BenchmarkAccountPipeWithWorker(b *testing.B) {
for _, n := range []int{10} {
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(n)
for j := 0; j < n; j++ {
go func(j int) {
defer wg.Done()
userID := fmt.Sprintf("user%d", j)
//fmt.Println(userID)
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe-wc/%s", userID))
require.NoError(b, err)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
count := 0
for scanner.Scan() {
if count == 25 {
break
}
count++
//line := scanner.Text()
//fmt.Println("Received:", line)
}
time.Sleep(2 * time.Second)
}(j)
time.Sleep(time.Second)
}
wg.Wait()
time.Sleep(1 * time.Second)
}
})
}
}

@ -13,7 +13,7 @@ import (
func TestAccountPipe(t *testing.T) {
jwtUtil := helpers.InitializeJWT()
token, tokenErr := jwtUtil.Create("64ebda4387392e122e5d411f")
token, tokenErr := jwtUtil.Create("64e53ed187392e122e5d3d50")
if !assert.NoError(t, tokenErr) {
return
}

@ -3,15 +3,15 @@ package integration
import (
"context"
"database/sql"
"gitea.pena/PenaSide/customer/internal/models"
"gitea.pena/PenaSide/trashlog/app"
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
"github.com/stretchr/testify/assert"
"github.com/themakers/hlog"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"os"
"penahub.gitlab.yandexcloud.net/external/trashlog/app"
"penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptrashlog"
"gitea.pena/PenaSide/customer/internal/models"
"testing"
"time"
)

252
tests/server/main.go Normal file

@ -0,0 +1,252 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
"github.com/gofiber/fiber/v2"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"log"
"time"
)
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
panic(err)
}
collection := client.Database("testdb").Collection("testcollection")
collection.Drop(ctx)
for i := 0; i < 1000; i++ {
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
if err != nil {
panic(err)
}
}
return client, collection
}
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *worker.WatchWorker) *fiber.App {
app := fiber.New()
controller := &AccountController{
accountRepo: repo,
logger: logger,
watchWorker: watchWorker,
}
app.Get("/account-pipe/:userID", controller.AccountPipe)
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
return app
}
type AccountController struct {
accountRepo *AccountRepository
logger *zap.Logger
watchWorker *worker.WatchWorker
}
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
cancelCtx, cancel := context.WithCancel(ctx.Context())
go func(ctx context.Context) {
defer close(accountCh)
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
}
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
cancel()
return
}
}
}
})
return nil
}
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
userID := ctx.Params("userID")
ctx.Set(fiber.HeaderContentType, "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctx.Set("Transfer-Encoding", "chunked")
accountCh := make(chan map[string]interface{})
//cancelCtx, cancel := context.WithCancel(ctx.Context())
receiver.watchWorker.AddChannel(userID, accountCh)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case accountData, ok := <-accountCh:
if !ok {
return
}
accountJSON, err := json.Marshal(accountData)
if err != nil {
receiver.logger.Error("error marshal account JSON", zap.Error(err))
continue
}
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error flushing", zap.Error(err))
//cancel()
//receiver.logger.Info("Close connection Account Pipe sse")
//receiver.watchWorker.DropChannel(userID)
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
//receiver.watchWorker.DropChannel(userID)
//cancel()
return
}
}
}
})
return nil
}
type AccountRepository struct {
mongoDB *mongo.Collection
logger *zap.Logger
}
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
pipeline := mongo.Pipeline{
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
}
opts := options.ChangeStream()
opts.SetFullDocument(options.UpdateLookup)
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
if err != nil {
return err
}
defer changeStream.Close(ctx)
for {
select {
case <-ctx.Done():
//receiver.logger.Info("Context canceled, thread is closed now")
return nil
default:
if changeStream.Next(ctx) {
var changeEvent struct {
UpdateDescription struct {
UpdatedFields map[string]interface{} `bson:"updatedFields"`
} `bson:"updateDescription"`
}
if err := changeStream.Decode(&changeEvent); err != nil {
//receiver.logger.Error("error decoding change event", zap.Error(err))
continue
}
select {
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
case <-ctx.Done():
return nil
}
}
}
}
}
func main() {
ctx := context.Background()
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf(err.Error())
}
defer logger.Sync()
client, collection := initDB(ctx)
defer func() {
if err := client.Disconnect(ctx); err != nil {
logger.Fatal("Error close mongo", zap.Error(err))
}
}()
repo := &AccountRepository{
mongoDB: collection,
logger: logger,
}
watchWorker := worker.NewWatchWorker(collection, logger)
go watchWorker.Run(ctx)
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("stop update")
return
default:
for i := 0; i < 800; i++ {
fmt.Println("YA WORK OKAY")
res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
if err != nil {
logger.Error("error update", zap.Error(err))
}
fmt.Println("TOCHNO OKAY", res.ModifiedCount)
}
time.Sleep(1 * time.Second)
}
}
}()
app := initSrv(repo, logger, watchWorker)
if err := app.Listen(":3000"); err != nil {
logger.Fatal("Error server", zap.Error(err))
}
}