package repository import ( "context" "gitea.pena/PenaSide/codeword/internal/models" codeword_rpc "gitea.pena/PenaSide/codeword/internal/proto/codeword" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "time" ) type StatsRepository struct { mdb *mongo.Collection } func NewStatsRepository(deps Deps) *StatsRepository { return &StatsRepository{mdb: deps.Mdb} } func (r *StatsRepository) UpdateStatistics(ctx context.Context, req *models.ActivateReq, promoCode *models.PromoCode, userID string) error { var key string if req.FastLink != "" { key = req.FastLink } else { key = "-" } var promoCodeStats models.PromoCodeStats err := r.mdb.FindOne(ctx, bson.M{"_id": promoCode.ID}).Decode(&promoCodeStats) if err != nil && mongo.ErrNoDocuments == nil { return err } if promoCodeStats.UsageMap != nil { usageList := promoCodeStats.UsageMap[key] for _, usage := range usageList { if usage.UserID == userID { return ErrPromoCodeAlreadyActivated } } } usage := models.Usage{ UserID: userID, Time: uint64(time.Now().Unix()), } update := bson.M{ "$push": bson.M{ "usageMap." + key: usage, }, } opts := options.Update().SetUpsert(true) _, err = r.mdb.UpdateOne(ctx, bson.M{"_id": promoCode.ID}, update, opts) return err } func (r *StatsRepository) GetStatistics(ctx context.Context, promoCodeID string) (models.PromoCodeStats, error) { objID, err := primitive.ObjectIDFromHex(promoCodeID) if err != nil { return models.PromoCodeStats{}, err } filter := bson.M{"_id": objID} var promoCodeStats models.PromoCodeStats err = r.mdb.FindOne(ctx, filter).Decode(&promoCodeStats) if err != nil { if err == mongo.ErrNoDocuments { return models.PromoCodeStats{}, nil } return models.PromoCodeStats{}, err } return promoCodeStats, nil } func (r *StatsRepository) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time, stream codeword_rpc.PromoCodeService_GetAllPromoActivationsServer) error { var pipeline []bson.M pipeline = append(pipeline, bson.M{ "$project": bson.M{ "_id": 1, "usageArray": bson.M{"$objectToArray": "$usageMap"}, }, }) pipeline = append(pipeline, bson.M{ "$unwind": "$usageArray", }) pipeline = append(pipeline, bson.M{ "$unwind": "$usageArray.v", }) pipeline = append(pipeline, bson.M{ "$group": bson.M{ "_id": "$usageArray.v.userID", "promoID": bson.M{"$first": "$_id"}, "Time": bson.M{"$first": "$usageArray.v.time"}, }, }) pipeline = append(pipeline, bson.M{ "$group": bson.M{ "_id": "$promoID", "users": bson.M{"$push": bson.M{ "UserID": "$_id", "Time": "$Time", }}, }, }) cursor, err := r.mdb.Aggregate(ctx, pipeline) if err != nil { return err } for cursor.Next(ctx) { var data struct { ID string `bson:"_id"` Users []struct { UserID string `bson:"UserID"` Time int64 } `bson:"users"` } err := cursor.Decode(&data) if err != nil { return err } resp := &codeword_rpc.PromoActivationResp{ ID: data.ID, } for _, user := range data.Users { resp.Users = append(resp.Users, &codeword_rpc.PromoActivationResp_UserTime{ UserID: user.UserID, Time: user.Time, }) } if err := stream.Send(resp); err != nil { return err } } return nil }