add support for message delivering

This commit is contained in:
Skeris 2021-05-03 01:25:12 +03:00
parent 0b2babb4bb
commit 5f606ae2bd
9 changed files with 584 additions and 60 deletions

@ -133,7 +133,7 @@ func New(ctx context.Context, opts interface{}) (appInit.CommonApp, error) {
DalName: "MongoDB",
URI: options.MongoURI,
DbTable: options.MongoDbTable,
Collection: options.MongoCollections,
Collections: options.MongoCollections,
})
heruvym := service.New(database, connRoles, logger)

@ -1,7 +1,7 @@
package dal
import (
"bitbucket.org/BlackBroker/heruvym/model"
"bitbucket.org/skeris/heruvym/model"
"context"
"github.com/rs/xid"
"github.com/themakers/hlog"
@ -85,44 +85,57 @@ func (d *DAL) PutMessage(
ctx context.Context,
message, userID, sessionID, ticketID string,
files []string,
) error {
if _, err := d.colMsg.InsertOne(ctx, &model.Message{
) (*model.Message, error) {
insertable := model.Message{
ID: xid.New().String(),
UserID: userID,
SessionID: sessionID,
TicketID: ticketID,
Message: message,
Files: files,
Shown: map[string]int{},
CreatedAt: time.Now(),
}); err != nil {
}
if _, err := d.colMsg.InsertOne(ctx, &insertable); err != nil {
d.logger.Emit(ErrorInsert{
Err: err,
UserID: userID,
SessionID: sessionID,
})
return err
return nil, err
}
return nil
return &insertable, nil
}
func (d *DAL) CreateTicket(
ctx context.Context,
userID,
sessionID,
title string,
title, message string,
files []string,
) (string, error) {
tiketID := xid.New().String()
ticketID := xid.New().String()
if _, err := d.colTck.InsertOne(ctx, &model.Ticket{
ID: tiketID,
ID: ticketID,
UserID: userID,
SessionID: sessionID,
Title: title,
State: model.StateOpen,
CreatedAt: time.Now(),
TopMessage: model.Message{
ID: xid.New().String(),
UserID: userID,
SessionID: sessionID,
TicketID: ticketID,
Message: message,
Files: files,
CreatedAt: time.Now(),
},
}); err != nil {
d.logger.Emit(ErrorInsert{
Err: err,
@ -133,7 +146,7 @@ func (d *DAL) CreateTicket(
return "", err
}
return tiketID, nil
return ticketID, nil
}
func (d *DAL) GetTickets4Sess(ctx context.Context, sessID string) ([]model.Ticket, error) {
@ -151,3 +164,234 @@ func (d *DAL) GetTickets4Sess(ctx context.Context, sessID string) ([]model.Ticke
return result, nil
}
func (d *DAL) GetTicket4Sess(
ctx context.Context,
ticketID,
sessID string) (*model.Ticket, error) {
var result model.Ticket
if err := d.colTck.FindOne(ctx, bson.M{
"_id": ticketID,
"SessionID": sessID,
}).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
func (d *DAL) GetTicket4User(
ctx context.Context,
ticketID,
userID string) (*model.Ticket, error) {
var result model.Ticket
if err := d.colTck.FindOne(ctx, bson.M{
"_id": ticketID,
"UserID": userID,
}).Decode(&result); err != nil {
return nil, err
}
return &result, nil
}
func (d *DAL) WatchTickets(
ctx context.Context,
userID string,
yield func(ticket model.Ticket) error) error {
operationTypes := []bson.D{{{"operationType", "insert"}},
{{"operationType", "update"}}}
matchStage := bson.D{
{
"$and", bson.D{
{"$or", operationTypes},
{"fullDocument.UserID", userID},
}},
}
matchPipeline := bson.D{
{"$match", matchStage},
}
cs, err := d.colTck.Watch(ctx, matchPipeline,
options.ChangeStream().SetFullDocument(options.Default))
if err != nil {
return err
}
var piece model.Ticket
for cs.Next(ctx) {
if err := cs.Decode(&piece); err != nil {
return err
}
if err := yield(piece); err != nil {
return err
}
}
return nil
}
func (d *DAL) YieldUserTickets(
ctx context.Context,
userID string,
yield func(ticket model.Ticket) error) error {
cursor, err := d.colTck.Find(ctx, bson.M{
"UserID": userID,
})
if err != nil {
return err
}
var piece model.Ticket
for cursor.Next(ctx) {
if err := cursor.Decode(&piece); err != nil {
return err
}
if err := yield(piece); err != nil {
return err
}
}
return nil
}
func (d *DAL) WatchActiveTickets(ctx context.Context, yield func(ticket model.Ticket) error) error {
operationTypes := []bson.D{{{"operationType", "insert"}},
{{"operationType", "update"}}}
matchStage := bson.D{{"$and", []bson.D{
{
{"$or", operationTypes},
},
{
{
"fullDocument.State", bson.D{{
"$ne", model.StateClose,
}},
},
},
}}}
matchPipeline := bson.D{
{"$match", matchStage},
}
cs, err := d.colTck.Watch(ctx, matchPipeline,
options.ChangeStream().SetFullDocument(options.Default))
if err != nil {
return err
}
var piece model.Ticket
for cs.Next(ctx) {
if err := cs.Decode(&piece); err != nil {
return err
}
if err := yield(piece); err != nil {
return err
}
}
return nil
}
func (d *DAL) UpdateTopMessage(ctx context.Context, ticketID string, msg *model.Message) error {
if err := d.colTck.FindOneAndUpdate(ctx, bson.M{
"_id": ticketID,
}, bson.M{
"$set": bson.M{
"UpdatedAt": time.Now(),
"TopMessage": msg,
},
}).Decode(&model.Ticket{}); err != nil {
return err
}
return nil
}
func (d *DAL) YieldMessages(
ctx context.Context,
ticketID string,
yield func(ticket model.Message) error) error {
cursor, err := d.colTck.Find(ctx, bson.M{
"TicketID": ticketID,
})
if err != nil {
return err
}
var piece model.Message
for cursor.Next(ctx) {
if err := cursor.Decode(&piece); err != nil {
return err
}
if err := yield(piece); err != nil {
return err
}
}
return nil
}
func (d *DAL) WatchMessages(
ctx context.Context, ticketID string, yield func(ticket model.Message) error) error {
operationTypes := []bson.D{{{"operationType", "insert"}},
{{"operationType", "update"}}}
matchStage := bson.D{{"$and", []bson.D{
{
{"$or", operationTypes},
},
{
{
"fullDocument.TicketID", ticketID,
},
},
}}}
matchPipeline := bson.D{
{"$match", matchStage},
}
cs, err := d.colMsg.Watch(ctx, matchPipeline,
options.ChangeStream().SetFullDocument(options.Default))
if err != nil {
return err
}
var piece model.Message
for cs.Next(ctx) {
if err := cs.Decode(&piece); err != nil {
return err
}
if err := yield(piece); err != nil {
return err
}
}
return nil
}
func (d *DAL) SetShown(ctx context.Context, messageID, userID string) error {
if err := d.colMsg.FindOneAndUpdate(ctx, bson.M{
"_id": messageID,
}, bson.M{
"$set": bson.M{
userID: 1,
},
}).Decode(&model.Message{}); err != nil {
return err
}
return nil
}

2
go.mod

@ -3,7 +3,7 @@ module bitbucket.org/skeris/heruvym
go 1.16
require (
bitbucket.org/skeris/profile v0.0.0-20210411000338-496c859828a5 // indirect
bitbucket.org/skeris/profile v0.0.0 // indirect
github.com/BlackBroker/trashlog v0.0.0-20210406151703-e2c4874359bf
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/gorilla/mux v1.8.0

13
go.sum

@ -4,6 +4,8 @@ bitbucket.org/skeris/profile v0.0.0-20210410222825-ad4fbe380f68 h1:OquUpSs9fjMRM
bitbucket.org/skeris/profile v0.0.0-20210410222825-ad4fbe380f68/go.mod h1:nFz19Zl7b9CNqFXWDdvJKrFS7S/th4dX3GnP5F0Yu5M=
bitbucket.org/skeris/profile v0.0.0-20210411000338-496c859828a5 h1:nAtrodBJM8auGMZB4IrLFmdM/VpDsXjycC+e53SFNJ0=
bitbucket.org/skeris/profile v0.0.0-20210411000338-496c859828a5/go.mod h1:nFz19Zl7b9CNqFXWDdvJKrFS7S/th4dX3GnP5F0Yu5M=
bitbucket.org/skeris/profile v0.0.0 h1:j0AkZA0DYjMQHRLsUg3GkS9S+J7jZf0jUQcGKOL26aE=
bitbucket.org/skeris/profile v0.0.0/go.mod h1:WNa+zUEegYaFLXtPR79qIyJ08zJ4M1C/OvfUs3MmEGg=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@ -156,6 +158,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
@ -173,6 +176,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
@ -183,6 +187,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -198,13 +203,19 @@ github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/minio/md5-simd v1.1.0 h1:QPfiOqlZH+Cj9teu0t9b1nTBfPbyTl16Of5MeuShdK4=
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
github.com/minio/minio-go/v7 v7.0.10 h1:1oUKe4EOPUEhw2qnPQaPsJ0lmVTYLFu03SiItauXs94=
github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
@ -321,6 +332,7 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@ -578,6 +590,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=

@ -1,8 +1,8 @@
package http_middleware
import (
"bitbucket.org/BlackBroker/heruvym/jwt_adapter"
"bitbucket.org/BlackBroker/heruvym/middleware/hijack"
"bitbucket.org/skeris/heruvym/jwt_adapter"
"bitbucket.org/skeris/heruvym/middleware/hijack"
"context"
"fmt"
"github.com/skeris/authService/errors"

@ -3,23 +3,35 @@ package model
import "time"
type Message struct {
ID string `bson:"_id"`
TicketID string `bson:"TicketID"`
ID string `bson:"_id"`
TicketID string `bson:"TicketID"`
UserID string `bson:"UserID"`
SessionID string `bson:"SessionID"`
Message string `bson:"Messsage"`
Files []string `bson:"Files"`
Message string `bson:"Messsage"`
Files []string `bson:"Files"`
Shown map[string]int `bson:"Shown"`
CreatedAt time.Time `bson:"CreatedAt"`
}
const (
StateClose = "close"
StateOpen = "open"
StateWait = "wait"
StateNeedAnswer = "answer"
)
type Ticket struct {
ID string `bson:"_id"`
UserID string `bson:"UserID"`
SessionID string `bson:"SessionID"`
State string `bson:"State"`
TopMessage Message `bson:"TopMessage"`
Title string `bson:"Title"`
CreatedAt time.Time `bson:"CreatedAt"`
UpdatedAt time.Time `bson:"UpdatedAt"`
}

@ -1,9 +1,10 @@
package service
import (
"bitbucket.org/BlackBroker/heruvym/dal"
"bitbucket.org/BlackBroker/heruvym/jwt_adapter"
"bitbucket.org/BlackBroker/heruvym/tools"
"bitbucket.org/skeris/heruvym/dal"
"bitbucket.org/skeris/heruvym/jwt_adapter"
"bitbucket.org/skeris/heruvym/model"
"bitbucket.org/skeris/heruvym/tools"
rAL "bitbucket.org/skeris/profile/dal"
"context"
"encoding/json"
@ -29,6 +30,9 @@ func New(dataAccessLayer *dal.DAL, ral rAL.LayerMongoDb, log hlog.Logger) *Heruv
func (h Heruvym) Register(m *http.ServeMux) *http.ServeMux {
m.HandleFunc("/create", h.CreateTicket)
m.HandleFunc("/subscribe", tools.SseWrapper(h.GetList))
m.HandleFunc("/ticket", tools.SseWrapper(h.Subscribe))
m.HandleFunc("/send", tools.HandlerWrapper(h.PutMessage))
return m
}
@ -82,13 +86,15 @@ func (h *Heruvym) CreateTicket(w http.ResponseWriter, r *http.Request) {
session.User,
session.ID,
request.Title,
request.Message,
[]string{},
)
if err != nil {
http.Error(w, "CannotCreateTicket", http.StatusInternalServerError)
return
}
if err := h.dal.PutMessage(ctx,
if _, err := h.dal.PutMessage(ctx,
request.Message,
session.User,
session.Session,
@ -116,7 +122,7 @@ func (h *Heruvym) CreateTicket(w http.ResponseWriter, r *http.Request) {
}
}
var _ tools.DataEmitter = Heruvym{}.GetList
var _ tools.DataEmitter = (&Heruvym{}).GetList
func (h *Heruvym) GetList(ctx context.Context) chan interface{} {
sess := jwt_adapter.Get(ctx)
@ -126,12 +132,56 @@ func (h *Heruvym) GetList(ctx context.Context) chan interface{} {
if sess.User == "" {
go h.unauthorizedTickets(ctx, sess.Session, output)
} else {
role, err := h.ral.GetProfileRole(ctx, sess.User)
if err != nil {
go h.hasNoRole(output)
}
if role == "admin" || role == "manager" {
go h.allTickets(ctx, output)
} else {
go h.userTickets(ctx, sess.User, output)
}
}
return output
}
func (h *Heruvym) allTickets(ctx context.Context, output chan interface{}) {
defer close(output)
if err := h.dal.WatchActiveTickets(ctx, func(ticket model.Ticket) error {
output <- ticket
return nil
}); err != nil {
output <- errors.New("cannot watch all tickets")
}
}
func (h *Heruvym) userTickets(ctx context.Context, userID string, output chan interface{}) {
defer close(output)
if err := h.dal.YieldUserTickets(ctx, userID, func(ticket model.Ticket) error {
output <- ticket
return nil
}); err != nil {
output <- errors.New("cannot get tickets")
}
if err := h.dal.WatchTickets(ctx, userID, func(ticket model.Ticket) error {
output <- ticket
return nil
}); err != nil {
output <- errors.New("cannot watch tickets")
}
}
func (h *Heruvym) hasNoRole(output chan interface{}) {
defer close(output)
output <- errors.New("no role in profile")
}
func (h *Heruvym) unauthorizedTickets(ctx context.Context, sess string, output chan interface{}) {
defer close(output)
@ -144,3 +194,162 @@ func (h *Heruvym) unauthorizedTickets(ctx context.Context, sess string, output c
output <- ticket
}
}
type ReqPutMessage struct {
Message string `json:"message"`
TicketID string `json:"ticket"`
Files []string `json:"files"`
Lang string `json:"lang"`
}
func (h *Heruvym) PutMessage(
ctx context.Context,
request ReqPutMessage,
) (interface{}, int) {
sess := jwt_adapter.Get(ctx)
message, err := h.dal.PutMessage(
ctx,
request.Message,
sess.User,
sess.Session,
request.TicketID,
request.Files,
)
if err != nil {
return errors.New("can not put message"), http.StatusInternalServerError
}
if err := h.dal.UpdateTopMessage(ctx, request.TicketID, message); err != nil {
return errors.New("can not update ticket"), http.StatusInternalServerError
}
return nil, http.StatusOK
}
var _ tools.DataEmitter = (&Heruvym{}).Subscribe
func (h *Heruvym) Subscribe(ctx context.Context) chan interface{} {
sess := jwt_adapter.Get(ctx)
url := ctx.Value(tools.ContextURLKey).([]string)
ticketID := url[len(url)-1]
output := make(chan interface{})
if sess.User == "" {
go func() {
ticket, err := h.dal.GetTicket4Sess(ctx, ticketID, sess.Session)
if err != nil || ticket == nil {
output <- errors.New("no tickets 4 session")
return
}
if err := h.dal.YieldMessages(ctx, ticketID, func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
if err := h.dal.WatchMessages(ctx, ticketID,
func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
}()
} else {
role, err := h.ral.GetProfileRole(ctx, sess.User)
if err != nil {
go h.hasNoRole(output)
}
if role == "admin" || role == "manager" {
go func() {
if err := h.dal.YieldMessages(ctx, ticketID, func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.User); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
if err := h.dal.WatchMessages(ctx, ticketID,
func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
}()
} else {
go func() {
ticket, err := h.dal.GetTicket4User(ctx, ticketID, sess.User)
if err != nil || ticket == nil {
output <- errors.New("no tickets 4 user")
return
}
if err := h.dal.YieldMessages(ctx, ticketID, func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.User); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
if err := h.dal.WatchMessages(ctx, ticketID,
func(message model.Message) error {
output <- message
if err := h.dal.SetShown(ctx, message.ID, sess.Session); err != nil {
output <- errors.New("cannot read message")
return err
}
return nil
}); err != nil {
output <- errors.New("cannot read messages")
}
}()
}
}
return output
}
func (h *Heruvym) handleOwnMessages(output chan interface{}) {
defer close(output)
}

@ -1,8 +1,8 @@
package test
import (
"bitbucket.org/BlackBroker/heruvym/app"
"bitbucket.org/BlackBroker/heruvym/service"
"bitbucket.org/skeris/heruvym/app"
"bitbucket.org/skeris/heruvym/service"
"bitbucket.org/skeris/profile/jwt_adapter"
"bytes"
"context"

@ -5,50 +5,60 @@ import (
"encoding/json"
"fmt"
"net/http"
"reflect"
"strings"
"time"
)
type DataEmitter func(ctx context.Context) chan interface{}
func SseWrapper(w http.ResponseWriter, r *http.Request, emitter DataEmitter) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(
w,
"flushing is not allowed",
http.StatusFailedDependency,
)
return
}
const ContextURLKey = "url"
ctx := r.Context()
dE := emitter(ctx)
for {
select {
case <-ctx.Done():
func SseWrapper(emitter DataEmitter) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request,) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(
w,
"flushing is not allowed",
http.StatusFailedDependency,
)
return
case <-time.After(time.Second * 10):
SSEPing(w)
case m := <-dE:
line, err := json.Marshal(m)
if err != nil {
panic(err)
}
}
if _, err := w.Write(
[]byte(
fmt.Sprintf(
"data: %s\n\n",
string(line),
ctx := context.WithValue(
r.Context(),
ContextURLKey,
strings.Split(r.URL.Path, "/"),
)
dE := emitter(ctx)
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 10):
SSEPing(w)
case m := <-dE:
line, err := json.Marshal(m)
if err != nil {
panic(err)
}
if _, err := w.Write(
[]byte(
fmt.Sprintf(
"data: %s\n\n",
string(line),
),
),
),
); err != nil {
panic(err)
}
); err != nil {
panic(err)
}
flusher.Flush()
flusher.Flush()
}
}
}
}
@ -65,3 +75,39 @@ func SSEPing(w http.ResponseWriter) {
panic("streaming unsupported")
}
}
func HandlerWrapper(f interface{}) http.HandlerFunc {
fRV := reflect.ValueOf(f)
fRT := fRV.Type()
var argRT reflect.Type
var argTypeRV reflect.Value
var argExist bool
if fRT.NumIn() > 1 {
argExist = true
argRT = fRT.In(1)
argTypeRV = reflect.New(argRT)
}
return func(w http.ResponseWriter, q *http.Request) {
q.Header.Set("Content-Type", "application/json")
var fResultRV []reflect.Value
if argExist {
arg := argTypeRV.Interface()
if err := json.NewDecoder(q.Body).Decode(&arg); err != nil {
http.Error(w, "invalid request data", http.StatusBadRequest)
} else {
fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context()), reflect.ValueOf(arg).Elem()})
}
} else {
fResultRV = fRV.Call([]reflect.Value{reflect.ValueOf(q.Context())})
}
w.WriteHeader(fResultRV[1].Interface().(int))
if err := json.NewEncoder(w).Encode(fResultRV[0].Interface()); err != nil {
panic(err)
}
}
}