// Package tools provides HTTP helpers: a Server-Sent Events wrapper for Fiber
// handlers and a reflection-based JSON adapter for net/http handlers.
package tools

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"reflect"
	"time"

	"github.com/gofiber/fiber/v2"
)

// DataEmitter starts producing values for a single SSE connection and returns
// the channel on which they are delivered. SseWrapper treats a nil channel as
// a rejected request. The ctx passed in is cancelled by SseWrapper when the
// connection ends; that cancellation is the only shutdown signal the emitter
// receives. The emitter may close the channel to end the stream.
type DataEmitter func(ctx context.Context) chan interface{}

// ContextURLKey is the context key under which SseWrapper stores the value of
// the "ticket" query parameter.
//
// NOTE(review): an untyped string key can collide with keys from other
// packages; it is left unchanged here because callers may look the value up
// with this constant or with the literal "url".
const ContextURLKey = "url"

// SseWrapper adapts emitter into a Fiber handler that streams the emitted
// values to the client as Server-Sent Events.
//
// For each request the handler sets the event-stream response headers, builds
// a cancellable context carrying the "ticket" query parameter under
// ContextURLKey, and calls emitter with it. If emitter returns nil the handler
// responds with 401 and the body "no token". Otherwise every non-nil value
// received from the channel is written as one "data: ...\n\n" frame (error
// values are written once, using their Error() text), and a ping frame is
// written every 5 seconds.
//
// The stream ends when a flush to the client fails or when the emitter closes
// its channel. In both cases the context given to the emitter is cancelled.
// The channel itself is never closed from this side: it belongs to the sender,
// and closing it here would panic an emitter that is still sending.
func SseWrapper(emitter DataEmitter) fiber.Handler {
	const pingInterval = 5 * time.Second

	return func(ctx *fiber.Ctx) error {
		// Only guards this goroutine (the handler itself); recover cannot
		// catch panics raised later inside the body stream writer.
		defer func() {
			if v := recover(); v != nil {
				fmt.Println("sse her err", v)
			}
		}()

		// Send the event-stream headers.
		fmt.Println("SseWrapper")
		ctx.Set("Content-Type", "text/event-stream")
		ctx.Set("Cache-Control", "no-cache")
		ctx.Set("Connection", "keep-alive")

		// Store a private copy of the ticket: the emitter reads it for the
		// whole lifetime of the stream, i.e. after this handler has returned.
		// NOTE(review): Fiber documents that strings obtained from *fiber.Ctx
		// may be backed by reused buffers unless the app runs with Immutable.
		ticket := string([]byte(ctx.Query("ticket")))
		ctxValue := context.WithValue(ctx.Context(), ContextURLKey, ticket)
		ctxCancel, cancel := context.WithCancel(ctxValue)

		dE := emitter(ctxCancel)
		if dE == nil {
			fmt.Println("dE")
			cancel()
			return ctx.Status(fiber.StatusUnauthorized).SendString("no token")
		}

		ctx.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
			// Whatever ends the stream, tell the emitter to stop.
			defer cancel()

			pingTicker := time.NewTicker(pingInterval)
			defer pingTicker.Stop()

			// send writes one SSE data frame and flushes it to the client.
			send := func(payload interface{}) error {
				if _, err := fmt.Fprintf(w, "data: %s\n\n", payload); err != nil {
					return err
				}
				return w.Flush()
			}

			for {
				select {
				case <-pingTicker.C:
					if err := send(`{"event": "ping"}`); err != nil {
						fmt.Println("sse, close connection")
						return
					}
				case m, ok := <-dE:
					if !ok {
						// The emitter closed its channel: the stream is over.
						// Without this check a closed channel would yield nil
						// forever and spin this loop.
						return
					}
					if m == nil {
						continue
					}
					if emitted, isErr := m.(error); isErr {
						if err := send(emitted.Error()); err != nil {
							fmt.Println("error flushing err ")
							return
						}
						// The error frame has been sent; do not send the same
						// value a second time as a regular data frame.
						continue
					}
					if err := send(m); err != nil {
						fmt.Println("error flushing data")
						return
					}
				}
			}
		})
		return nil
	}
}

// SSEPing writes the SSE comment frame ":ping\n\n" to w and flushes it.
//
// It panics if the write fails, or (after the write) if w does not implement
// http.Flusher. The function has no error return, so the panics are kept for
// compatibility with existing callers.
func SSEPing(w http.ResponseWriter) {
	if _, err := w.Write([]byte(":ping\n\n")); err != nil {
		panic(err)
	}
	flusher, ok := w.(http.Flusher)
	if !ok {
		panic("streaming unsupported")
	}
	flusher.Flush()
}

// HandlerWrapper adapts f into an http.HandlerFunc using reflection.
//
// f must be a function whose first parameter accepts the request's
// context.Context and which returns two values: a JSON-encodable result and an
// int HTTP status code. If f declares a second parameter, the request body is
// JSON-decoded into a fresh value of that parameter's type for every request
// and passed along; a body that fails to decode yields 400 "invalid request
// data" and f is not called.
//
// The result is marshalled before anything is written, so an unencodable
// result produces a 500 instead of a half-written response. On success the
// response carries Content-Type application/json, the status returned by f,
// and the JSON document followed by a newline.
//
// HandlerWrapper panics at wrap time if f is not a function, and the returned
// handler panics if f's second result is not an int.
func HandlerWrapper(f interface{}) http.HandlerFunc {
	fRV := reflect.ValueOf(f)
	fRT := fRV.Type()

	var argRT reflect.Type
	argExist := fRT.NumIn() > 1
	if argExist {
		argRT = fRT.In(1)
	}

	return func(w http.ResponseWriter, q *http.Request) {
		in := []reflect.Value{reflect.ValueOf(q.Context())}

		if argExist {
			// Allocate the argument per request: a value shared between
			// requests would race and leak fields from one body into the next.
			argPtr := reflect.New(argRT)
			if err := json.NewDecoder(q.Body).Decode(argPtr.Interface()); err != nil {
				http.Error(w, "invalid request data", http.StatusBadRequest)
				return
			}
			in = append(in, argPtr.Elem())
		}

		out := fRV.Call(in)
		status := out[1].Interface().(int)

		payload, err := json.Marshal(out[0].Interface())
		if err != nil {
			http.Error(w, "internal server error", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		// A failed write means the client is gone; there is nobody left to
		// report the error to.
		_, _ = w.Write(append(payload, '\n'))
	}
}