From 5c67c486af1f30caca239d6a6427c2cb4196d750 Mon Sep 17 00:00:00 2001 From: pasha1coil Date: Wed, 25 Jun 2025 17:06:25 +0300 Subject: [PATCH] added sse interface --- go.mod | 17 ++++-- go.sum | 19 +++++++ internal/interface/sse/consumer.go | 62 +++++++++++++++++++++ internal/interface/sse/dispatcher.go | 81 ++++++++++++++++++++++++++++ internal/interface/sse/producer.go | 77 ++++++++++++++++++++++++++ internal/interface/sse/service.go | 56 +++++++++++++++++++ internal/interface/sse/sse_model.go | 74 +++++++++++++++++++++++++ 7 files changed, 381 insertions(+), 5 deletions(-) create mode 100644 internal/interface/sse/consumer.go create mode 100644 internal/interface/sse/dispatcher.go create mode 100644 internal/interface/sse/producer.go create mode 100644 internal/interface/sse/service.go create mode 100644 internal/interface/sse/sse_model.go diff --git a/go.mod b/go.mod index 17f60aa..447f608 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/labstack/echo/v4 v4.11.4 github.com/pioz/faker v1.7.3 github.com/sethvargo/go-envconfig v1.0.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.10.0 github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf github.com/twmb/franz-go v1.16.1 github.com/twmb/franz-go/pkg/kadm v1.11.0 @@ -32,11 +32,16 @@ require ( require ( github.com/ClickHouse/clickhouse-go v1.5.4 // indirect github.com/andybalholm/brotli v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang-queue/queue v0.4.0 // indirect + github.com/golang-queue/redisdb v0.4.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/uuid v1.6.0 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/compress v1.17.7 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/labstack/gommon v0.4.2 // indirect @@ -47,6 +52,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/redis/go-redis/v9 v9.7.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/xid v1.5.0 // indirect @@ -59,14 +65,15 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/yassinebenaid/godump v0.11.1 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect go.etcd.io/bbolt v1.3.10 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.25.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/net v0.27.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index c864c99..ca6c0fc 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj github.com/caarlos0/env/v8 v8.0.0 h1:POhxHhSpuxrLMIdvTGARuZqR4Jjm8AYmoi/JKlcScs0= github.com/caarlos0/env/v8 v8.0.0/go.mod h1:7K4wMY9bH0esiXSSHlfHLX5xKGQMnkH5Fk4TDSSSzfo= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= @@ -32,6 +34,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -52,6 +56,10 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-queue/queue v0.4.0 h1:vsOvW4Wqb7Ow5+tKnlZD0PbLf4MLEO1e5C7DV8BDfBg= +github.com/golang-queue/queue v0.4.0/go.mod h1:bZobuNN7gnumxi9LRGihr7y7quDeBZZAvfPcC+H5dzg= +github.com/golang-queue/redisdb v0.4.0 h1:o8CQsilorWQ/JurNAUAAMnbJGb/Oj/2m7Kno1z/oqi0= +github.com/golang-queue/redisdb v0.4.0/go.mod h1:kbQ390Yh0BmiOoSvaMj2fpr9ueL6HS1Ti9UISMB4PwE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -72,6 +80,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vb github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= @@ -120,6 +130,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -144,6 +156,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33 h1:N9f/Q+2Ssa+yDcbfaoLTYvXmdeyUUxsJKdPUVsjSmiA= github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33/go.mod h1:rpcH99JknBh8seZmlOlUg51gasZH6QH34oXNsIwYT6E= github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf h1:TJJm6KcBssmbWzplF5lzixXl1RBAi/ViPs1GaSOkhwo= @@ -168,6 +181,8 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yassinebenaid/godump v0.11.1 h1:SPujx/XaYqGDfmNh7JI3dOyCUVrG0bG2duhO3Eh2EhI= +github.com/yassinebenaid/godump v0.11.1/go.mod h1:dc/0w8wmg6kVIvNGAzbKH1Oa54dXQx8SNKh4dPRyW44= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -200,6 +215,7 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -235,6 +251,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -252,6 +269,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -266,6 +284,7 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/internal/interface/sse/consumer.go b/internal/interface/sse/consumer.go new file mode 100644 index 0000000..09ed8d4 --- /dev/null +++ b/internal/interface/sse/consumer.go @@ -0,0 +1,62 @@ +package sse + +import ( + "context" + "encoding/json" + "fmt" + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" + "github.com/golang-queue/redisdb" +) + +type SSEConsumer struct { + dispatcher *ConnectionDispatcher + queue *queue.Queue +} + +func NewSSEConsumer(redisAddr string, dispatcher *ConnectionDispatcher) (*SSEConsumer, error) { + w := redisdb.NewWorker( + redisdb.WithAddr(redisAddr), + redisdb.WithChannel(RedisKeySSEEvents), + redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { + var sseMessage SSEEvent + if err := json.Unmarshal(m.Payload(), &sseMessage); err != nil { + return fmt.Errorf("failed to unmarshal SSE message: %w", err) + } + + event := SSEEvent{ + UserID: sseMessage.UserID, + EventType: sseMessage.EventType, + Data: sseMessage.Data, + Timestamp: sseMessage.Timestamp, + } + + if err := dispatcher.SendToUser(event.UserID, event); err != nil { + return err + } + + return nil + }), + ) + + q, err := queue.NewQueue( + queue.WithWorkerCount(10), // 10 воркеров + queue.WithWorker(w), + ) + if err != nil { + return nil, err + } + + return &SSEConsumer{ + dispatcher: dispatcher, + queue: q, + }, nil +} + +func (r *SSEConsumer) Start() { + r.queue.Start() +} + +func (r *SSEConsumer) Stop() { + r.queue.Release() +} diff --git a/internal/interface/sse/dispatcher.go b/internal/interface/sse/dispatcher.go new file mode 100644 index 0000000..0e750b5 --- /dev/null +++ b/internal/interface/sse/dispatcher.go @@ -0,0 +1,81 @@ +package sse + +import ( + "fmt" +) + +type ConnectionDispatcher struct { + buffer *SSEConnectionRingBuffer +} + +// bufferSize размер кольца минимум 2 +func NewConnectionDispatcher(bufferSize int) *ConnectionDispatcher { + return &ConnectionDispatcher{ + buffer: NewSSEConnectionRingBuffer(bufferSize), + } +} + +func (r *ConnectionDispatcher) AddConnection(userID, deviceID string) *SSEConnection { + conn := NewSSEConnection(userID, deviceID) + currentMap := r.buffer.GetCurrent() // тут текущие данные откуда читают воркеры + nextMap := r.buffer.GetNext() // следующая поидее должна быть пустой + + for key, value := range currentMap { + nextMap[key] = value + } + + // покомментарию Миши копируем слайсы отдельно + existingConnections := nextMap[userID] + newConnections := make([]*SSEConnection, len(existingConnections)+1) + copy(newConnections, existingConnections) + newConnections[len(existingConnections)] = conn + nextMap[userID] = newConnections + + r.buffer.Rotate() + + return conn +} + +func (r *ConnectionDispatcher) RemoveConnection(userID, deviceID string) { + currentMap := r.buffer.GetCurrent() // тут текущие данные откуда читают воркеры + nextMap := r.buffer.GetNext() // следующая на которую переключимся + + for key, value := range currentMap { + nextMap[key] = value + } + + existingConnections := nextMap[userID] + afterDropConnections := make([]*SSEConnection, 0, len(existingConnections)) + + for _, conn := range existingConnections { + if conn.DeviceID != deviceID { + afterDropConnections = append(afterDropConnections, conn) + } else { + close(conn.Channel) + } + } + + // если есть коннекты то складываем оставшиеся после удаления, если == 0 то дропаем этого пользователя из мэпки + if len(afterDropConnections) > 0 { + nextMap[userID] = afterDropConnections + } else { + delete(nextMap, userID) + } + + r.buffer.Rotate() +} + +func (r *ConnectionDispatcher) SendToUser(userID string, event SSEEvent) error { + currentMap := r.buffer.GetCurrent() + connections := currentMap[userID] + + if len(connections) == 0 { + return fmt.Errorf("no active connections for user %s", userID) + } + + for _, conn := range connections { + conn.Channel <- event + } + + return nil +} diff --git a/internal/interface/sse/producer.go b/internal/interface/sse/producer.go new file mode 100644 index 0000000..89d49ba --- /dev/null +++ b/internal/interface/sse/producer.go @@ -0,0 +1,77 @@ +package sse + +import ( + "context" + "encoding/json" + "time" + + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" + "github.com/golang-queue/redisdb" +) + +type SSEProducer struct { + queue *queue.Queue +} + +type SSEMessageTask struct { + Message SSEEvent +} + +// требует - github.com/golang-queue/queue +func (r *SSEMessageTask) Bytes() []byte { + b, _ := json.Marshal(r.Message) + return b +} + +func NewSSEProducer(redisAddr string) (*SSEProducer, error) { + worker := redisdb.NewWorker( + redisdb.WithAddr(redisAddr), + redisdb.WithChannel(RedisKeySSEEvents), + redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { + return nil + }), + ) + + queue, err := queue.NewQueue( + queue.WithWorkerCount(1), // количество воркеров + queue.WithWorker(worker), + ) + if err != nil { + return nil, err + } + + return &SSEProducer{ + queue: queue, + }, nil +} + +// есть ли смысл на мульти отправку? c userid []slice по типу события +func (r *SSEProducer) SendEvent(userID, eventType string, data interface{}) error { + dataJSON, err := json.Marshal(data) + if err != nil { + return err + } + + message := SSEEvent{ + UserID: userID, + EventType: eventType, + Data: dataJSON, + Timestamp: time.Now(), + } + + task := &SSEMessageTask{ + Message: message, + } + + err = r.queue.Queue(task) + if err != nil { + return err + } + + return nil +} + +func (r *SSEProducer) Stop() { + r.queue.Release() +} diff --git a/internal/interface/sse/service.go b/internal/interface/sse/service.go new file mode 100644 index 0000000..06c6414 --- /dev/null +++ b/internal/interface/sse/service.go @@ -0,0 +1,56 @@ +package sse + +import ( + "fmt" + "github.com/gofiber/fiber/v2" + "go.uber.org/zap" +) + +type SSEService struct { + dispatcher *ConnectionDispatcher + producer *SSEProducer + consumer *SSEConsumer + logger *zap.Logger +} + +func NewSSEService(redisAddr string, bufferSize int, logger *zap.Logger) (*SSEService, error) { + dispatcher := NewConnectionDispatcher(bufferSize) + + producer, err := NewSSEProducer(redisAddr) + if err != nil { + return nil, fmt.Errorf("failed to create producer: %w", err) + } + + consumer, err := NewSSEConsumer(redisAddr, dispatcher) + if err != nil { + return nil, fmt.Errorf("failed to create consumer: %w", err) + } + + service := &SSEService{ + dispatcher: dispatcher, + producer: producer, + consumer: consumer, + logger: logger, + } + + return service, nil +} + +func (s *SSEService) Start() { + s.consumer.Start() +} + +func (s *SSEService) Stop() { + s.consumer.Stop() + s.producer.Stop() +} + +// для отправителей - допустим вотчер монги исал бы сюда +func (s *SSEService) SendEvent(userID, eventType string, data interface{}) error { + return s.producer.SendEvent(userID, eventType, data) +} + +// todo для тех кто потребляет контент +func (s *SSEService) HandleSSEConnection(ctx *fiber.Ctx, userID, deviceID string) error { + return nil +} diff --git a/internal/interface/sse/sse_model.go b/internal/interface/sse/sse_model.go new file mode 100644 index 0000000..603bc00 --- /dev/null +++ b/internal/interface/sse/sse_model.go @@ -0,0 +1,74 @@ +package sse + +import ( + "container/ring" + "encoding/json" + "time" + + "github.com/google/uuid" +) + +const ( + RedisKeySSEEvents = "sse_events" +) + +type SSEEvent struct { + UserID string + EventType string + Data json.RawMessage + Timestamp time.Time +} + +type SSEConnection struct { + ID string + UserID string + DeviceID string + Channel chan SSEEvent +} + +func NewSSEConnection(userID, deviceID string) *SSEConnection { + return &SSEConnection{ + ID: uuid.New().String(), + UserID: userID, + DeviceID: deviceID, + Channel: make(chan SSEEvent, 100), + } +} + +// todo Миша: прост довольно много мест, где можно ошибиться. +// например я вот вижу вариант, что нам в мэпе нужно копировать не deep, а чисто саму мэпу. +//пусть ключи указывают на те же значения, это не страшно. но я же могу в этом вопросе сильно ошибиться. +//мало ли, вдруг переприсваивание мэпы по факту сохранит ссылку на массив бакетов? +//ну т.е. если мы присвоим куда то уже существующую мэпу, мы будем редактировать изначальную +//и нужен какой то метод и глубокого и неглубокого копирования одновременно. копирование на пол-шишечки + +type SSEConnectionMap map[string][]*SSEConnection + +type SSEConnectionRingBuffer struct { + ring *ring.Ring +} + +func NewSSEConnectionRingBuffer(size int) *SSEConnectionRingBuffer { + return &SSEConnectionRingBuffer{ + ring: ring.New(size), + } +} + +func (r *SSEConnectionRingBuffer) GetCurrent() SSEConnectionMap { + if r.ring.Value == nil { + return make(SSEConnectionMap) + } + return r.ring.Value.(SSEConnectionMap) +} + +func (r *SSEConnectionRingBuffer) Rotate() { + r.ring = r.ring.Next() +} + +func (r *SSEConnectionRingBuffer) GetNext() SSEConnectionMap { + next := r.ring.Next() + if next.Value == nil { + next.Value = make(SSEConnectionMap) + } + return next.Value.(SSEConnectionMap) +}