change logic for rpc stream sending message

This commit is contained in:
Pavel 2024-04-28 11:08:58 +03:00
parent 3f74b1cb78
commit 2cd5c4f5eb
2 changed files with 17 additions and 15 deletions

@ -79,7 +79,7 @@ func (r *StatsRepository) GetStatistics(ctx context.Context, promoCodeID string)
return promoCodeStats, nil return promoCodeStats, nil
} }
func (r *StatsRepository) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time) (*codeword_rpc.PromoActivationResp, error) { func (r *StatsRepository) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time, stream codeword_rpc.PromoCodeService_GetAllPromoActivationsServer) error {
var pipeline []bson.M var pipeline []bson.M
pipeline = append(pipeline, bson.M{ pipeline = append(pipeline, bson.M{
"$project": bson.M{ "$project": bson.M{
@ -116,10 +116,10 @@ func (r *StatsRepository) GetAllPromoActivations(ctx context.Context, req *codew
cursor, err := r.mdb.Aggregate(ctx, pipeline) cursor, err := r.mdb.Aggregate(ctx, pipeline)
if err != nil { if err != nil {
return nil, err
return err
} }
result := make(map[string]*codeword_rpc.PromoActivationResp_Activations)
for cursor.Next(ctx) { for cursor.Next(ctx) {
var data struct { var data struct {
ID string `bson:"_id"` ID string `bson:"_id"`
@ -130,22 +130,24 @@ func (r *StatsRepository) GetAllPromoActivations(ctx context.Context, req *codew
} }
err := cursor.Decode(&data) err := cursor.Decode(&data)
if err != nil { if err != nil {
return nil, err return err
} }
if _, ok := result[data.ID]; !ok { resp := &codeword_rpc.PromoActivationResp{
result[data.ID] = &codeword_rpc.PromoActivationResp_Activations{} ID: data.ID,
} }
for _, user := range data.Users { for _, user := range data.Users {
result[data.ID].Values = append(result[data.ID].Values, &codeword_rpc.PromoActivationResp_UserTime{ resp.Users = append(resp.Users, &codeword_rpc.PromoActivationResp_UserTime{
UserID: user.UserID, UserID: user.UserID,
Time: user.Time, Time: user.Time,
}) })
} }
if err := stream.Send(resp); err != nil {
return err
}
} }
return &codeword_rpc.PromoActivationResp{ return nil
Response: result,
}, nil
} }

@ -29,7 +29,7 @@ type PromoCodeRepository interface {
type PromoStatsRepository interface { type PromoStatsRepository interface {
UpdateStatistics(ctx context.Context, req *models.ActivateReq, promoCode *models.PromoCode, userID string) error UpdateStatistics(ctx context.Context, req *models.ActivateReq, promoCode *models.PromoCode, userID string) error
GetStatistics(ctx context.Context, promoCodeID string) (models.PromoCodeStats, error) GetStatistics(ctx context.Context, promoCodeID string) (models.PromoCodeStats, error)
GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time) (*codeword_rpc.PromoActivationResp, error) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time, stream codeword_rpc.PromoCodeService_GetAllPromoActivationsServer) error
} }
type PromoDeps struct { type PromoDeps struct {
@ -272,12 +272,12 @@ func (s *PromoCodeService) GetStats(ctx context.Context, req models.PromoStatReq
return resp, nil return resp, nil
} }
func (s *PromoCodeService) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time) (*codeword_rpc.PromoActivationResp, error) { func (s *PromoCodeService) GetAllPromoActivations(req *codeword_rpc.Time, stream codeword_rpc.PromoCodeService_GetAllPromoActivationsServer) error {
result, err := s.statsRepo.GetAllPromoActivations(ctx, req) err := s.statsRepo.GetAllPromoActivations(stream.Context(), req, stream)
if err != nil { if err != nil {
s.logger.Error("error getting all promo activations data", zap.Error(err)) s.logger.Error("error getting all promo activations data", zap.Error(err))
return nil, err return err
} }
return result, nil return nil
} }