fix some sseerrors

This commit is contained in:
Skeris 2021-05-15 23:10:07 +03:00
parent 36612dd31b
commit 0854db1ede
7 changed files with 61 additions and 27 deletions

@ -3,6 +3,7 @@ package dal
import (
"bitbucket.org/skeris/heruvym/model"
"context"
"fmt"
"github.com/rs/xid"
"github.com/themakers/hlog"
"go.mongodb.org/mongo-driver/bson"
@ -205,21 +206,21 @@ func (d *DAL) WatchTickets(
operationTypes := []bson.D{{{"operationType", "insert"}},
{{"operationType", "update"}}}
matchStage := bson.D{
{
"$and", bson.D{
{"$or", operationTypes},
{"fullDocument.UserID", userID},
}},
matchStage := bson.M{
"$and": bson.A{
bson.M{"$or": operationTypes},
bson.M{"fullDocument.UserID": userID},
},
}
matchPipeline := bson.D{
{"$match", matchStage},
matchPipeline := mongo.Pipeline{
bson.D{{"$match", matchStage}},
}
cs, err := d.colTck.Watch(ctx, matchPipeline,
options.ChangeStream().SetFullDocument(options.Default))
if err != nil {
fmt.Println("111",err,matchPipeline)
return err
}
@ -322,7 +323,7 @@ func (d *DAL) YieldMessages(
ctx context.Context,
ticketID string,
yield func(ticket model.Message) error) error {
cursor, err := d.colTck.Find(ctx, bson.M{
cursor, err := d.colMsg.Find(ctx, bson.M{
"TicketID": ticketID,
})
if err != nil {
@ -359,8 +360,8 @@ func (d *DAL) WatchMessages(
},
}}}
matchPipeline := bson.D{
{"$match", matchStage},
matchPipeline := mongo.Pipeline{
bson.D{{"$match", matchStage}},
}
cs, err := d.colMsg.Watch(ctx, matchPipeline,

1
go.mod

@ -6,6 +6,7 @@ require (
bitbucket.org/skeris/profile v0.0.0
github.com/BlackBroker/trashlog v0.0.0-20210406151703-e2c4874359bf
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gorilla/mux v1.8.0
github.com/pkg/errors v0.9.1 // indirect
github.com/rs/xid v1.3.0
github.com/skeris/appInit v0.1.12

1
go.sum

@ -158,6 +158,7 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=

@ -45,6 +45,7 @@ func (c *JwtAdapter) Init() {
c.LastSeen = t
}
func Get(ctx context.Context) *JwtAdapter {
if adapter, ok := ctx.Value(DefaultHeaderKey).(*JwtAdapter); ok {
return adapter

@ -1,7 +1,7 @@
package middleware
import (
"bitbucket.org/skeris/profile/jwt_adapter"
"bitbucket.org/skeris/heruvym/jwt_adapter"
"context"
"net/http"
)
@ -23,7 +23,7 @@ func (mw *Middleware) MiddlewareGetJwt(next http.Handler) http.Handler {
return
}
ctx := context.WithValue(r.Context(), "JWT", adapter)
ctx := context.WithValue(r.Context(), jwt_adapter.DefaultHeaderKey, adapter)
next.ServeHTTP(w, r.WithContext(ctx))
})
}

@ -9,6 +9,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/themakers/hlog"
"net/http"
)
@ -172,6 +173,7 @@ func (h *Heruvym) userTickets(ctx context.Context, userID string, output chan in
return nil
}); err != nil {
output <- errors.New("cannot get tickets")
return
}
if err := h.dal.WatchTickets(ctx, userID, func(ticket model.Ticket) error {
@ -179,6 +181,7 @@ func (h *Heruvym) userTickets(ctx context.Context, userID string, output chan in
return nil
}); err != nil {
output <- errors.New("cannot watch tickets")
return
}
}
@ -238,8 +241,7 @@ var _ tools.DataEmitter = (&Heruvym{}).Subscribe
func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
sess := jwt_adapter.Get(ctx)
url := ctx.Value(tools.ContextURLKey).([]string)
ticketID := url[len(url)-1]
ticketID := ctx.Value(tools.ContextURLKey).(string)
output := make(chan interface{})
@ -255,13 +257,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
fmt.Println("2", err)
output <- errors.New("cannot show message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("1", err)
output <- errors.New("cannot read messages " + err.Error())
}
if err := h.dal.WatchMessages(ctx, ticketID,
@ -269,13 +273,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
fmt.Println("3", err)
output <- errors.New("cannot show watch message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("4", err)
output <- errors.New("cannot watch messages " + err.Error())
}
}()
} else {
@ -291,13 +297,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.User); err != nil {
output <- errors.New("cannot read message")
fmt.Println("2", err)
output <- errors.New("cannot show message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("1", err)
output <- errors.New("cannot read messages " + err.Error())
}
if err := h.dal.WatchMessages(ctx, ticketID,
@ -305,13 +313,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
fmt.Println("3", err)
output <- errors.New("cannot show watch message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("4", err)
output <- errors.New("cannot watch messages " + err.Error())
}
}()
} else {
@ -326,13 +336,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.User); err != nil {
output <- errors.New("cannot read message")
fmt.Println("2", err)
output <- errors.New("cannot show message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("1", err)
output <- errors.New("cannot read messages " + err.Error())
}
if err := h.dal.WatchMessages(ctx, ticketID,
@ -340,13 +352,15 @@ func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
fmt.Println("3", err)
output <- errors.New("cannot show watch message " + err.Error())
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
fmt.Println("4", err)
output <- errors.New("cannot watch messages " + err.Error())
}
}()
}

@ -46,6 +46,22 @@ func SseWrapper(emitter DataEmitter) http.HandlerFunc {
case <-time.After(time.Second * 10):
SSEPing(w)
case m := <-dE:
if err, ok := m.(error); ok {
if _, err := w.Write(
[]byte(
fmt.Sprintf(
"data: %s\n\n",
err.Error(),
),
),
); err != nil {
panic(err)
}
flusher.Flush()
return
}
fmt.Println("DAATA",m)
line, err := json.Marshal(m)
if err != nil {
panic(err)