generated from PenaSide/GolangTemplate
вмержил ветку updatecommon в config чтобы избежать конфликтов в дальнейшем
This commit is contained in:
commit
bdf6f5bb81
@ -1,41 +0,0 @@
|
|||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"gitea.pena/PenaSide/common/mongo"
|
|
||||||
"gitea.pena/PenaSide/customer/cmd/sse_bench/repository"
|
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
|
||||||
"gitea.pena/PenaSide/customer/pkg/closer"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
|
||||||
defer func() {
|
|
||||||
if recovered := recover(); recovered != nil {
|
|
||||||
appErr = errors.New("recovered panic on application run")
|
|
||||||
logger.Error("recovered panic on application run", zap.Any("recovered", recovered))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
closer := closer.New()
|
|
||||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
|
|
||||||
Configuration: &config.Database,
|
|
||||||
Timeout: 10 * time.Second,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed connection to db: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
accountRepo := repository.NewAccountRepository(repository.Deps{
|
|
||||||
Logger: logger,
|
|
||||||
MongoDB: mongoDB,
|
|
||||||
})
|
|
||||||
}
|
|
@ -33,7 +33,7 @@ import (
|
|||||||
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1075.632s
|
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1075.632s
|
||||||
|
|
||||||
func BenchmarkAccountPipe(b *testing.B) {
|
func BenchmarkAccountPipe(b *testing.B) {
|
||||||
for _, n := range []int{1000} {
|
for _, n := range []int{500} {
|
||||||
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
@ -42,7 +42,7 @@ func BenchmarkAccountPipe(b *testing.B) {
|
|||||||
go func(j int) {
|
go func(j int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
userID := fmt.Sprintf("user%d", j)
|
userID := fmt.Sprintf("user%d", j)
|
||||||
//fmt.Println(userID)
|
fmt.Println(userID)
|
||||||
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
|
resp, err := http.Get(fmt.Sprintf("http://localhost:3000/account-pipe/%s", userID))
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
@ -53,8 +53,8 @@ func BenchmarkAccountPipe(b *testing.B) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
//line := scanner.Text()
|
line := scanner.Text()
|
||||||
//fmt.Println("Received:", line)
|
fmt.Println("Received:", line)
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}(j)
|
}(j)
|
||||||
@ -89,7 +89,7 @@ func BenchmarkAccountPipe(b *testing.B) {
|
|||||||
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1076.521s
|
//ok penahub.gitlab.yandexcloud.net/pena-services/customer/tests/benchmarks 1076.521s
|
||||||
|
|
||||||
func BenchmarkAccountPipeWithWorker(b *testing.B) {
|
func BenchmarkAccountPipeWithWorker(b *testing.B) {
|
||||||
for _, n := range []int{10} {
|
for _, n := range []int{500} {
|
||||||
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
b.Run(fmt.Sprintf("COUNT_%d", n), func(b *testing.B) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
@ -109,8 +109,8 @@ func BenchmarkAccountPipeWithWorker(b *testing.B) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
//line := scanner.Text()
|
line := scanner.Text()
|
||||||
//fmt.Println("Received:", line)
|
fmt.Println("Received:", line)
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}(j)
|
}(j)
|
@ -1,97 +0,0 @@
|
|||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/cmd/sse_bench/repository"
|
|
||||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/cmd/sse_bench/worker"
|
|
||||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/client"
|
|
||||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Deps struct {
|
|
||||||
MiddleWare *http.MiddleWare
|
|
||||||
AccountRepo *repository.AccountRepository
|
|
||||||
Logger *zap.Logger
|
|
||||||
WatchWorker *worker.WatchWorker
|
|
||||||
}
|
|
||||||
|
|
||||||
type AccountController struct {
|
|
||||||
middleWare *http.MiddleWare
|
|
||||||
accountRepo *repository.AccountRepository
|
|
||||||
logger *zap.Logger
|
|
||||||
authClient *client.AuthClient
|
|
||||||
watchWorker *worker.WatchWorker
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAccountController(deps Deps) *AccountController {
|
|
||||||
return &AccountController{
|
|
||||||
middleWare: deps.MiddleWare,
|
|
||||||
accountRepo: deps.AccountRepo,
|
|
||||||
logger: deps.Logger,
|
|
||||||
watchWorker: deps.WatchWorker,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountController) Register(router fiber.Router) {
|
|
||||||
router.Get("/account/pipe", receiver.AccountPipe)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountController) Name() string {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
|
||||||
userID, ok := receiver.middleWare.ExtractUserID(ctx)
|
|
||||||
if !ok || userID == "" {
|
|
||||||
return receiver.middleWare.NoAuth(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
|
||||||
ctx.Set("Cache-Control", "no-cache")
|
|
||||||
ctx.Set("Connection", "keep-alive")
|
|
||||||
ctx.Set("Transfer-Encoding", "chunked")
|
|
||||||
|
|
||||||
accountCh := make(chan map[string]interface{})
|
|
||||||
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
|
||||||
|
|
||||||
receiver.watchWorker.AddChannel(userID, accountCh)
|
|
||||||
|
|
||||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
||||||
pingTicker := time.NewTicker(5 * time.Second)
|
|
||||||
defer pingTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case accountData, ok := <-accountCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accountJSON, err := json.Marshal(accountData)
|
|
||||||
if err != nil {
|
|
||||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
receiver.logger.Error("error flushing", zap.Error(err))
|
|
||||||
//cancel()
|
|
||||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
|
||||||
//receiver.watchWorker.DropChannel(userID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-pingTicker.C:
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
|
||||||
//receiver.watchWorker.DropChannel(userID)
|
|
||||||
//cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1 +1,253 @@
|
|||||||
package sse_bench
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"gitea.pena/PenaSide/customer/cmd/sse_bench/worker"
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
|
||||||
|
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
|
||||||
|
client, err := mongo.Connect(context.Background(), clientOptions)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
collection := client.Database("testdb").Collection("testcollection")
|
||||||
|
collection.Drop(ctx)
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, collection
|
||||||
|
}
|
||||||
|
|
||||||
|
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *worker.WatchWorker) *fiber.App {
|
||||||
|
app := fiber.New()
|
||||||
|
controller := &AccountController{
|
||||||
|
accountRepo: repo,
|
||||||
|
logger: logger,
|
||||||
|
watchWorker: watchWorker,
|
||||||
|
}
|
||||||
|
app.Get("/account-pipe/:userID", controller.AccountPipe)
|
||||||
|
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
|
||||||
|
return app
|
||||||
|
}
|
||||||
|
|
||||||
|
type AccountController struct {
|
||||||
|
accountRepo *AccountRepository
|
||||||
|
logger *zap.Logger
|
||||||
|
watchWorker *worker.WatchWorker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
||||||
|
userID := ctx.Params("userID")
|
||||||
|
|
||||||
|
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||||
|
ctx.Set("Cache-Control", "no-cache")
|
||||||
|
ctx.Set("Connection", "keep-alive")
|
||||||
|
ctx.Set("Transfer-Encoding", "chunked")
|
||||||
|
|
||||||
|
accountCh := make(chan map[string]interface{})
|
||||||
|
cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||||
|
|
||||||
|
go func(ctx context.Context) {
|
||||||
|
defer close(accountCh)
|
||||||
|
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
|
||||||
|
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
|
||||||
|
}
|
||||||
|
}(cancelCtx)
|
||||||
|
|
||||||
|
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||||
|
pingTicker := time.NewTicker(5 * time.Second)
|
||||||
|
defer pingTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case accountData, ok := <-accountCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
accountJSON, err := json.Marshal(accountData)
|
||||||
|
if err != nil {
|
||||||
|
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Println(accountJSON)
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||||
|
cancel()
|
||||||
|
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-pingTicker.C:
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
|
||||||
|
userID := ctx.Params("userID")
|
||||||
|
|
||||||
|
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
||||||
|
ctx.Set("Cache-Control", "no-cache")
|
||||||
|
ctx.Set("Connection", "keep-alive")
|
||||||
|
ctx.Set("Transfer-Encoding", "chunked")
|
||||||
|
|
||||||
|
accountCh := make(chan map[string]interface{})
|
||||||
|
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
||||||
|
|
||||||
|
receiver.watchWorker.AddChannel(userID, accountCh)
|
||||||
|
|
||||||
|
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||||
|
pingTicker := time.NewTicker(5 * time.Second)
|
||||||
|
defer pingTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case accountData, ok := <-accountCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
accountJSON, err := json.Marshal(accountData)
|
||||||
|
if err != nil {
|
||||||
|
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
//receiver.logger.Error("error flushing", zap.Error(err))
|
||||||
|
//cancel()
|
||||||
|
//receiver.logger.Info("Close connection Account Pipe sse")
|
||||||
|
//receiver.watchWorker.DropChannel(userID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-pingTicker.C:
|
||||||
|
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
||||||
|
if err := w.Flush(); err != nil {
|
||||||
|
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
||||||
|
//receiver.watchWorker.DropChannel(userID)
|
||||||
|
//cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type AccountRepository struct {
|
||||||
|
mongoDB *mongo.Collection
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
||||||
|
pipeline := mongo.Pipeline{
|
||||||
|
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := options.ChangeStream()
|
||||||
|
opts.SetFullDocument(options.UpdateLookup)
|
||||||
|
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer changeStream.Close(ctx)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
//receiver.logger.Info("Context canceled, thread is closed now")
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
if changeStream.Next(ctx) {
|
||||||
|
var changeEvent struct {
|
||||||
|
UpdateDescription struct {
|
||||||
|
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
||||||
|
} `bson:"updateDescription"`
|
||||||
|
}
|
||||||
|
if err := changeStream.Decode(&changeEvent); err != nil {
|
||||||
|
//receiver.logger.Error("error decoding change event", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
logger, err := zap.NewProduction()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf(err.Error())
|
||||||
|
}
|
||||||
|
defer logger.Sync()
|
||||||
|
|
||||||
|
client, collection := initDB(ctx)
|
||||||
|
defer func() {
|
||||||
|
if err := client.Disconnect(ctx); err != nil {
|
||||||
|
logger.Fatal("Error close mongo", zap.Error(err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
repo := &AccountRepository{
|
||||||
|
mongoDB: collection,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
watchWorker := worker.NewWatchWorker(collection, logger)
|
||||||
|
go watchWorker.Run(ctx)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
fmt.Println("stop update")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
for i := 0; i < 800; i++ {
|
||||||
|
fmt.Println("YA WORK OKAY")
|
||||||
|
res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
|
||||||
|
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("error update", zap.Error(err))
|
||||||
|
}
|
||||||
|
fmt.Println("TOCHNO OKAY", res.ModifiedCount)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
app := initSrv(repo, logger, watchWorker)
|
||||||
|
|
||||||
|
if err := app.Listen(":3000"); err != nil {
|
||||||
|
logger.Fatal("Error server", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,70 +0,0 @@
|
|||||||
package repository
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Deps struct {
|
|
||||||
Logger *zap.Logger
|
|
||||||
MongoDB *mongo.Collection
|
|
||||||
}
|
|
||||||
|
|
||||||
type AccountRepository struct {
|
|
||||||
logger *zap.Logger
|
|
||||||
mongoDB *mongo.Collection
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewAccountRepository(deps Deps) *AccountRepository {
|
|
||||||
if deps.Logger == nil {
|
|
||||||
log.Panicln("logger is nil on <NewAccountRepository>")
|
|
||||||
}
|
|
||||||
|
|
||||||
if deps.MongoDB == nil {
|
|
||||||
log.Panicln("mongodb is nil on <NewAccountRepository>")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &AccountRepository{
|
|
||||||
logger: deps.Logger,
|
|
||||||
mongoDB: deps.MongoDB,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
|
||||||
pipeline := mongo.Pipeline{
|
|
||||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := options.ChangeStream()
|
|
||||||
opts.SetFullDocument(options.UpdateLookup)
|
|
||||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer changeStream.Close(ctx)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
receiver.logger.Info("Context canceled, thread is closed now")
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
if changeStream.Next(ctx) {
|
|
||||||
var changeEvent struct {
|
|
||||||
UpdateDescription struct {
|
|
||||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
|
||||||
} `bson:"updateDescription"`
|
|
||||||
}
|
|
||||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
|
||||||
receiver.logger.Error("error decoding change event", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
6
go.mod
6
go.mod
@ -5,6 +5,7 @@ go 1.23.2
|
|||||||
toolchain go1.23.3
|
toolchain go1.23.3
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2
|
||||||
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a
|
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a
|
||||||
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4
|
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4
|
||||||
github.com/go-resty/resty/v2 v2.11.0
|
github.com/go-resty/resty/v2 v2.11.0
|
||||||
@ -25,13 +26,9 @@ require (
|
|||||||
google.golang.org/grpc v1.65.0
|
google.golang.org/grpc v1.65.0
|
||||||
google.golang.org/protobuf v1.34.2
|
google.golang.org/protobuf v1.34.2
|
||||||
gopkg.in/tucnak/telebot.v2 v2.5.0
|
gopkg.in/tucnak/telebot.v2 v2.5.0
|
||||||
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c
|
|
||||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240520145524-451212248881
|
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2 // indirect
|
|
||||||
gitea.pena/PenaSide/hlog v0.0.0-20241125221102-a54c29c002a9 // indirect
|
|
||||||
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
|
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
|
||||||
github.com/andybalholm/brotli v1.1.0 // indirect
|
github.com/andybalholm/brotli v1.1.0 // indirect
|
||||||
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
|
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
|
||||||
@ -53,7 +50,6 @@ require (
|
|||||||
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
||||||
github.com/rs/xid v1.5.0 // indirect
|
github.com/rs/xid v1.5.0 // indirect
|
||||||
github.com/skeris/appInit v1.0.2 // indirect
|
github.com/skeris/appInit v1.0.2 // indirect
|
||||||
github.com/tealeg/xlsx v1.0.5 // indirect
|
|
||||||
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
|
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
|
||||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
github.com/valyala/fasthttp v1.52.0 // indirect
|
github.com/valyala/fasthttp v1.52.0 // indirect
|
||||||
|
15
go.sum
15
go.sum
@ -1,16 +1,8 @@
|
|||||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||||
gitea.pena/PenaSide/common v0.0.0-20241120141501-1695a0981562 h1:LobhWlICMcbCCI+SouOzeRk2K5MsXSVHtv3QOcHRZnY=
|
|
||||||
gitea.pena/PenaSide/common v0.0.0-20241120141501-1695a0981562/go.mod h1:l71j3W1yROhOSfjWZ6wcMuzjBR37gu2ZTcXsorEJoiw=
|
|
||||||
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2 h1:9U9JJBwWtQV4z/PUCUWCFKurk7sHrT59fGSoXobSdL0=
|
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2 h1:9U9JJBwWtQV4z/PUCUWCFKurk7sHrT59fGSoXobSdL0=
|
||||||
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2/go.mod h1:l71j3W1yROhOSfjWZ6wcMuzjBR37gu2ZTcXsorEJoiw=
|
gitea.pena/PenaSide/common v0.0.0-20241126121130-cf56ae1e3fb2/go.mod h1:l71j3W1yROhOSfjWZ6wcMuzjBR37gu2ZTcXsorEJoiw=
|
||||||
gitea.pena/PenaSide/hlog v0.0.0-20241125221102-a54c29c002a9 h1:tBkXWNIt8icmkMMnq8MA421RWkUy4OZh5P7C3q8uCu4=
|
|
||||||
gitea.pena/PenaSide/hlog v0.0.0-20241125221102-a54c29c002a9/go.mod h1:sanhSL8aEsfcq21P+eItYiAnKAre+B67nGJmDfk2cf0=
|
|
||||||
gitea.pena/PenaSide/linters-golang v0.0.0-20241114215743-9a8e7d58cf96 h1:m4EMXEhsA/glI6eJeZnRGUhYPSQdcWj3hzT2IDNlWS0=
|
|
||||||
gitea.pena/PenaSide/linters-golang v0.0.0-20241114215743-9a8e7d58cf96/go.mod h1:gdd+vOT6up9STkEbxa2qESLIMZFjCmRbkcheFQCVgZU=
|
|
||||||
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a h1:UySqMgaOKNsR42Y6GQXoM2wn/waYNc9cakMUSvbEEAg=
|
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a h1:UySqMgaOKNsR42Y6GQXoM2wn/waYNc9cakMUSvbEEAg=
|
||||||
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a/go.mod h1:gdd+vOT6up9STkEbxa2qESLIMZFjCmRbkcheFQCVgZU=
|
gitea.pena/PenaSide/linters-golang v0.0.0-20241119212350-2759fa93724a/go.mod h1:gdd+vOT6up9STkEbxa2qESLIMZFjCmRbkcheFQCVgZU=
|
||||||
gitea.pena/PenaSide/trashlog v0.0.0-20241107132923-0f7d4d57eb4b h1:wVswXBfVDI6xi16o2A0wt8i6nswHkgPdd6U7ak0KUjo=
|
|
||||||
gitea.pena/PenaSide/trashlog v0.0.0-20241107132923-0f7d4d57eb4b/go.mod h1:LwHJCrPVumS6Rdorrr8NLEi4kpCed6ZVcIYnmEqbIVM=
|
|
||||||
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4 h1:y9B4CSPIgiUoaXKyXLZxs1A9hxzDj26F9MH2R6uTkHQ=
|
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4 h1:y9B4CSPIgiUoaXKyXLZxs1A9hxzDj26F9MH2R6uTkHQ=
|
||||||
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4/go.mod h1:Bp5fJX0XsTV3QYximX6R2kCRvVKQAPwxjjCPm+ToTY8=
|
gitea.pena/PenaSide/trashlog v0.0.0-20241119225515-2fd267647ca4/go.mod h1:Bp5fJX0XsTV3QYximX6R2kCRvVKQAPwxjjCPm+ToTY8=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
@ -142,8 +134,6 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
github.com/tealeg/xlsx v1.0.5 h1:+f8oFmvY8Gw1iUXzPk+kz+4GpbDZPK1FhPiQRd+ypgE=
|
|
||||||
github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCnORM=
|
|
||||||
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33 h1:N9f/Q+2Ssa+yDcbfaoLTYvXmdeyUUxsJKdPUVsjSmiA=
|
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33 h1:N9f/Q+2Ssa+yDcbfaoLTYvXmdeyUUxsJKdPUVsjSmiA=
|
||||||
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33/go.mod h1:rpcH99JknBh8seZmlOlUg51gasZH6QH34oXNsIwYT6E=
|
github.com/themakers/bdd v0.0.0-20210316111417-6b1dfe326f33/go.mod h1:rpcH99JknBh8seZmlOlUg51gasZH6QH34oXNsIwYT6E=
|
||||||
github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf h1:TJJm6KcBssmbWzplF5lzixXl1RBAi/ViPs1GaSOkhwo=
|
github.com/themakers/hlog v0.0.0-20191205140925-235e0e4baddf h1:TJJm6KcBssmbWzplF5lzixXl1RBAi/ViPs1GaSOkhwo=
|
||||||
@ -307,7 +297,6 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h
|
|||||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||||
@ -326,7 +315,3 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||||
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c h1:CWb4UcuNXhd1KTNOmy2U0TJO4+Qxgxrj5cwkyFqbgrk=
|
|
||||||
penahub.gitlab.yandexcloud.net/backend/penahub_common v0.0.0-20240607202348-efe5f2bf3e8c/go.mod h1:+bPxq2wfW5S1gd+83vZYmHm33AE7nEBfznWS8AM1TKE=
|
|
||||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240520145524-451212248881 h1:U1/WGQdwZsmrV/ta7Uqm13Dg07IPN/5omS8gzBJYZv4=
|
|
||||||
penahub.gitlab.yandexcloud.net/backend/quiz/common.git v0.0.0-20240520145524-451212248881/go.mod h1:oRyhT55ctjqp/7ZxIzkR7OsQ7T/NLibsfrbb7Ytns64=
|
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
||||||
"gitea.pena/PenaSide/trashlog/app"
|
"gitea.pena/PenaSide/trashlog/app"
|
||||||
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
|
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
|
||||||
@ -11,7 +12,6 @@ import (
|
|||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
tb "gopkg.in/tucnak/telebot.v2"
|
tb "gopkg.in/tucnak/telebot.v2"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -145,6 +145,7 @@ func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error)
|
|||||||
})
|
})
|
||||||
|
|
||||||
encrypt := &config.EncryptCommon
|
encrypt := &config.EncryptCommon
|
||||||
|
|
||||||
middleWare := http.NewMiddleWare(logger)
|
middleWare := http.NewMiddleWare(logger)
|
||||||
|
|
||||||
httpControllers := initialize.NewHttpControllers(initialize.HttpControllersDeps{
|
httpControllers := initialize.NewHttpControllers(initialize.HttpControllersDeps{
|
||||||
|
@ -4,11 +4,11 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/initialize"
|
"gitea.pena/PenaSide/customer/internal/initialize"
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
"github.com/golang-jwt/jwt/v5"
|
"github.com/golang-jwt/jwt/v5"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestConfiguration(t *testing.T) {
|
func TestConfiguration(t *testing.T) {
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package initialize
|
package initialize
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"gitea.pena/PenaSide/common/encrypt"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/broker/tariff"
|
"gitea.pena/PenaSide/customer/internal/interface/broker/tariff"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/grpc/customer"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/grpc/customer"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/grpc/payment"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/grpc/payment"
|
||||||
@ -15,7 +16,6 @@ import (
|
|||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http/wallet_client"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/http/wallet_client"
|
||||||
"github.com/themakers/hlog"
|
"github.com/themakers/hlog"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
qutils "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/utils"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RpcControllersDeps struct {
|
type RpcControllersDeps struct {
|
||||||
@ -46,7 +46,7 @@ func NewRpcControllers(deps RpcControllersDeps) *RpcControllers {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type HttpControllersDeps struct {
|
type HttpControllersDeps struct {
|
||||||
Encrypt *qutils.Encrypt
|
Encrypt *encrypt.Encrypt
|
||||||
Producer *tariff.Producer
|
Producer *tariff.Producer
|
||||||
GRPCDomain string
|
GRPCDomain string
|
||||||
MiddleWare *http.MiddleWare
|
MiddleWare *http.MiddleWare
|
||||||
|
@ -5,6 +5,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.pena/PenaSide/common/encrypt"
|
||||||
|
"gitea.pena/PenaSide/common/log_mw"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
||||||
@ -12,8 +14,6 @@ import (
|
|||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
|
||||||
qutils "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/utils"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -21,7 +21,7 @@ type Deps struct {
|
|||||||
MiddleWare *http.MiddleWare
|
MiddleWare *http.MiddleWare
|
||||||
AccountRepo *repository.AccountRepository
|
AccountRepo *repository.AccountRepository
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
Encrypt *qutils.Encrypt
|
Encrypt *encrypt.Encrypt
|
||||||
AuthClient *client.AuthClient
|
AuthClient *client.AuthClient
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ type AccountController struct {
|
|||||||
middleWare *http.MiddleWare
|
middleWare *http.MiddleWare
|
||||||
accountRepo *repository.AccountRepository
|
accountRepo *repository.AccountRepository
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
encrypt *qutils.Encrypt
|
encrypt *encrypt.Encrypt
|
||||||
authClient *client.AuthClient
|
authClient *client.AuthClient
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
"gitea.pena/PenaSide/common/log_mw"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/broker/tariff"
|
"gitea.pena/PenaSide/customer/internal/interface/broker/tariff"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
"gitea.pena/PenaSide/common/log_mw"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/repository"
|
"gitea.pena/PenaSide/customer/internal/interface/repository"
|
||||||
|
@ -3,6 +3,7 @@ package wallet_client
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.pena/PenaSide/common/log_mw"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
"gitea.pena/PenaSide/customer/internal/interface/controller/http"
|
||||||
@ -13,7 +14,6 @@ import (
|
|||||||
"gitea.pena/PenaSide/customer/pkg/validate"
|
"gitea.pena/PenaSide/customer/pkg/validate"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Deps struct {
|
type Deps struct {
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
mongoWrapper "gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/fields"
|
"gitea.pena/PenaSide/customer/internal/fields"
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
@ -14,7 +15,6 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
mongoWrapper "penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type AccountRepositoryDeps struct {
|
type AccountRepositoryDeps struct {
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
mongoWrapper "penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
mongoWrapper "gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/fields"
|
"gitea.pena/PenaSide/customer/internal/fields"
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
|
@ -13,7 +13,7 @@ import (
|
|||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
mongoWrapper "penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
mongoWrapper "gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/errors"
|
"gitea.pena/PenaSide/customer/internal/errors"
|
||||||
"gitea.pena/PenaSide/customer/internal/fields"
|
"gitea.pena/PenaSide/customer/internal/fields"
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
|
@ -2,9 +2,9 @@ package models
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"gitea.pena/PenaSide/common/encrypt"
|
"gitea.pena/PenaSide/common/encrypt"
|
||||||
|
"gitea.pena/PenaSide/common/mongo"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"github.com/golang-jwt/jwt/v5"
|
"github.com/golang-jwt/jwt/v5"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"github.com/themakers/hlog"
|
"github.com/themakers/hlog"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
"gitea.pena/PenaSide/common/log_mw"
|
||||||
"gitea.pena/PenaSide/customer/internal/models"
|
"gitea.pena/PenaSide/customer/internal/models"
|
||||||
"gitea.pena/PenaSide/customer/internal/utils"
|
"gitea.pena/PenaSide/customer/internal/utils"
|
||||||
)
|
)
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"log"
|
"log"
|
||||||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
|
"gitea.pena/PenaSide/common/mongo"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/client"
|
"gitea.pena/PenaSide/customer/internal/interface/client"
|
||||||
"gitea.pena/PenaSide/customer/internal/interface/repository"
|
"gitea.pena/PenaSide/customer/internal/interface/repository"
|
||||||
codeword_rpc "gitea.pena/PenaSide/customer/internal/proto/codeword"
|
codeword_rpc "gitea.pena/PenaSide/customer/internal/proto/codeword"
|
||||||
|
@ -1,252 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"log"
|
|
||||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/workers"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
func initDB(ctx context.Context) (*mongo.Client, *mongo.Collection) {
|
|
||||||
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=rs0&readPreference=primary&ssl=false")
|
|
||||||
client, err := mongo.Connect(context.Background(), clientOptions)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
collection := client.Database("testdb").Collection("testcollection")
|
|
||||||
collection.Drop(ctx)
|
|
||||||
for i := 0; i < 1000; i++ {
|
|
||||||
_, err := collection.InsertOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i), "field": "value"})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return client, collection
|
|
||||||
}
|
|
||||||
|
|
||||||
func initSrv(repo *AccountRepository, logger *zap.Logger, watchWorker *workers.WatchWorker) *fiber.App {
|
|
||||||
app := fiber.New()
|
|
||||||
controller := &AccountController{
|
|
||||||
accountRepo: repo,
|
|
||||||
logger: logger,
|
|
||||||
watchWorker: watchWorker,
|
|
||||||
}
|
|
||||||
app.Get("/account-pipe/:userID", controller.AccountPipe)
|
|
||||||
app.Use("/account-pipe-wc/:userID", controller.AccountPipeWC)
|
|
||||||
return app
|
|
||||||
}
|
|
||||||
|
|
||||||
type AccountController struct {
|
|
||||||
accountRepo *AccountRepository
|
|
||||||
logger *zap.Logger
|
|
||||||
watchWorker *workers.WatchWorker
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountController) AccountPipe(ctx *fiber.Ctx) error {
|
|
||||||
userID := ctx.Params("userID")
|
|
||||||
|
|
||||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
|
||||||
ctx.Set("Cache-Control", "no-cache")
|
|
||||||
ctx.Set("Connection", "keep-alive")
|
|
||||||
ctx.Set("Transfer-Encoding", "chunked")
|
|
||||||
|
|
||||||
accountCh := make(chan map[string]interface{})
|
|
||||||
cancelCtx, cancel := context.WithCancel(ctx.Context())
|
|
||||||
|
|
||||||
go func(ctx context.Context) {
|
|
||||||
defer close(accountCh)
|
|
||||||
if err := receiver.accountRepo.AccountPipe(ctx, userID, accountCh); err != nil {
|
|
||||||
//receiver.logger.Error("error in account pipe repo method", zap.Error(err))
|
|
||||||
}
|
|
||||||
}(cancelCtx)
|
|
||||||
|
|
||||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
||||||
pingTicker := time.NewTicker(5 * time.Second)
|
|
||||||
defer pingTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case accountData, ok := <-accountCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accountJSON, err := json.Marshal(accountData)
|
|
||||||
if err != nil {
|
|
||||||
//receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
|
||||||
cancel()
|
|
||||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-pingTicker.C:
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
|
||||||
cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountController) AccountPipeWC(ctx *fiber.Ctx) error {
|
|
||||||
userID := ctx.Params("userID")
|
|
||||||
|
|
||||||
ctx.Set(fiber.HeaderContentType, "text/event-stream")
|
|
||||||
ctx.Set("Cache-Control", "no-cache")
|
|
||||||
ctx.Set("Connection", "keep-alive")
|
|
||||||
ctx.Set("Transfer-Encoding", "chunked")
|
|
||||||
|
|
||||||
accountCh := make(chan map[string]interface{})
|
|
||||||
//cancelCtx, cancel := context.WithCancel(ctx.Context())
|
|
||||||
|
|
||||||
receiver.watchWorker.AddChannel(userID, accountCh)
|
|
||||||
|
|
||||||
ctx.Status(fiber.StatusOK).Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
||||||
pingTicker := time.NewTicker(5 * time.Second)
|
|
||||||
defer pingTicker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case accountData, ok := <-accountCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
accountJSON, err := json.Marshal(accountData)
|
|
||||||
if err != nil {
|
|
||||||
receiver.logger.Error("error marshal account JSON", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", accountJSON)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
//receiver.logger.Error("error flushing", zap.Error(err))
|
|
||||||
//cancel()
|
|
||||||
//receiver.logger.Info("Close connection Account Pipe sse")
|
|
||||||
//receiver.watchWorker.DropChannel(userID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-pingTicker.C:
|
|
||||||
fmt.Fprintf(w, "data: %s\n\n", `{"event": "ping"}`)
|
|
||||||
if err := w.Flush(); err != nil {
|
|
||||||
//receiver.logger.Error("error sending ping Account Pipe sse, close connection", zap.Error(err))
|
|
||||||
//receiver.watchWorker.DropChannel(userID)
|
|
||||||
//cancel()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type AccountRepository struct {
|
|
||||||
mongoDB *mongo.Collection
|
|
||||||
logger *zap.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- map[string]interface{}) error {
|
|
||||||
pipeline := mongo.Pipeline{
|
|
||||||
{{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}},
|
|
||||||
}
|
|
||||||
|
|
||||||
opts := options.ChangeStream()
|
|
||||||
opts.SetFullDocument(options.UpdateLookup)
|
|
||||||
changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer changeStream.Close(ctx)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
//receiver.logger.Info("Context canceled, thread is closed now")
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
if changeStream.Next(ctx) {
|
|
||||||
var changeEvent struct {
|
|
||||||
UpdateDescription struct {
|
|
||||||
UpdatedFields map[string]interface{} `bson:"updatedFields"`
|
|
||||||
} `bson:"updateDescription"`
|
|
||||||
}
|
|
||||||
if err := changeStream.Decode(&changeEvent); err != nil {
|
|
||||||
//receiver.logger.Error("error decoding change event", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case accountCh <- changeEvent.UpdateDescription.UpdatedFields:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
logger, err := zap.NewProduction()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf(err.Error())
|
|
||||||
}
|
|
||||||
defer logger.Sync()
|
|
||||||
|
|
||||||
client, collection := initDB(ctx)
|
|
||||||
defer func() {
|
|
||||||
if err := client.Disconnect(ctx); err != nil {
|
|
||||||
logger.Fatal("Error close mongo", zap.Error(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
repo := &AccountRepository{
|
|
||||||
mongoDB: collection,
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
|
|
||||||
watchWorker := workers.NewWatchWorker(collection, logger)
|
|
||||||
go watchWorker.Run(ctx)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
fmt.Println("stop update")
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
for i := 0; i < 800; i++ {
|
|
||||||
fmt.Println("YA WORK OKAY")
|
|
||||||
res, err := collection.UpdateOne(ctx, bson.M{"userId": fmt.Sprintf("user%d", i)},
|
|
||||||
bson.M{"$set": bson.M{"field": fmt.Sprintf("value-%d", time.Now().UnixNano())}})
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("error update", zap.Error(err))
|
|
||||||
}
|
|
||||||
fmt.Println("TOCHNO OKAY", res.ModifiedCount)
|
|
||||||
}
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
app := initSrv(repo, logger, watchWorker)
|
|
||||||
|
|
||||||
if err := app.Listen(":3000"); err != nil {
|
|
||||||
logger.Fatal("Error server", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user