fix, after threds not canceled, now if ping don't flush ctx canceled and threds stop

This commit is contained in:
Pavel 2024-06-11 11:57:38 +03:00
parent 5c78187bf0
commit eaa21ae9cc
3 changed files with 14 additions and 1 deletions

@ -16,6 +16,7 @@ import (
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"strconv"
"time"
)
type Deps struct {
@ -270,6 +271,8 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
}(cancelCtx)
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for {
select {
case account, ok := <-accountCh:
@ -285,6 +288,14 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
if err := w.Flush(); err != nil {
receiver.logger.Error("error flushing", zap.Error(err))
cancel()
receiver.logger.Info("Close connection Account Pipe sse")
return
}
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
cancel()
return
}
}

@ -516,6 +516,7 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin
for {
select {
case <-ctx.Done():
receiver.logger.Info("Context canceled, thread is closed now")
return nil
default:
if changeStream.Next(ctx) {

@ -17,7 +17,8 @@ func TestAccountPipe(t *testing.T) {
if !assert.NoError(t, tokenErr) {
return
}
url := fmt.Sprintf("http://localhost:8082/account/pipe?token=%s", token)
fmt.Println(token)
url := fmt.Sprintf("http://localhost:8082/account/pipe?Authorization=%s", token)
client := &http.Client{
Timeout: 100 * time.Second,
}