149 lines
2.8 KiB
Go
149 lines
2.8 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"reflect"
|
|
"time"
|
|
)
|
|
|
|
// DataEmitter starts the data source for one SSE subscription and returns the
// channel its values are delivered on. SseWrapper treats a nil channel as a
// rejected subscription and answers 401.
//
// The context handed to the emitter is derived from the request context (so it
// is cancelled when the client goes away) and carries the request's "ticket"
// query parameter under ContextURLKey.
type DataEmitter func(ctx context.Context) chan interface{}

// ContextURLKey is the context key under which SseWrapper stores the value of
// the request's "ticket" query parameter.
//
// NOTE(review): despite the name and the "url" value, what is stored is the
// ticket, not a URL. A plain string key can collide with keys set by other
// packages; it is kept as is because changing its type would break existing
// ctx.Value lookups.
const ContextURLKey = "url"

// SseWrapper adapts emitter into an HTTP handler that streams the emitted
// values to the client as Server-Sent Events.
//
// Per request the handler:
//   - answers 424 if the ResponseWriter cannot flush, and 401 ("no token") if
//     the emitter returns a nil channel; the event-stream headers are only set
//     once both checks have passed, so error responses do not carry them;
//   - writes every non-nil value as a "data: <json>" event and flushes it;
//   - writes an error value as a "data: <message>" event and then ends the
//     stream;
//   - sends a ":ping" comment whenever nothing has been written for 10 seconds;
//   - stops when the request context is done or the emitter closes the channel.
//
// Write and encoding failures, as well as panics raised while serving, are
// reported on stdout with the "sse her err" prefix and end the stream.
func SseWrapper(emitter DataEmitter) http.HandlerFunc {
	const pingInterval = 10 * time.Second

	return func(w http.ResponseWriter, r *http.Request) {
		// Registered first so that it also covers the deferred close below.
		defer func() {
			if v := recover(); v != nil {
				fmt.Println("sse her err", v)
			}
		}()

		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "flushing is not allowed", http.StatusFailedDependency)
			return
		}

		ctx := context.WithValue(r.Context(), ContextURLKey, r.URL.Query().Get("ticket"))

		dE := emitter(ctx)
		if dE == nil {
			http.Error(w, "no token", http.StatusUnauthorized)
			return
		}

		// NOTE(review): the channel is closed from the receiving side, as the
		// original code did. An emitter that keeps sending after ctx is
		// cancelled will panic on the closed channel — confirm emitters stop on
		// ctx.Done(). The flag avoids a double close when the emitter has
		// already closed the channel itself.
		emitterClosed := false
		defer func() {
			if !emitterClosed {
				close(dE)
			}
		}()

		// Announce the event stream only after all validation has passed.
		header := w.Header()
		header.Set("Content-Type", "text/event-stream")
		header.Set("Cache-Control", "no-cache")
		header.Set("Connection", "keep-alive")

		for {
			select {
			case <-ctx.Done():
				return

			// The timer restarts on every loop iteration, so a ping is only
			// sent after pingInterval without any other activity.
			case <-time.After(pingInterval):
				if err := sseSend(w, flusher, []byte(":ping\n\n")); err != nil {
					fmt.Println("sse her err", err)
					return
				}

			case m, open := <-dE:
				if !open {
					// A closed channel yields nil forever; without this check
					// the loop would spin until the client disconnects.
					emitterClosed = true
					return
				}
				if m == nil {
					continue
				}

				if emitErr, isErr := m.(error); isErr {
					// Report the failure to the client, then end the stream.
					if err := sseSend(w, flusher, sseDataFrame(emitErr.Error())); err != nil {
						fmt.Println("sse her err", err)
					}
					return
				}

				line, err := json.Marshal(m)
				if err != nil {
					fmt.Println("sse her err", err)
					return
				}
				if err := sseSend(w, flusher, sseDataFrame(string(line))); err != nil {
					fmt.Println("sse her err", err)
					return
				}
			}
		}
	}
}

