package tools

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"reflect"
	"strings"
	"time"
)

// DataEmitter starts the event source for a single SSE request and returns
// the channel on which its values are delivered. SseWrapper treats a nil
// channel as a rejected request. Unless the emitter has already closed the
// channel, SseWrapper closes it when the handler returns, so an emitter must
// stop sending once ctx is done.
type DataEmitter func(ctx context.Context) chan interface{}

// ContextURLKey is the context key under which SseWrapper stores the value of
// the request's "ticket" query parameter.
const ContextURLKey = "url"

// ssePingInterval is how long a stream may stay silent before a keep-alive
// comment is sent.
const ssePingInterval = 10 * time.Second

// SseWrapper adapts emitter into an http.HandlerFunc that streams the emitted
// values to the client as server-sent events.
//
// Every non-nil value received from the emitter is JSON-encoded and sent as
// one "data:" event. A value that implements error is sent as its message
// text and ends the stream. nil values are skipped. After ssePingInterval
// without any value a ":ping" comment is written.
//
// The handler returns when the request context is done, when the emitter
// closes its channel, after an error value has been sent, or when writing to
// the client fails. It responds with 424 if the ResponseWriter cannot flush
// and with 401 if the emitter returns a nil channel.
func SseWrapper(emitter DataEmitter) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Last line of defence: a panic (for example a double close of the
		// emitter channel) must not escape the handler.
		defer func() {
			if v := recover(); v != nil {
				fmt.Println("sse her err", v)
			}
		}()

		// Send the event-stream headers.
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "flushing is not allowed", http.StatusFailedDependency)
			return
		}

		ctx := context.WithValue(
			r.Context(),
			ContextURLKey,
			r.URL.Query().Get("ticket"),
		)
		dE := emitter(ctx)
		if dE == nil {
			http.Error(w, "no token", http.StatusUnauthorized)
			return
		}

		// The handler closes the channel on exit, but must not close it a
		// second time when the emitter was seen to have closed it already.
		emitterClosed := false
		defer func() {
			if !emitterClosed {
				close(dE)
			}
		}()

		// A single reusable timer measures idle time; it is restarted after
		// every receive so pings are only sent on an otherwise silent stream.
		idle := time.NewTimer(ssePingInterval)
		defer idle.Stop()

		for {
			select {
			case <-r.Context().Done():
				return

			case <-idle.C:
				if err := writeSSEPing(w); err != nil {
					fmt.Println("sse her err", err)
					return
				}
				flusher.Flush()
				idle.Reset(ssePingInterval)

			case m, open := <-dE:
				if !open {
					// A closed channel is always ready to receive; returning
					// here avoids spinning on zero values.
					emitterClosed = true
					return
				}

				// Drain a tick that fired concurrently before restarting the
				// timer, so a stale tick does not trigger an early ping.
				if !idle.Stop() {
					select {
					case <-idle.C:
					default:
					}
				}
				idle.Reset(ssePingInterval)

				if m == nil {
					continue
				}

				if emitted, isErr := m.(error); isErr {
					if err := writeSSEData(w, emitted.Error()); err != nil {
						fmt.Println("sse her err", err)
						return
					}
					flusher.Flush()
					return
				}

				line, err := json.Marshal(m)
				if err != nil {
					fmt.Println("sse her err", err)
					return
				}
				if err := writeSSEData(w, string(line)); err != nil {
					fmt.Println("sse her err", err)
					return
				}
				flusher.Flush()
			}
		}
	}
}

// writeSSEData writes payload as a single SSE event. Each line of payload
// gets its own "data:" field so that embedded line breaks cannot terminate
// the event early or start a new one.
func writeSSEData(w http.ResponseWriter, payload string) error {
	payload = strings.ReplaceAll(payload, "\r\n", "\n")
	payload = strings.ReplaceAll(payload, "\r", "\n")

	var frame strings.Builder
	for _, line := range strings.Split(payload, "\n") {
		frame.WriteString("data: ")
		frame.WriteString(line)
		frame.WriteByte('\n')
	}
	frame.WriteByte('\n')

	_, err := w.Write([]byte(frame.String()))
	return err
}

// writeSSEPing writes the ":ping" keep-alive comment without flushing.
func writeSSEPing(w http.ResponseWriter) error {
	_, err := w.Write([]byte(":ping\n\n"))
	return err
}

// SSEPing writes a ":ping" SSE comment to w and flushes it.
//
// It panics if the write fails or if w does not implement http.Flusher.
func SSEPing(w http.ResponseWriter) {
	if err := writeSSEPing(w); err != nil {
		panic(err)
	}
	flusher, ok := w.(http.Flusher)
	if !ok {
		panic("streaming unsupported")
	}
	flusher.Flush()
}

// HandlerWrapper adapts f into an http.HandlerFunc that speaks JSON.
//
// f must be a function whose first parameter receives the request context.
// If f has a second parameter, a fresh value of that parameter's type is
// decoded from the JSON request body for every request. f must return two
// values: the payload to encode as the JSON response body and the HTTP
// status code as an int.
//
// The handler responds with 400 when the request body cannot be decoded and
// with 500 when the payload cannot be encoded. HandlerWrapper panics if f is
// not a function.
func HandlerWrapper(f interface{}) http.HandlerFunc {
	fRV := reflect.ValueOf(f)
	fRT := fRV.Type()

	var argRT reflect.Type
	argExist := fRT.NumIn() > 1
	if argExist {
		argRT = fRT.In(1)
	}

	return func(w http.ResponseWriter, q *http.Request) {
		in := []reflect.Value{reflect.ValueOf(q.Context())}
		if argExist {
			// Allocate per request: a value shared between requests would
			// race and would leak fields from one request into the next.
			argPtr := reflect.New(argRT)
			if err := json.NewDecoder(q.Body).Decode(argPtr.Interface()); err != nil {
				http.Error(w, "invalid request data", http.StatusBadRequest)
				return
			}
			in = append(in, argPtr.Elem())
		}

		out := fRV.Call(in)
		status := out[1].Interface().(int)

		// Encode before committing the status line so that an encoding
		// failure can still be reported to the client.
		body, err := json.Marshal(out[0].Interface())
		if err != nil {
			http.Error(w, "response encoding failed", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		// The trailing newline keeps the body identical to json.Encoder output.
		if _, err := w.Write(append(body, '\n')); err != nil {
			fmt.Println("handler wrapper write err", err)
		}
	}
}