generated from PenaSide/GolangTemplate
96 lines
2.4 KiB
Go
96 lines
2.4 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"go.mongodb.org/mongo-driver/bson"
|
||
"go.mongodb.org/mongo-driver/mongo"
|
||
"go.mongodb.org/mongo-driver/mongo/options"
|
||
"go.uber.org/zap"
|
||
"sync"
|
||
)
|
||
|
||
type WatchWorker struct {
|
||
accountChannels map[string][]chan map[string]interface{}
|
||
mu sync.Mutex
|
||
mongoDB *mongo.Collection
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func NewWatchWorker(mongoDB *mongo.Collection, logger *zap.Logger) *WatchWorker {
|
||
return &WatchWorker{
|
||
accountChannels: make(map[string][]chan map[string]interface{}),
|
||
mongoDB: mongoDB,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
func (w *WatchWorker) Run(ctx context.Context) {
|
||
pipeline := mongo.Pipeline{
|
||
{{"$match", bson.M{"operationType": "update"}}},
|
||
}
|
||
opts := options.ChangeStream()
|
||
opts.SetFullDocument(options.UpdateLookup)
|
||
changeStream, err := w.mongoDB.Watch(ctx, pipeline, opts)
|
||
if err != nil {
|
||
w.logger.Error("error start change stream", zap.Error(err))
|
||
return
|
||
}
|
||
defer changeStream.Close(ctx)
|
||
|
||
for {
|
||
if changeStream.Next(ctx) {
|
||
var changeEvent struct {
|
||
FullDocument struct {
|
||
UserID string `bson:"userId"`
|
||
} `bson:"fullDocument"`
|
||
UpdateDescription struct {
|
||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||
} `bson:"updateDescription"`
|
||
}
|
||
if err := changeStream.Decode(&changeEvent); err != nil {
|
||
w.logger.Error("error decode change event", zap.Error(err))
|
||
continue
|
||
}
|
||
|
||
w.mu.Lock()
|
||
channels, ok := w.accountChannels[changeEvent.FullDocument.UserID]
|
||
w.mu.Unlock()
|
||
if !ok {
|
||
fmt.Println("userID", changeEvent.FullDocument.UserID)
|
||
fmt.Println("fields", changeEvent.UpdateDescription.UpdatedFields)
|
||
}
|
||
if ok {
|
||
for _, ch := range channels {
|
||
select {
|
||
case ch <- changeEvent.UpdateDescription.UpdatedFields:
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
default:
|
||
fmt.Println("default select case")
|
||
}
|
||
}
|
||
}
|
||
|
||
func (w *WatchWorker) AddChannel(userID string, ch chan map[string]interface{}) {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
w.accountChannels[userID] = append(w.accountChannels[userID], ch)
|
||
}
|
||
|
||
// todo подумать как дропать если с одного устройства могут выйти на другом нет а ключ с массивом один
|
||
func (w *WatchWorker) DropChannel(userID string) {
|
||
w.mu.Lock()
|
||
defer w.mu.Unlock()
|
||
fmt.Println("drop chan, userID:", userID)
|
||
delete(w.accountChannels, userID)
|
||
}
|