// sseDataFrame renders payload as a single SSE event. Every line of the payload
// gets its own "data: " field, because a raw line break inside a field would
// otherwise terminate the field (or the whole event) early on the client.
// LF, CR and CRLF are all treated as line breaks.
func sseDataFrame(payload string) []byte {
	frame := make([]byte, 0, len(payload)+len("data: \n\n"))
	start := 0
	for i := 0; i < len(payload); i++ {
		c := payload[i]
		if c != '\n' && c != '\r' {
			continue
		}
		frame = append(frame, "data: "...)
		frame = append(frame, payload[start:i]...)
		frame = append(frame, '\n')
		// Consume the LF of a CRLF pair so it does not produce an empty line.
		if c == '\r' && i+1 < len(payload) && payload[i+1] == '\n' {
			i++
		}
		start = i + 1
	}
	frame = append(frame, "data: "...)
	frame = append(frame, payload[start:]...)
	return append(frame, '\n', '\n')
}

// sseSend writes one already framed SSE message and flushes it to the client.
func sseSend(w http.ResponseWriter, flusher http.Flusher, frame []byte) error {
	if _, err := w.Write(frame); err != nil {
		return err
	}
	flusher.Flush()
	return nil
}

// SSEPing writes an SSE comment (":ping") to w and flushes it, which keeps an
// otherwise idle connection alive.
//
// It panics if the write fails or if w does not implement http.Flusher; callers
// that cannot tolerate a panic must recover.
func SSEPing(w http.ResponseWriter) {
	if _, err := w.Write([]byte(":ping\n\n")); err != nil {
		panic(err)
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		panic("streaming unsupported")
	}
	flusher.Flush()
}
|
|
|
|
// HandlerWrapper adapts a plain function into a JSON-over-HTTP handler.
//
// f must be a non-nil function of one of these shapes:
//
//	func(ctx context.Context) (result, status)
//	func(ctx context.Context, arg T) (result, status)
//
// where status is an int HTTP status code and result is any JSON-encodable
// value. HandlerWrapper panics right away if f is not a function or has the
// wrong number of parameters or results, so a wiring mistake shows up at
// start-up instead of on the first request.
//
// For every request the handler:
//   - decodes the JSON request body into a fresh value of type T (two-parameter
//     form only) and answers 400 "invalid request data" if that fails;
//   - calls f with the request context and, if present, the decoded argument;
//   - answers with the returned status, Content-Type application/json and the
//     JSON encoding of the result followed by a newline; if the result cannot
//     be encoded the response is a 500 instead.
func HandlerWrapper(f interface{}) http.HandlerFunc {
	fRV := reflect.ValueOf(f)
	if fRV.Kind() != reflect.Func || fRV.IsNil() {
		panic(fmt.Sprintf("HandlerWrapper: expected a non-nil function, got %T", f))
	}

	fRT := fRV.Type()
	if n := fRT.NumIn(); n < 1 || n > 2 {
		panic(fmt.Sprintf("HandlerWrapper: %T must take 1 or 2 parameters", f))
	}
	if fRT.NumOut() < 2 {
		panic(fmt.Sprintf("HandlerWrapper: %T must return (result, status)", f))
	}

	// argRT stays nil for the single-parameter form.
	var argRT reflect.Type
	if fRT.NumIn() > 1 {
		argRT = fRT.In(1)
	}

	return func(w http.ResponseWriter, q *http.Request) {
		callArgs := []reflect.Value{reflect.ValueOf(q.Context())}

		if argRT != nil {
			// A new value per request: sharing one across requests would race
			// and leak fields from earlier requests into later ones.
			argPtr := reflect.New(argRT)
			if err := json.NewDecoder(q.Body).Decode(argPtr.Interface()); err != nil {
				http.Error(w, "invalid request data", http.StatusBadRequest)
				return
			}
			callArgs = append(callArgs, argPtr.Elem())
		}

		results := fRV.Call(callArgs)
		status := results[1].Interface().(int)

		// Encode before committing the status line so that an unencodable
		// result can still be reported as a server error.
		payload, err := json.Marshal(results[0].Interface())
		if err != nil {
			http.Error(w, "failed to encode response", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		// The trailing newline matches what json.Encoder.Encode emits.
		if _, err := w.Write(append(payload, '\n')); err != nil {
			// The client is most likely gone; nothing more can be sent.
			fmt.Println("handler wrapper write err", err)
		}
	}
}
|