68 lines
1017 B
Go
68 lines
1017 B
Go
|
package tools
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// dataEmitter is the producer side of an SSE stream: it is called with a
// context and returns a channel of arbitrary values. SseWrapper invokes it
// with the request context and JSON-encodes every value received from the
// returned channel into one "data:" event.
// NOTE(review): implementations presumably should stop sending once ctx is
// cancelled — confirm against the emitters in use.
type dataEmitter func(ctx context.Context) chan interface{}
|
||
|
|
||
|
func SseWrapper(w http.ResponseWriter, r *http.Request, emitter dataEmitter) {
|
||
|
flusher, ok := w.(http.Flusher)
|
||
|
if !ok {
|
||
|
http.Error(
|
||
|
w,
|
||
|
"flushing is not allowed",
|
||
|
http.StatusFailedDependency,
|
||
|
)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
ctx := r.Context()
|
||
|
|
||
|
dE := emitter(ctx)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
case <-time.After(time.Second * 10):
|
||
|
SSEPing(w)
|
||
|
case m := <-dE:
|
||
|
line, err := json.Marshal(m)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
if _, err := w.Write(
|
||
|
[]byte(
|
||
|
fmt.Sprintf(
|
||
|
"data: %s\n\n",
|
||
|
string(line),
|
||
|
),
|
||
|
),
|
||
|
); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
flusher.Flush()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SSEPing writes a single SSE comment line (":ping" followed by a blank
// line) to w and flushes it immediately.
//
// It panics with "streaming unsupported" if w does not implement
// http.Flusher, and with the write error if the write fails. The flusher
// check happens before the write so that an unsupported writer never
// receives bytes that could not be flushed.
func SSEPing(w http.ResponseWriter) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		panic("streaming unsupported")
	}

	if _, err := w.Write([]byte(":ping\n\n")); err != nil {
		panic(err)
	}

	flusher.Flush()
}
|