package main import ( "bufio" "context" "encoding/json" "fmt" "gitea.pena/PenaSide/customer/cmd/sse_bench/worker" "github.com/gofiber/fiber/v2" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "log" "time" ) func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) { clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false") client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { panic(err) } collection := client.Database("testdb").Collection("testcollection") collection.Drop(ctx) for i := 0; i < 1000; i++ { _, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"}) if err != nil { panic(err) } } return client, collection } func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *worker.WatchWorker) *fiber.App { app := fiber.New() controller := &AccountController{ accountRepo: repo, logger: logger, watchWorker: watchWorker, } app.Get("/account-pipe/:userID", controller.AccountPipe) app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC) return app } type AccountController struct { accountRepo *AccountRepository logger *zap.Logger watchWorker *worker.WatchWorker } func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { userID := ctx.Params("userID") ctx.Set(fiber.HeaderContentType, "text/event-stream") ctx.Set("Cache-Control", "no-cache") ctx.Set("Connection", "keep-alive") ctx.Set("Transfer-Encoding", "chunked") accountCh := make(chan map[string]interface{}) cancelCtx, cancel := context.WithCancel(ctx.Context()) go func(ctx context.Context) { defer close(accountCh) if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil { //receiver.logger.Error("error in account pipe repo method", zap.Error(err)) } }(cancelCtx) ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { pingTicker := time.NewTicker(5 * time.Second) defer pingTicker.Stop() for { select { case accountData, ok := <-accountCh: if !ok { return } accountJSON, err := json.Marshal(accountData) if err != nil { //receiver.logger.Error("error marshal account JSON", zap.Error(err)) continue } fmt.Println(accountJSON) fmt.Fprintf(w, "data: %s\n\n", accountJSON) if err := w.Flush(); err != nil { //receiver.logger.Error("error flushing", zap.Error(err)) cancel() //receiver.logger.Info("Close connection Account Pipe sse") return } case <-pingTicker.C: fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) if err := w.Flush(); err != nil { //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) cancel() return } } } }) return nil } func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error { userID := ctx.Params("userID") ctx.Set(fiber.HeaderContentType, "text/event-stream") ctx.Set("Cache-Control", "no-cache") ctx.Set("Connection", "keep-alive") ctx.Set("Transfer-Encoding", "chunked") accountCh := make(chan map[string]interface{}) //cancelCtx, cancel := context.WithCancel(ctx.Context()) receiver.watchWorker.AddChannel(userID, accountCh) ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { pingTicker := time.NewTicker(5 * time.Second) defer pingTicker.Stop() for { select { case accountData, ok := <-accountCh: if !ok { return } accountJSON, err := json.Marshal(accountData) if err != nil { receiver.logger.Error("error marshal account JSON", zap.Error(err)) continue } fmt.Fprintf(w, "data: %s\n\n", accountJSON) if err := w.Flush(); err != nil { //receiver.logger.Error("error flushing", zap.Error(err)) //cancel() //receiver.logger.Info("Close connection Account Pipe sse") //receiver.watchWorker.DropChannel(userID) return } case <-pingTicker.C: fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) if err := w.Flush(); err != nil { //receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) //receiver.watchWorker.DropChannel(userID) //cancel() return } } } }) return nil } type AccountRepository struct { mongoDB *mongo.Collection logger *zap.Logger } func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error { pipeline := mongo.Pipeline{ {{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}}, } opts := options.ChangeStream() opts.SetFullDocument(options.UpdateLookup) changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts) if err != nil { return err } defer changeStream.Close(ctx) for { select { case <-ctx.Done(): //receiver.logger.Info("Context canceled, thread is closed now") return nil default: if changeStream.Next(ctx) { var changeEvent struct { UpdateDescription struct { UpdatedFields map[string]interface{} `bson:"updatedFields"` } `bson:"updateDescription"` } if err := changeStream.Decode(&changeEvent); err != nil { //receiver.logger.Error("error decoding change event", zap.Error(err)) continue } select { case accountCh <- changeEvent.UpdateDescription.UpdatedFields: case <-ctx.Done(): return nil } } } } } func main() { ctx := context.Background() logger, err := zap.NewProduction() if err != nil { log.Fatalf(err.Error()) } defer logger.Sync() client, collection := initDB(ctx) defer func() { if err := client.Disconnect(ctx); err != nil { logger.Fatal("Error close mongo", zap.Error(err)) } }() repo := &AccountRepository{ mongoDB: collection, logger: logger, } watchWorker := worker.NewWatchWorker(collection, logger) go watchWorker.Run(ctx) go func() { for { select { case <-ctx.Done(): fmt.Println("stop update") return default: for i := 0; i < 800; i++ { fmt.Println("YA WORK OKAY") res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)}, bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}}) if err != nil { logger.Error("error update", zap.Error(err)) } fmt.Println("TOCHNO OKAY", res.ModifiedCount) } time.Sleep(1 * time.Second) } } }() app := initSrv(repo, logger, watchWorker) if err := app.Listen(":3000"); err != nil { logger.Fatal("Error server", zap.Error(err)) } }