// Package controller provides HTTP controllers; this file defines the
// account event-stream controller.
package controller

import (
	"bufio"
	"encoding/json"
	"fmt"
	"time"

	"github.com/gofiber/fiber/v2"
	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/pena-services/customer/cmd/sse_bench/repository"
	"penahub.gitlab.yandexcloud.net/pena-services/customer/cmd/sse_bench/worker"
	"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/client"
	"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
)

// Deps lists the dependencies that NewAccountController copies into an
// AccountController.
type Deps struct {
	MiddleWare  *http.MiddleWare
	AccountRepo *repository.AccountRepository
	Logger      *zap.Logger
	WatchWorker *worker.WatchWorker
}

// AccountController serves the account event stream.
//
// authClient is declared but is not assigned by NewAccountController, so it
// stays nil unless set elsewhere.
type AccountController struct {
	middleWare  *http.MiddleWare
	accountRepo *repository.AccountRepository
	logger      *zap.Logger
	authClient  *client.AuthClient
	watchWorker *worker.WatchWorker
}

// NewAccountController builds an AccountController from deps.
func NewAccountController(deps Deps) *AccountController {
	return &AccountController{
		middleWare:  deps.MiddleWare,
		accountRepo: deps.AccountRepo,
		logger:      deps.Logger,
		watchWorker: deps.WatchWorker,
	}
}

// Register mounts AccountPipe on GET /account/pipe of the given router.
func (receiver *AccountController) Register(router fiber.Router) {
	router.Get("/account/pipe", receiver.AccountPipe)
}

// Name returns the controller name, which is the empty string.
func (receiver *AccountController) Name() string {
	return ""
}

// AccountPipe streams account updates for the authenticated user as
// "data: <json>\n\n" frames with Content-Type text/event-stream.
//
// The user ID is taken from the middleware; when it is missing or empty the
// middleware's NoAuth response is returned. Otherwise a fresh unbuffered
// channel is registered with the watch worker under that user ID and a body
// stream writer is installed that:
//   - marshals every map received from the channel to JSON and writes it as
//     one frame (a marshal failure is logged and that update is skipped);
//   - writes a {"event": "ping"} frame every 5 seconds;
//   - stops when the channel is closed or a flush fails.
//
// The handler itself always returns nil once the stream writer is installed.
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
	// Interval between keep-alive ping frames.
	const pingInterval = 5 * time.Second

	userID, ok := receiver.middleWare.ExtractUserID(ctx)
	if !ok || userID == "" {
		return receiver.middleWare.NoAuth(ctx)
	}

	ctx.Set(fiber.HeaderContentType, "text/event-stream")
	ctx.Set("Cache-Control", "no-cache")
	ctx.Set("Connection", "keep-alive")
	ctx.Set("Transfer-Encoding", "chunked")

	accountCh := make(chan map[string]interface{})
	receiver.watchWorker.AddChannel(userID, accountCh)

	// NOTE(review): nothing in this handler removes accountCh from the watch
	// worker when the stream ends. The cleanup that used to sit in the error
	// branches (cancel() / receiver.watchWorker.DropChannel(userID)) is
	// commented out in the original and is left disabled here; confirm who is
	// responsible for unregistering the channel.
	ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
		pingTicker := time.NewTicker(pingInterval)
		defer pingTicker.Stop()

		for {
			select {
			case accountData, ok := <-accountCh:
				if !ok {
					// Channel closed by the sender: end the stream.
					return
				}

				accountJSON, err := json.Marshal(accountData)
				if err != nil {
					receiver.logger.Error("error marshal account JSON", zap.Error(err))
					continue
				}

				// A bufio.Writer keeps the first write error and returns it
				// from Flush, so checking Flush alone is sufficient.
				fmt.Fprintf(w, "data: %s\n\n", accountJSON)
				if err := w.Flush(); err != nil {
					receiver.logger.Error("error flushing", zap.Error(err))
					return
				}
			case <-pingTicker.C:
				fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
				if err := w.Flush(); err != nil {
					receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
					return
				}
			}
		}
	})

	return nil
}