added sse interface

This commit is contained in:
pasha1coil 2025-06-25 17:06:25 +03:00
parent 0ad3e3ded3
commit 5c67c486af
7 changed files with 381 additions and 5 deletions

17
go.mod

@ -17,7 +17,7 @@ require (
github.com/labstack/echo/v4 v4.11.4
github.com/pioz/faker v1.7.3
github.com/sethvargo/go-envconfig v1.0.0
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.10.0
github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/pkg/kadm v1.11.0
@ -32,11 +32,16 @@ require (
require (
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang-queue/queue v0.4.0 // indirect
github.com/golang-queue/redisdb v0.4.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect
@ -47,6 +52,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.7.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rs/xid v1.5.0 // indirect
@ -59,14 +65,15 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yassinebenaid/godump v0.11.1 // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

19
go.sum

@ -24,6 +24,8 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj
github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0=
github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
@ -32,6 +34,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@ -52,6 +56,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw=
github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-queue/queue v0.4.0 h1:vsOvW4Wqb7Ow5+tKnlZD0PbLf4MLEO1e5C7DV8BDfBg=
github.com/golang-queue/queue v0.4.0/go.mod h1:bZobuNN7gnumxi9LRGihr7y7quDeBZZAvfPcC+H5dzg=
github.com/golang-queue/redisdb v0.4.0 h1:o8CQsilorWQ/JurNAUAAMnbJGb/Oj/2m7Kno1z/oqi0=
github.com/golang-queue/redisdb v0.4.0/go.mod h1:kbQ390Yh0BmiOoSvaMj2fpr9ueL6HS1Ti9UISMB4PwE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -72,6 +80,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vb
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
@ -120,6 +130,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
@ -144,6 +156,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33 h1:N9f/Q+2Ssa+yDcbfaoLTYvXmdeyUUxsJKdPUVsjSmiA=
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33/go.mod h1:rpcH99JknBh8seZmlOlUg51gasZH6QH34oXNsIwYT6E=
github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf h1:TJJm6KcBssmbWzplF5lzixXl1RBAi/ViPs1GaSOkhwo=
@ -168,6 +181,8 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/yassinebenaid/godump v0.11.1 h1:SPujx/XaYqGDfmNh7JI3dOyCUVrG0bG2duhO3Eh2EhI=
github.com/yassinebenaid/godump v0.11.1/go.mod h1:dc/0w8wmg6kVIvNGAzbKH1Oa54dXQx8SNKh4dPRyW44=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk=
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -200,6 +215,7 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -235,6 +251,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -252,6 +269,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@ -266,6 +284,7 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=

@ -0,0 +1,62 @@
package sse
import (
"context"
"encoding/json"
"fmt"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
type SSEConsumer struct {
dispatcher *ConnectionDispatcher
queue *queue.Queue
}
func NewSSEConsumer(redisAddr string, dispatcher *ConnectionDispatcher) (*SSEConsumer, error) {
w := redisdb.NewWorker(
redisdb.WithAddr(redisAddr),
redisdb.WithChannel(RedisKeySSEEvents),
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
var sseMessage SSEEvent
if err := json.Unmarshal(m.Payload(), &sseMessage); err != nil {
return fmt.Errorf("failed to unmarshal SSE message: %w", err)
}
event := SSEEvent{
UserID: sseMessage.UserID,
EventType: sseMessage.EventType,
Data: sseMessage.Data,
Timestamp: sseMessage.Timestamp,
}
if err := dispatcher.SendToUser(event.UserID, event); err != nil {
return err
}
return nil
}),
)
q, err := queue.NewQueue(
queue.WithWorkerCount(10), // 10 воркеров
queue.WithWorker(w),
)
if err != nil {
return nil, err
}
return &SSEConsumer{
dispatcher: dispatcher,
queue: q,
}, nil
}
func (r *SSEConsumer) Start() {
r.queue.Start()
}
func (r *SSEConsumer) Stop() {
r.queue.Release()
}

@ -0,0 +1,81 @@
package sse
import (
"fmt"
)
type ConnectionDispatcher struct {
buffer *SSEConnectionRingBuffer
}
// bufferSize размер кольца минимум 2
func NewConnectionDispatcher(bufferSize int) *ConnectionDispatcher {
return &ConnectionDispatcher{
buffer: NewSSEConnectionRingBuffer(bufferSize),
}
}
func (r *ConnectionDispatcher) AddConnection(userID, deviceID string) *SSEConnection {
conn := NewSSEConnection(userID, deviceID)
currentMap := r.buffer.GetCurrent() // тут текущие данные откуда читают воркеры
nextMap := r.buffer.GetNext() // следующая поидее должна быть пустой
for key, value := range currentMap {
nextMap[key] = value
}
// покомментарию Миши копируем слайсы отдельно
existingConnections := nextMap[userID]
newConnections := make([]*SSEConnection, len(existingConnections)+1)
copy(newConnections, existingConnections)
newConnections[len(existingConnections)] = conn
nextMap[userID] = newConnections
r.buffer.Rotate()
return conn
}
func (r *ConnectionDispatcher) RemoveConnection(userID, deviceID string) {
currentMap := r.buffer.GetCurrent() // тут текущие данные откуда читают воркеры
nextMap := r.buffer.GetNext() // следующая на которую переключимся
for key, value := range currentMap {
nextMap[key] = value
}
existingConnections := nextMap[userID]
afterDropConnections := make([]*SSEConnection, 0, len(existingConnections))
for _, conn := range existingConnections {
if conn.DeviceID != deviceID {
afterDropConnections = append(afterDropConnections, conn)
} else {
close(conn.Channel)
}
}
// если есть коннекты то складываем оставшиеся после удаления, если == 0 то дропаем этого пользователя из мэпки
if len(afterDropConnections) > 0 {
nextMap[userID] = afterDropConnections
} else {
delete(nextMap, userID)
}
r.buffer.Rotate()
}
func (r *ConnectionDispatcher) SendToUser(userID string, event SSEEvent) error {
currentMap := r.buffer.GetCurrent()
connections := currentMap[userID]
if len(connections) == 0 {
return fmt.Errorf("no active connections for user %s", userID)
}
for _, conn := range connections {
conn.Channel <- event
}
return nil
}

@ -0,0 +1,77 @@
package sse
import (
"context"
"encoding/json"
"time"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
type SSEProducer struct {
queue *queue.Queue
}
type SSEMessageTask struct {
Message SSEEvent
}
// требует - github.com/golang-queue/queue
func (r *SSEMessageTask) Bytes() []byte {
b, _ := json.Marshal(r.Message)
return b
}
func NewSSEProducer(redisAddr string) (*SSEProducer, error) {
worker := redisdb.NewWorker(
redisdb.WithAddr(redisAddr),
redisdb.WithChannel(RedisKeySSEEvents),
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
return nil
}),
)
queue, err := queue.NewQueue(
queue.WithWorkerCount(1), // количество воркеров
queue.WithWorker(worker),
)
if err != nil {
return nil, err
}
return &SSEProducer{
queue: queue,
}, nil
}
// есть ли смысл на мульти отправку? c userid []slice по типу события
func (r *SSEProducer) SendEvent(userID, eventType string, data interface{}) error {
dataJSON, err := json.Marshal(data)
if err != nil {
return err
}
message := SSEEvent{
UserID: userID,
EventType: eventType,
Data: dataJSON,
Timestamp: time.Now(),
}
task := &SSEMessageTask{
Message: message,
}
err = r.queue.Queue(task)
if err != nil {
return err
}
return nil
}
func (r *SSEProducer) Stop() {
r.queue.Release()
}

@ -0,0 +1,56 @@
package sse
import (
"fmt"
"github.com/gofiber/fiber/v2"
"go.uber.org/zap"
)
type SSEService struct {
dispatcher *ConnectionDispatcher
producer *SSEProducer
consumer *SSEConsumer
logger *zap.Logger
}
func NewSSEService(redisAddr string, bufferSize int, logger *zap.Logger) (*SSEService, error) {
dispatcher := NewConnectionDispatcher(bufferSize)
producer, err := NewSSEProducer(redisAddr)
if err != nil {
return nil, fmt.Errorf("failed to create producer: %w", err)
}
consumer, err := NewSSEConsumer(redisAddr, dispatcher)
if err != nil {
return nil, fmt.Errorf("failed to create consumer: %w", err)
}
service := &SSEService{
dispatcher: dispatcher,
producer: producer,
consumer: consumer,
logger: logger,
}
return service, nil
}
func (s *SSEService) Start() {
s.consumer.Start()
}
func (s *SSEService) Stop() {
s.consumer.Stop()
s.producer.Stop()
}
// для отправителей - допустим вотчер монги исал бы сюда
func (s *SSEService) SendEvent(userID, eventType string, data interface{}) error {
return s.producer.SendEvent(userID, eventType, data)
}
// todo для тех кто потребляет контент
func (s *SSEService) HandleSSEConnection(ctx *fiber.Ctx, userID, deviceID string) error {
return nil
}

@ -0,0 +1,74 @@
package sse
import (
"container/ring"
"encoding/json"
"time"
"github.com/google/uuid"
)
const (
RedisKeySSEEvents = "sse_events"
)
type SSEEvent struct {
UserID string
EventType string
Data json.RawMessage
Timestamp time.Time
}
type SSEConnection struct {
ID string
UserID string
DeviceID string
Channel chan SSEEvent
}
func NewSSEConnection(userID, deviceID string) *SSEConnection {
return &SSEConnection{
ID: uuid.New().String(),
UserID: userID,
DeviceID: deviceID,
Channel: make(chan SSEEvent, 100),
}
}
// todo Миша: прост довольно много мест, где можно ошибиться.
// например я вот вижу вариант, что нам в мэпе нужно копировать не deep, а чисто саму мэпу.
//пусть ключи указывают на те же значения, это не страшно. но я же могу в этом вопросе сильно ошибиться.
//мало ли, вдруг переприсваивание мэпы по факту сохранит ссылку на массив бакетов?
//ну т.е. если мы присвоим куда то уже существующую мэпу, мы будем редактировать изначальную
//и нужен какой то метод и глубокого и неглубокого копирования одновременно. копирование на пол-шишечки
type SSEConnectionMap map[string][]*SSEConnection
type SSEConnectionRingBuffer struct {
ring *ring.Ring
}
func NewSSEConnectionRingBuffer(size int) *SSEConnectionRingBuffer {
return &SSEConnectionRingBuffer{
ring: ring.New(size),
}
}
func (r *SSEConnectionRingBuffer) GetCurrent() SSEConnectionMap {
if r.ring.Value == nil {
return make(SSEConnectionMap)
}
return r.ring.Value.(SSEConnectionMap)
}
func (r *SSEConnectionRingBuffer) Rotate() {
r.ring = r.ring.Next()
}
func (r *SSEConnectionRingBuffer) GetNext() SSEConnectionMap {
next := r.ring.Next()
if next.Value == nil {
next.Value = make(SSEConnectionMap)
}
return next.Value.(SSEConnectionMap)
}