package repository import ( "context" "fmt" "log" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" mongoWrapper "penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/errors" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/fields" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" ) type AccountRepositoryDeps struct { Logger *zap.Logger MongoDB *mongo.Collection } type AccountRepository struct { logger *zap.Logger mongoDB *mongo.Collection } func NewAccountRepository(deps AccountRepositoryDeps) *AccountRepository { if deps.Logger == nil { log.Panicln("logger is nil on ") } if deps.MongoDB == nil { log.Panicln("mongodb is nil on ") } return &AccountRepository{ logger: deps.Logger, mongoDB: deps.MongoDB, } } func (receiver *AccountRepository) FindByUserID(ctx context.Context, id string) (*models.Account, errors.Error) { filter := bson.M{ fields.Account.UserID: id, fields.Account.IsDeleted: false, } account, err := mongoWrapper.FindOne[models.Account](ctx, &mongoWrapper.RequestSettings{ Driver: receiver.mongoDB, Filter: filter, }) if err != nil { receiver.logger.Error("failed to find account by userID on of ", zap.String("id", id), zap.Error(err), ) findError := errors.New( fmt.Errorf("failed to find account with <%s> on of : %w", id, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { findError.SetType(errors.ErrNotFound) } return nil, findError } return account, nil } func (receiver *AccountRepository) FindMany(ctx context.Context, page, limit int64) ([]models.Account, errors.Error) { filter := bson.M{fields.Account.IsDeleted: false} findOptions := options.Find() skip := (page - 1) * limit findOptions.SetSkip(skip) findOptions.SetLimit(limit) accounts, err := mongoWrapper.Find[models.Account](ctx, &mongoWrapper.RequestSettings{ Driver: receiver.mongoDB, Options: findOptions, Filter: filter, }) if err != nil { receiver.logger.Error("failed to find many accounts on of ", zap.Int64("page", page), zap.Int64("limit", limit), zap.Int64("skip", skip), zap.Error(err), ) return nil, errors.New( fmt.Errorf("failed to find many accounts on of : %w", err), errors.ErrInternalError, ) } return accounts, nil } func (receiver *AccountRepository) Insert(ctx context.Context, account *models.Account) (*models.Account, errors.Error) { result, err := receiver.mongoDB.InsertOne(ctx, account.Sanitize()) if err != nil { receiver.logger.Error("failed to insert account on of ", zap.Any("account", account), zap.Error(err), ) return nil, errors.New( fmt.Errorf("failed to insert account on of : %w", err), errors.ErrInternalError, ) } insertedID := result.InsertedID.(primitive.ObjectID).Hex() account.ID = insertedID return account, nil } func (receiver *AccountRepository) Remove(ctx context.Context, id string) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: id, fields.Account.IsDeleted: false, } update := bson.M{"$set": bson.M{ fields.Account.IsDeleted: true, fields.Account.DeletedAt: time.Now(), }} if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to set 'deleted=true' on of ", zap.String("id", id), zap.Error(err), ) removeErr := errors.New( fmt.Errorf("failed to remove account with <%s> on of : %w", id, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) CountAll(ctx context.Context) (int64, errors.Error) { count, err := receiver.mongoDB.CountDocuments(ctx, bson.M{fields.Account.IsDeleted: false}) if err != nil { receiver.logger.Error("failed to count all documents on of ", zap.Error(err)) return 0, errors.New( fmt.Errorf("failed to count all documents on of : %w", err), errors.ErrInternalError, ) } return count, nil } func (receiver *AccountRepository) AddItemToCart(ctx context.Context, userID, itemID string) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{ "$addToSet": bson.M{fields.Account.Cart: itemID}, "$set": bson.M{fields.Account.UpdatedAt: time.Now()}, } if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to add item on of ", zap.String("userID", userID), zap.String("itemID", itemID), zap.Error(err), ) removeErr := errors.New( fmt.Errorf("failed to add item <%s> account with <%s> on of : %w", itemID, userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) RemoveItemFromCart(ctx context.Context, userID, itemID string) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{ "$pull": bson.M{fields.Account.Cart: itemID}, "$set": bson.M{fields.Account.UpdatedAt: time.Now()}, } if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to add item on of ", zap.String("userID", userID), zap.String("itemID", itemID), zap.Error(err), ) removeErr := errors.New( fmt.Errorf("failed to add item <%s> account with <%s> on of : %w", itemID, userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) ChangeWallet(ctx context.Context, userID string, wallet *models.Wallet) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{"$set": bson.M{ fields.Account.Wallet: wallet, fields.Account.UpdatedAt: time.Now(), }} if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to change wallet on of ", zap.Error(err), zap.String("userID", userID), zap.Any("wallet", wallet), ) removeErr := errors.New( fmt.Errorf("failed to change wallet of account <%s> on of : %w", userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) ClearCart(ctx context.Context, userID string) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{"$set": bson.M{ fields.Account.Cart: []string{}, fields.Account.UpdatedAt: time.Now(), }} if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to clear cart on of ", zap.String("userID", userID), zap.Error(err), ) removeErr := errors.New( fmt.Errorf("failed to clear cart of account <%s> on of : %w", userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) SetStatus(ctx context.Context, userID string, status models.AccountStatus) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{"$set": bson.M{ fields.Account.Status: status, fields.Account.UpdatedAt: time.Now(), }} if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to set status on of ", zap.Error(err), zap.String("userID", userID), zap.String("status", string(status)), ) removeErr := errors.New( fmt.Errorf("failed to set status <%s> to account <%s> on of : %w", status, userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } func (receiver *AccountRepository) UpdateName(ctx context.Context, userID string, name *models.Name) (*models.Account, errors.Error) { account := models.Account{} options := options.FindOneAndUpdate().SetReturnDocument(options.After) filter := bson.M{ fields.Account.UserID: userID, fields.Account.IsDeleted: false, } update := bson.M{"$set": bson.M{ fields.Account.Name: name, fields.Account.UpdatedAt: time.Now(), }} if err := receiver.mongoDB.FindOneAndUpdate(ctx, filter, update, options).Decode(&account); err != nil { receiver.logger.Error("failed to change name on of ", zap.Error(err), zap.String("userID", userID), zap.Any("name", name), ) removeErr := errors.New( fmt.Errorf("failed to change name of account <%s> on of : %w", userID, err), errors.ErrInternalError, ) if err == mongo.ErrNoDocuments { removeErr.SetType(errors.ErrNotFound) } return nil, removeErr } return &account, nil } type QuizLogoStatDeps struct { From *int Limit *int Page *int To *int } type QuizLogoStats struct { ID string `bson:"_id"` Regs int `bson:"regs"` Money int64 `bson:"money"` Quizes []Quiz `bson:"quizes"` } type Quiz struct { QuizID string `bson:"quiz"` Regs int `bson:"regs"` Money int64 `bson:"money"` } func (receiver *AccountRepository) QuizLogoStat(ctx context.Context, req QuizLogoStatDeps) ([]QuizLogoStats, error) { var pipeline mongo.Pipeline if req.From != nil && req.To != nil { match := bson.D{ {"$match", bson.D{ {"createdAt", bson.D{{"$gte", time.Unix(int64(*req.From), 0)}}}, {"createdAt", bson.D{{"$lte", time.Unix(int64(*req.To), 0)}}}, {"from", bson.D{{"$exists", true}, {"$ne", ""}}}, {"partner", bson.D{{"$exists", true}, {"$ne", ""}}}, }}, } pipeline = append(pipeline, match) } else { match := bson.D{ {"$match", bson.D{ {"from", bson.D{{"$exists", true}, {"$ne", ""}}}, {"partner", bson.D{{"$exists", true}, {"$ne", ""}}}, }}, } pipeline = append(pipeline, match) } pipeline = append(pipeline, mongo.Pipeline{ { {"$lookup", bson.D{ {"from", "histories"}, {"localField", "userId"}, {"foreignField", "userId"}, {"as", "history"}, }}, }, { {"$unwind", bson.D{ {"path", "$history"}, {"preserveNullAndEmptyArrays", true}, }}, }, { {"$match", bson.D{ {"$or", bson.A{ bson.D{{"history", bson.D{{"$exists", false}}}}, bson.D{{"history.key", "payment.succeeded"}}, }}, }}, }, { {"$group", bson.D{ {"_id", "$userId"}, {"partner", bson.D{{"$first", "$partner"}}}, {"from", bson.D{{"$first", "$from"}}}, {"sum", bson.D{{"$sum", "$history.rawDetails.price"}}}, }}, }, { {"$group", bson.D{ {"_id", "$from"}, {"partner", bson.D{{"$first", "$partner"}}}, {"regs", bson.D{{"$count", bson.D{}}}}, {"money", bson.D{{"$sum", "$sum"}}}, }}, }, { {"$group", bson.D{ {"_id", "$partner"}, {"regs", bson.D{{"$sum", "$regs"}}}, {"money", bson.D{{"$sum", "$money"}}}, {"quizes", bson.D{{"$push", bson.D{ {"quiz", "$_id"}, {"regs", "$regs"}, {"money", "$money"}, }}}}}, }, }, }...) var results []QuizLogoStats cursor, err := receiver.mongoDB.Aggregate(ctx, pipeline) if err != nil { return nil, err } defer cursor.Close(ctx) if err := cursor.All(ctx, &results); err != nil { return nil, err } return results, nil } func (receiver *AccountRepository) AccountPipe(ctx context.Context, userID string, accountCh chan<- models.Account) error { pipeline := mongo.Pipeline{ {{"$match", bson.M{"operationType": "update", "fullDocument.userId": userID}}}, } opts := options.ChangeStream() opts.SetFullDocument(options.UpdateLookup) changeStream, err := receiver.mongoDB.Watch(ctx, pipeline, opts) if err != nil { return err } defer changeStream.Close(ctx) for { select { case <-ctx.Done(): return nil default: if changeStream.Next(ctx) { var changeEvent struct { FullDocument models.Account `bson:"fullDocument"` } if err := changeStream.Decode(&changeEvent); err != nil { receiver.logger.Error("error decoding change event", zap.Error(err)) continue } select { case accountCh <- changeEvent.FullDocument: case <-ctx.Done(): return nil } } } } }