// Package tools contains helpers for serving Server-Sent Events (SSE).
package tools

import (
	"bufio"
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/gofiber/fiber/v2"
)

// DataEmitter starts producing messages for a single SSE client and returns
// the channel those messages are delivered on. The supplied context carries
// the request's "ticket" query parameter under ContextURLKey and is cancelled
// when the stream ends. Returning a nil channel makes SseWrapper reject the
// request with 401 Unauthorized.
type DataEmitter func(ctx context.Context) chan interface{}

// ContextURLKey is the context key under which SseWrapper stores the value of
// the "ticket" query parameter.
//
// NOTE(review): this is a plain string key, which `go vet`/staticcheck flag
// because it can collide with keys from other packages. It is left untyped
// here so existing callers that look the value up keep working.
const ContextURLKey = "url"

// SseWrapper adapts a DataEmitter into a fiber handler that streams the
// emitter's messages to the client as Server-Sent Events.
//
// For every request the handler:
//   - builds a cancellable context holding the "ticket" query parameter,
//   - calls emitter with it; a nil channel yields a 401 "no token" response,
//   - otherwise sets the event-stream headers and streams every non-nil
//     message as a `data: ...` frame, interleaved with a JSON ping frame every
//     five seconds.
//
// Values implementing error are sent as their Error() text. The stream ends,
// and the emitter's context is cancelled, when a write to the client fails,
// when the emitter closes its channel, or when the context is cancelled.
func SseWrapper(emitter DataEmitter) fiber.Handler {
	return func(ctx *fiber.Ctx) error {
		// A panic in the emitter must not take the whole server down.
		defer func() {
			if v := recover(); v != nil {
				fmt.Println("sse her err", v)
			}
		}()
		fmt.Println("SseWrapper")

		// NOTE(review): fiber documents that strings returned by ctx.Query are
		// only valid inside the handler unless the Immutable setting is on. If
		// the emitter reads the ticket asynchronously, copy it first — confirm
		// against the emitter implementations.
		ctxValue := context.WithValue(ctx.Context(), ContextURLKey, ctx.Query("ticket"))
		ctxCancel, cancel := context.WithCancel(ctxValue)

		dE := emitter(ctxCancel)
		if dE == nil {
			fmt.Println("dE")
			cancel()
			return ctx.Status(fiber.StatusUnauthorized).SendString("no token")
		}

		// Send the event-stream headers only once we know we will stream, so
		// the 401 response above is not labelled as text/event-stream.
		ctx.Set("Content-Type", "text/event-stream")
		ctx.Set("Cache-Control", "no-cache")
		ctx.Set("Connection", "keep-alive")

		ctx.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
			const pingInterval = 5 * time.Second

			// Cancelling the context is how the emitter learns the client is
			// gone. The channel itself is NOT closed here: it belongs to the
			// sender, and closing it from the receiving side would make the
			// emitter's next send panic.
			defer cancel()

			pingTicker := time.NewTicker(pingInterval)
			defer pingTicker.Stop()

			// send writes one SSE data frame and flushes it to the client.
			// bufio.Writer errors are sticky, so Flush also reports a failed
			// write.
			send := func(payload interface{}) error {
				if _, err := fmt.Fprintf(w, "data: %s\n\n", payload); err != nil {
					return err
				}
				return w.Flush()
			}

			for {
				select {
				case <-ctxCancel.Done():
					// Parent context cancelled (e.g. server shutdown).
					return

				case <-pingTicker.C:
					if err := send(`{"event": "ping"}`); err != nil {
						fmt.Println("sse, close connection")
						return
					}

				case m, ok := <-dE:
					if !ok {
						// The emitter closed the channel: end of stream.
						// Without this check a closed channel would yield nil
						// forever and spin this loop.
						return
					}
					if m == nil {
						continue
					}
					if emitted, isErr := m.(error); isErr {
						// Send the error text exactly once.
						if err := send(emitted.Error()); err != nil {
							fmt.Println("error flushing err ")
							return
						}
						continue
					}
					if err := send(m); err != nil {
						fmt.Println("error flushing data")
						return
					}
				}
			}
		})

		return nil
	}
}

// SSEPing writes an SSE comment line (":ping") to w and flushes it so the
// client receives it immediately.
//
// It panics if the write fails or if w does not implement http.Flusher.
func SSEPing(w http.ResponseWriter) {
	if _, err := w.Write([]byte(":ping\n\n")); err != nil {
		panic(err)
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		panic("streaming unsupported")
	}
	flusher.Flush()
}