generated from PenaSide/GolangTemplate
added low response from mongo watcher
This commit is contained in:
parent
c76a8f5b38
commit
600bddb1ee
@ -14,6 +14,7 @@ import (
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/repository"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Deps struct {
|
||||
@ -189,7 +190,7 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||
ctx.Set("Connection", "keep-alive")
|
||||
ctx.Set("Transfer-Encoding", "chunked")
|
||||
|
||||
accountCh := make(chan models.Account)
|
||||
accountCh := make(chan map[string]interface{})
|
||||
cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||
|
||||
go func(ctx context.Context) {
|
||||
@ -200,21 +201,31 @@ func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||
}(cancelCtx)
|
||||
|
||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
pingTicker := time.NewTicker(5 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
for {
|
||||
select {
|
||||
case account, ok := <-accountCh:
|
||||
case accountData, ok := <-accountCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
accountJSON, err := json.Marshal(account)
|
||||
accountJSON, err := json.Marshal(accountData)
|
||||
if err != nil {
|
||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||
if err := w.Flush(); err != nil {
|
||||
receiver.logger.Error("error flushing", zap.Error(err))
|
||||
cancel()
|
||||
receiver.logger.Info("Close connection Account Pipe sse")
|
||||
return
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||
if err := w.Flush(); err != nil {
|
||||
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -499,14 +499,13 @@ func (receiver *AccountRepository) QuizLogoStat(ctx context.Context, req QuizLog
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- models.Account) error {
|
||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
||||
pipeline := mongo.Pipeline{
|
||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
||||
}
|
||||
|
||||
opts := options.ChangeStream()
|
||||
opts.SetFullDocument(options.UpdateLookup)
|
||||
|
||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -521,14 +520,16 @@ func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID strin
|
||||
default:
|
||||
if changeStream.Next(ctx) {
|
||||
var changeEvent struct {
|
||||
FullDocument models.Account `bson:"fullDocument"`
|
||||
UpdateDescription struct {
|
||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||||
} `bson:"updateDescription"`
|
||||
}
|
||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
||||
receiver.logger.Error("error decoding change event", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case accountCh <- changeEvent.FullDocument:
|
||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user