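// Package tools provides helpers for streaming data to HTTP clients
// using Server-Sent Events (SSE).
package tools

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

// pingInterval is how long the stream may stay idle before a keep-alive
// comment is sent to the client.
const pingInterval = 10 * time.Second

// DataEmitter starts producing values for a stream and returns the channel
// they arrive on. Implementations should stop producing when ctx is done and
// may close the channel to end the stream.
type DataEmitter func(ctx context.Context) chan interface{}

// SseWrapper streams every value produced by emitter to the client as a
// JSON-encoded SSE "data" event. It returns when the request context is
// cancelled, the emitter closes its channel, or a write to the client fails.
func SseWrapper(w http.ResponseWriter, r *http.Request, emitter DataEmitter) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// SSE headers must be set before the first write.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	ctx := r.Context()
	dE := emitter(ctx)

	// A single ticker replaces time.After, which allocated a new timer on
	// every loop iteration.
	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// A failed write means the client is gone, so stop streaming.
			if err := writePing(w); err != nil {
				return
			}
		case m, ok := <-dE:
			if !ok {
				// The emitter closed the channel. Without this check the
				// loop would spin, sending "null" events forever.
				return
			}
			line, err := json.Marshal(m)
			if err != nil {
				// Skip values that cannot be encoded instead of panicking.
				log.Printf("sse: cannot marshal event: %v", err)
				continue
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", line); err != nil {
				return
			}
			flusher.Flush()
			// Restart the idle period so pings are only sent on quiet streams.
			ticker.Reset(pingInterval)
		}
	}
}

// writePing writes an SSE comment line, which clients ignore, and flushes it.
func writePing(w http.ResponseWriter) error {
	flusher, ok := w.(http.Flusher)
	if !ok {
		return errors.New("streaming unsupported")
	}
	if _, err := io.WriteString(w, ":ping\n\n"); err != nil {
		return err
	}
	flusher.Flush()
	return nil
}

// SSEPing sends an SSE keep-alive ping. It panics if the ping cannot be
// written or if w does not support flushing.
func SSEPing(w http.ResponseWriter) {
	if err := writePing(w); err != nil {
		panic(err)
	}
}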