From eaa21ae9cc06ce867d6e35f1ff0e12e92aefda70 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 11 Jun 2024 11:57:38 +0300 Subject: [PATCH] fix, after threds not canceled, now if ping don't flush ctx canceled and threds stop --- .../interface/controller/http/account/controllers.go | 11 +++++++++++ internal/interface/repository/account.go | 1 + tests/e2e/accountPipe_test.go | 3 ++- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal/interface/controller/http/account/controllers.go b/internal/interface/controller/http/account/controllers.go index 547e6b7..7966234 100644 --- a/internal/interface/controller/http/account/controllers.go +++ b/internal/interface/controller/http/account/controllers.go @@ -16,6 +16,7 @@ import ( "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "strconv" + "time" ) type Deps struct { @@ -270,6 +271,8 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { }(cancelCtx) ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) { + pingTicker := time.NewTicker(5 * time.Second) + defer pingTicker.Stop() for { select { case account, ok := <-accountCh: @@ -285,6 +288,14 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error { if err := w.Flush(); err != nil { receiver.logger.Error("error flushing", zap.Error(err)) cancel() + receiver.logger.Info("Close connection Account Pipe sse") + return + } + case <-pingTicker.C: + fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`) + if err := w.Flush(); err != nil { + receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err)) + cancel() return } } diff --git a/internal/interface/repository/account.go b/internal/interface/repository/account.go index a286e95..04f1904 100644 --- a/internal/interface/repository/account.go +++ b/internal/interface/repository/account.go @@ -516,6 +516,7 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin for { select { case <-ctx.Done(): + receiver.logger.Info("Context canceled, thread is closed now") return nil default: if changeStream.Next(ctx) { diff --git a/tests/e2e/accountPipe_test.go b/tests/e2e/accountPipe_test.go index 9c2e888..88e7f35 100644 --- a/tests/e2e/accountPipe_test.go +++ b/tests/e2e/accountPipe_test.go @@ -17,7 +17,8 @@ func TestAccountPipe(t *testing.T) { if !assert.NoError(t, tokenErr) { return } - url := fmt.Sprintf("http://localhost:8082/account/pipe?token=%s", token) + fmt.Println(token) + url := fmt.Sprintf("http://localhost:8082/account/pipe?Authorization=%s", token) client := &http.Client{ Timeout: 100 * time.Second, }