package tools import ( "context" "encoding/json" "fmt" "net/http" "reflect" "strings" "time" ) type DataEmitter func(ctx context.Context) chan interface{} const ContextURLKey = "url" func SseWrapper(emitter DataEmitter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request,) { flusher, ok := w.(http.Flusher) if !ok { http.Error( w, "flushing is not allowed", http.StatusFailedDependency, ) return } ctx := context.WithValue( r.Context(), ContextURLKey, strings.Split(r.URL.Path, "/"), ) dE := emitter(ctx) for { select { case <-ctx.Done(): return case <-time.After(time.Second * 10): SSEPing(w) case m := <-dE: line, err := json.Marshal(m) if err != nil { panic(err) } if _, err := w.Write( []byte( fmt.Sprintf( "data: %s\n\n", string(line), ), ), ); err != nil { panic(err) } flusher.Flush() } } } } // Sends SSE ping func SSEPing(w http.ResponseWriter) { if _, err := w.Write([]byte(":ping\n\n")); err != nil { panic(err) } if flusher, ok := w.(http.Flusher); ok { flusher.Flush() } else { panic("streaming unsupported") } } func HandlerWrapper(f interface{}) http.HandlerFunc { fRV := reflect.ValueOf(f) fRT := fRV.Type() var argRT reflect.Type var argTypeRV reflect.Value var argExist bool if fRT.NumIn() > 1 { argExist = true argRT = fRT.In(1) argTypeRV = reflect.New(argRT) } return func(w http.ResponseWriter, q *http.Request) { q.Header.Set("Content-Type", "application/json") var fResultRV []reflect.Value if argExist { arg := argTypeRV.Interface() if err := json.NewDecoder(q.Body).Decode(&arg); err != nil { http.Error(w, "invalid request data", http.StatusBadRequest) } else { fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context()), reflect.ValueOf(arg).Elem()}) } } else { fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context())}) } w.WriteHeader(fResultRV[1].Interface().(int)) if err := json.NewEncoder(w).Encode(fResultRV[0].Interface()); err != nil { panic(err) } } }