customer/internal/workers/mongo_watcher.go
2024-07-21 13:35:19 +03:00

96 lines
2.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package workers
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"sync"
)
type WatchWorker struct {
accountChannels map[string][]chan map[string]interface{}
mu sync.Mutex
mongoDB *mongo.Collection
logger *zap.Logger
}
// NewWatchWorker builds a WatchWorker that watches the given collection and
// reports problems to logger. The returned worker starts with no registered
// channels; call Run to begin consuming the change stream.
func NewWatchWorker(mongoDB *mongo.Collection, logger *zap.Logger) *WatchWorker {
	worker := new(WatchWorker)
	worker.accountChannels = map[string][]chan map[string]interface{}{}
	worker.mongoDB = mongoDB
	worker.logger = logger
	return worker
}
// Run opens a change stream on the worker's collection and fans update events
// out to the channels registered for the affected user.
//
// Only "update" operations are matched, and the full document is looked up so
// that its userId field is available. For every event the map of updated
// fields is sent to each channel registered for that user. A send blocks until
// the receiver takes the value or ctx is cancelled, so one slow subscriber
// delays delivery of subsequent events.
//
// Run blocks until ctx is cancelled or the change stream stops. Errors are
// logged rather than returned; once the stream has failed Run returns, and the
// caller must invoke it again to resume watching.
func (w *WatchWorker) Run(ctx context.Context) {
	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.M{"operationType": "update"}}},
	}
	opts := options.ChangeStream()
	opts.SetFullDocument(options.UpdateLookup)

	changeStream, err := w.mongoDB.Watch(ctx, pipeline, opts)
	if err != nil {
		w.logger.Error("error start change stream", zap.Error(err))
		return
	}
	defer changeStream.Close(ctx)

	// Next blocks until an event is available and reports false once ctx is
	// done or the stream has failed or been closed. Looping on it directly
	// avoids spinning on a dead stream.
	for changeStream.Next(ctx) {
		var changeEvent struct {
			FullDocument struct {
				UserID string `bson:"userId"`
			} `bson:"fullDocument"`
			UpdateDescription struct {
				UpdatedFields map[string]interface{} `bson:"updatedFields"`
			} `bson:"updateDescription"`
		}
		if err := changeStream.Decode(&changeEvent); err != nil {
			w.logger.Error("error decode change event", zap.Error(err))
			continue
		}
		userID := changeEvent.FullDocument.UserID
		fields := changeEvent.UpdateDescription.UpdatedFields

		// Hold the lock only for the map lookup so that blocking sends below
		// never stall AddChannel or DropChannel.
		w.mu.Lock()
		channels, ok := w.accountChannels[userID]
		w.mu.Unlock()
		if !ok {
			w.logger.Debug("no channels registered for user",
				zap.String("userID", userID),
				zap.Any("fields", fields),
			)
			continue
		}

		for _, ch := range channels {
			select {
			case ch <- fields:
			case <-ctx.Done():
				return
			}
		}
	}

	// A cancelled context is the normal shutdown path; anything else is a
	// stream failure worth reporting.
	if err := changeStream.Err(); err != nil && ctx.Err() == nil {
		w.logger.Error("change stream stopped", zap.Error(err))
	}
}
// AddChannel registers ch as an additional recipient of update events for
// userID. Several channels may be registered under the same user ID; each is
// appended to that user's list.
func (w *WatchWorker) AddChannel(userID string, ch chan map[string]interface{}) {
	w.mu.Lock()
	defer w.mu.Unlock()

	subscribers := w.accountChannels[userID]
	w.accountChannels[userID] = append(subscribers, ch)
}
// DropChannel removes every channel registered for userID.
//
// TODO: work out how to drop a single channel. A user may log out on one
// device while staying logged in on another, but all of that user's channels
// live in one slice under one key, so this removes them all.
func (w *WatchWorker) DropChannel(userID string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	fmt.Println("drop chan, userID:", userID)

	// Deleting the key discards the whole slice of channels for this user.
	delete(w.accountChannels, userID)
}