heruvym/internal/tools/tools.go
2024-10-01 14:34:09 +03:00

130 lines
2.9 KiB
Go

package tools
import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/gofiber/fiber/v2"
"net/http"
"reflect"
"time"
)
type DataEmitter func(ctx context.Context) chan interface{}
const ContextURLKey = "url"
func SseWrapper(emitter DataEmitter) fiber.Handler {
return func(ctx *fiber.Ctx) error {
defer func() {
if v := recover(); v != nil {
fmt.Println("sse her err", v)
}
}()
// Отправляем header event-stream
fmt.Println("SseWrapper")
ctx.Set("Content-Type", "text/event-stream")
ctx.Set("Cache-Control", "no-cache")
ctx.Set("Connection", "keep-alive")
ctxValue := context.WithValue(ctx.Context(), ContextURLKey, ctx.Query("ticket"))
ctxCancel, cancel := context.WithCancel(ctxValue)
dE := emitter(ctxCancel)
if dE == nil {
fmt.Println("dE")
cancel()
return ctx.Status(fiber.StatusUnauthorized).SendString("no token")
}
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
defer close(dE)
for {
select {
case <-pingTicker.C:
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
if err := w.Flush(); err != nil {
fmt.Println("sse, close connection")
cancel()
return
}
case m := <-dE:
fmt.Println("datatatata", m)
if m == nil {
continue
}
if err, ok := m.(error); ok {
fmt.Fprintf(w, "data: %s\n\n", err.Error())
if err := w.Flush(); err != nil {
fmt.Println("error flushing err ")
cancel()
return
}
}
fmt.Fprintf(w, "data: %s\n\n", m)
if err := w.Flush(); err != nil {
fmt.Println("error flushing data")
cancel()
return
}
}
}
})
return nil
}
}
// Sends SSE ping
func SSEPing(w http.ResponseWriter) {
if _, err := w.Write([]byte(":ping\n\n")); err != nil {
panic(err)
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
} else {
panic("streaming unsupported")
}
}
func HandlerWrapper(f interface{}) http.HandlerFunc {
fRV := reflect.ValueOf(f)
fRT := fRV.Type()
var argRT reflect.Type
var argTypeRV reflect.Value
var argExist bool
if fRT.NumIn() > 1 {
argExist = true
argRT = fRT.In(1)
argTypeRV = reflect.New(argRT)
}
return func(w http.ResponseWriter, q *http.Request) {
q.Header.Set("Content-Type", "application/json")
var fResultRV []reflect.Value
if argExist {
arg := argTypeRV.Interface()
if err := json.NewDecoder(q.Body).Decode(&arg); err != nil {
http.Error(w, "invalid request data", http.StatusBadRequest)
} else {
fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context()), reflect.ValueOf(arg).Elem()})
}
fmt.Println("GET STRANGE", arg)
} else {
fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context())})
}
w.WriteHeader(fResultRV[1].Interface().(int))
if err := json.NewEncoder(w).Encode(fResultRV[0].Interface()); err != nil {
panic(err)
}
}
}