generated from PenaSide/GolangTemplate
save bench sse
This commit is contained in:
parent
531216c6ff
commit
2f0983e895
@ -1,41 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.pena/PenaSide/common/mongo"
|
||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/repository"
|
||||
"gitea.pena/PenaSide/customer/internal/models"
|
||||
"gitea.pena/PenaSide/customer/pkg/closer"
|
||||
"go.uber.org/zap"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
||||
defer func() {
|
||||
if recovered := recover(); recovered != nil {
|
||||
appErr = errors.New("recovered panic on application run")
|
||||
logger.Error("recovered panic on application run", zap.Any("recovered", recovered))
|
||||
}
|
||||
}()
|
||||
|
||||
closer := closer.New()
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
|
||||
Configuration: &config.Database,
|
||||
Timeout: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed connection to db: %w", err)
|
||||
}
|
||||
|
||||
accountRepo := repository.NewAccountRepository(repository.Deps{
|
||||
Logger: logger,
|
||||
MongoDB: mongoDB,
|
||||
})
|
||||
}
|
@ -33,7 +33,7 @@ import (
|
||||
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1075.632s
|
||||
|
||||
func BenchmarkAccountPipe(b *testing.B) {
|
||||
for _, n := range []int{1000} {
|
||||
for _, n := range []int{500} {
|
||||
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < b.N; i++ {
|
||||
@ -42,7 +42,7 @@ func BenchmarkAccountPipe(b *testing.B) {
|
||||
go func(j int) {
|
||||
defer wg.Done()
|
||||
userID := fmt.Sprintf("user%d", j)
|
||||
//fmt.Println(userID)
|
||||
fmt.Println(userID)
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
|
||||
require.NoError(b, err)
|
||||
defer resp.Body.Close()
|
||||
@ -53,8 +53,8 @@ func BenchmarkAccountPipe(b *testing.B) {
|
||||
break
|
||||
}
|
||||
count++
|
||||
//line := scanner.Text()
|
||||
//fmt.Println("Received:", line)
|
||||
line := scanner.Text()
|
||||
fmt.Println("Received:", line)
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
}(j)
|
||||
@ -89,7 +89,7 @@ func BenchmarkAccountPipe(b *testing.B) {
|
||||
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1076.521s
|
||||
|
||||
func BenchmarkAccountPipeWithWorker(b *testing.B) {
|
||||
for _, n := range []int{10} {
|
||||
for _, n := range []int{500} {
|
||||
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < b.N; i++ {
|
||||
@ -109,8 +109,8 @@ func BenchmarkAccountPipeWithWorker(b *testing.B) {
|
||||
break
|
||||
}
|
||||
count++
|
||||
//line := scanner.Text()
|
||||
//fmt.Println("Received:", line)
|
||||
line := scanner.Text()
|
||||
fmt.Println("Received:", line)
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
}(j)
|
@ -1,99 +0,0 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/repository"
|
||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
|
||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Deps struct {
|
||||
MiddleWare *http.MiddleWare
|
||||
AccountRepo *repository.AccountRepository
|
||||
Logger *zap.Logger
|
||||
WatchWorker *worker.WatchWorker
|
||||
}
|
||||
|
||||
type AccountController struct {
|
||||
middleWare *http.MiddleWare
|
||||
accountRepo *repository.AccountRepository
|
||||
logger *zap.Logger
|
||||
authClient *client.AuthClient
|
||||
watchWorker *worker.WatchWorker
|
||||
}
|
||||
|
||||
func NewAccountController(deps Deps) *AccountController {
|
||||
return &AccountController{
|
||||
middleWare: deps.MiddleWare,
|
||||
accountRepo: deps.AccountRepo,
|
||||
logger: deps.Logger,
|
||||
watchWorker: deps.WatchWorker,
|
||||
}
|
||||
}
|
||||
|
||||
func (receiver *AccountController) Register(router fiber.Router) {
|
||||
router.Get("/account/pipe", receiver.AccountPipe)
|
||||
}
|
||||
|
||||
func (receiver *AccountController) Name() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||
userID, ok := receiver.middleWare.ExtractUserID(ctx)
|
||||
if !ok || userID == "" {
|
||||
return receiver.middleWare.NoAuth(ctx)
|
||||
}
|
||||
|
||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||
ctx.Set("Cache-Control", "no-cache")
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan map[string]interface{})
|
||||
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
receiver.watchWorker.AddChannel(userID, accountCh)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
receiver.logger.Error("error flushing", zap.Error(err))
|
||||
//cancel()
|
||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
//cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
@ -1 +1,253 @@
|
||||
package sse_bench
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
|
||||
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
|
||||
client, err := mongo.Connect(context.Background(), clientOptions)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
collection := client.Database("testdb").Collection("testcollection")
|
||||
collection.Drop(ctx)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return client, collection
|
||||
}
|
||||
|
||||
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *worker.WatchWorker) *fiber.App {
|
||||
app := fiber.New()
|
||||
controller := &AccountController{
|
||||
accountRepo: repo,
|
||||
logger: logger,
|
||||
watchWorker: watchWorker,
|
||||
}
|
||||
app.Get("/account-pipe/:userID", controller.AccountPipe)
|
||||
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
|
||||
return app
|
||||
}
|
||||
|
||||
type AccountController struct {
|
||||
accountRepo *AccountRepository
|
||||
logger *zap.Logger
|
||||
watchWorker *worker.WatchWorker
|
||||
}
|
||||
|
||||
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||
userID := ctx.Params("userID")
|
||||
|
||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||
ctx.Set("Cache-Control", "no-cache")
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan map[string]interface{})
|
||||
cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
go func(ctx context.Context) {
|
||||
defer close(accountCh)
|
||||
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
|
||||
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
|
||||
}
|
||||
}(cancelCtx)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Println(accountJSON)
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||
cancel()
|
||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
|
||||
userID := ctx.Params("userID")
|
||||
|
||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||
ctx.Set("Cache-Control", "no-cache")
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan map[string]interface{})
|
||||
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
receiver.watchWorker.AddChannel(userID, accountCh)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||
//cancel()
|
||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
//cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type AccountRepository struct {
|
||||
mongoDB *mongo.Collection
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
||||
pipeline := mongo.Pipeline{
|
||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
||||
}
|
||||
|
||||
opts := options.ChangeStream()
|
||||
opts.SetFullDocument(options.UpdateLookup)
|
||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer changeStream.Close(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
//receiver.logger.Info("Context canceled, thread is closed now")
|
||||
return nil
|
||||
default:
|
||||
if changeStream.Next(ctx) {
|
||||
var changeEvent struct {
|
||||
UpdateDescription struct {
|
||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||||
} `bson:"updateDescription"`
|
||||
}
|
||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
||||
//receiver.logger.Error("error decoding change event", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
logger, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
client, collection := initDB(ctx)
|
||||
defer func() {
|
||||
if err := client.Disconnect(ctx); err != nil {
|
||||
logger.Fatal("Error close mongo", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
repo := &AccountRepository{
|
||||
mongoDB: collection,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
watchWorker := worker.NewWatchWorker(collection, logger)
|
||||
go watchWorker.Run(ctx)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("stop update")
|
||||
return
|
||||
default:
|
||||
for i := 0; i < 800; i++ {
|
||||
fmt.Println("YA WORK OKAY")
|
||||
res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
|
||||
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
|
||||
if err != nil {
|
||||
logger.Error("error update", zap.Error(err))
|
||||
}
|
||||
fmt.Println("TOCHNO OKAY", res.ModifiedCount)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
app := initSrv(repo, logger, watchWorker)
|
||||
|
||||
if err := app.Listen(":3000"); err != nil {
|
||||
logger.Fatal("Error server", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -1,74 +0,0 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
)
|
||||
|
||||
type Deps struct {
|
||||
Logger *zap.Logger
|
||||
MongoDB *mongo.Collection
|
||||
}
|
||||
|
||||
type AccountRepository struct {
|
||||
logger *zap.Logger
|
||||
mongoDB *mongo.Collection
|
||||
}
|
||||
|
||||
func NewAccountRepository(deps Deps) *AccountRepository {
|
||||
if deps.Logger == nil {
|
||||
log.Panicln("logger is nil on <NewAccountRepository>")
|
||||
}
|
||||
|
||||
if deps.MongoDB == nil {
|
||||
log.Panicln("mongodb is nil on <NewAccountRepository>")
|
||||
}
|
||||
|
||||
return &AccountRepository{
|
||||
logger: deps.Logger,
|
||||
mongoDB: deps.MongoDB,
|
||||
}
|
||||
}
|
||||
|
||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
||||
pipeline := mongo.Pipeline{
|
||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
||||
}
|
||||
|
||||
opts := options.ChangeStream()
|
||||
opts.SetFullDocument(options.UpdateLookup)
|
||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer changeStream.Close(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
receiver.logger.Info("Context canceled, thread is closed now")
|
||||
return nil
|
||||
default:
|
||||
if changeStream.Next(ctx) {
|
||||
var changeEvent struct {
|
||||
UpdateDescription struct {
|
||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||||
} `bson:"updateDescription"`
|
||||
}
|
||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
||||
receiver.logger.Error("error decoding change event", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,252 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
|
||||
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
|
||||
client, err := mongo.Connect(context.Background(), clientOptions)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
collection := client.Database("testdb").Collection("testcollection")
|
||||
collection.Drop(ctx)
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return client, collection
|
||||
}
|
||||
|
||||
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *worker.WatchWorker) *fiber.App {
|
||||
app := fiber.New()
|
||||
controller := &AccountController{
|
||||
accountRepo: repo,
|
||||
logger: logger,
|
||||
watchWorker: watchWorker,
|
||||
}
|
||||
app.Get("/account-pipe/:userID", controller.AccountPipe)
|
||||
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
|
||||
return app
|
||||
}
|
||||
|
||||
type AccountController struct {
|
||||
accountRepo *AccountRepository
|
||||
logger *zap.Logger
|
||||
watchWorker *worker.WatchWorker
|
||||
}
|
||||
|
||||
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||
userID := ctx.Params("userID")
|
||||
|
||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||
ctx.Set("Cache-Control", "no-cache")
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan map[string]interface{})
|
||||
cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
go func(ctx context.Context) {
|
||||
defer close(accountCh)
|
||||
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
|
||||
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
|
||||
}
|
||||
}(cancelCtx)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||
cancel()
|
||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
|
||||
userID := ctx.Params("userID")
|
||||
|
||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||
ctx.Set("Cache-Control", "no-cache")
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan map[string]interface{})
|
||||
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
receiver.watchWorker.AddChannel(userID, accountCh)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||
//cancel()
|
||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
//receiver.watchWorker.DropChannel(userID)
|
||||
//cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type AccountRepository struct {
|
||||
mongoDB *mongo.Collection
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
||||
pipeline := mongo.Pipeline{
|
||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
||||
}
|
||||
|
||||
opts := options.ChangeStream()
|
||||
opts.SetFullDocument(options.UpdateLookup)
|
||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer changeStream.Close(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
//receiver.logger.Info("Context canceled, thread is closed now")
|
||||
return nil
|
||||
default:
|
||||
if changeStream.Next(ctx) {
|
||||
var changeEvent struct {
|
||||
UpdateDescription struct {
|
||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||||
} `bson:"updateDescription"`
|
||||
}
|
||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
||||
//receiver.logger.Error("error decoding change event", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
|
||||
logger, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
client, collection := initDB(ctx)
|
||||
defer func() {
|
||||
if err := client.Disconnect(ctx); err != nil {
|
||||
logger.Fatal("Error close mongo", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
repo := &AccountRepository{
|
||||
mongoDB: collection,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
watchWorker := worker.NewWatchWorker(collection, logger)
|
||||
go watchWorker.Run(ctx)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("stop update")
|
||||
return
|
||||
default:
|
||||
for i := 0; i < 800; i++ {
|
||||
fmt.Println("YA WORK OKAY")
|
||||
res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
|
||||
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
|
||||
if err != nil {
|
||||
logger.Error("error update", zap.Error(err))
|
||||
}
|
||||
fmt.Println("TOCHNO OKAY", res.ModifiedCount)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
app := initSrv(repo, logger, watchWorker)
|
||||
|
||||
if err := app.Listen(":3000"); err != nil {
|
||||
logger.Fatal("Error server", zap.Error(err))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user