From a39a22407fed01a96cba6ec91fa5d4fb574bb7ac Mon Sep 17 00:00:00 2001 From: Skeris Date: Sat, 1 May 2021 13:05:45 +0300 Subject: [PATCH] add sse wrapper --- middleware/http_middleware.go | 12 +++++-- service/service.go | 4 +++ tools/tools.go | 67 +++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 tools/tools.go diff --git a/middleware/http_middleware.go b/middleware/http_middleware.go index e28db3c..e947c2d 100644 --- a/middleware/http_middleware.go +++ b/middleware/http_middleware.go @@ -51,9 +51,17 @@ func DefaultCookieAndRecoveryMiddleware( const headerKey = jwt_adapter.DefaultHeaderKey return func(w http.ResponseWriter, r *http.Request, next http.HandlerFunc) { - var err error + var ( + err error + tokenHeader string + ) cookie := &jwt_adapter.JwtAdapter{} - tokenHeader := r.Header.Get(headerKey) + + if r.Method == http.MethodGet { + tokenHeader = fmt.Sprintf("Bearer %s", r.Form.Get(headerKey)) + } else { + tokenHeader = r.Header.Get(headerKey) + } if tokenHeader == "" { fmt.Println("ERROR NO authHEader") diff --git a/service/service.go b/service/service.go index 5193c7e..b4c8186 100644 --- a/service/service.go +++ b/service/service.go @@ -109,3 +109,7 @@ func (h *Heruvym) CreateTicket(w http.ResponseWriter, r *http.Request) { return } } + +func (h *Heruvym) GetList(f http.Flusher, r *http.Request) { + +} diff --git a/tools/tools.go b/tools/tools.go new file mode 100644 index 0000000..3da00c2 --- /dev/null +++ b/tools/tools.go @@ -0,0 +1,67 @@ +package tools + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +type dataEmitter func(ctx context.Context) chan interface{} + +func SseWrapper(w http.ResponseWriter, r *http.Request, emitter dataEmitter) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error( + w, + "flushing is not allowed", + http.StatusFailedDependency, + ) + return + } + + ctx := r.Context() + + dE := emitter(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * 10): + SSEPing(w) + case m := <-dE: + line, err := json.Marshal(m) + if err != nil { + panic(err) + } + + if _, err := w.Write( + []byte( + fmt.Sprintf( + "data: %s\n\n", + string(line), + ), + ), + ); err != nil { + panic(err) + } + + flusher.Flush() + } + } +} + +// Sends SSE ping +func SSEPing(w http.ResponseWriter) { + if _, err := w.Write([]byte(":ping\n\n")); err != nil { + panic(err) + } + + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } else { + panic("streaming unsupported") + } +}