add rpc stream resp taking from stream codeword

This commit is contained in:
Pavel 2024-04-28 11:20:58 +03:00
parent df5b3d8f9a
commit 81baff6eff
5 changed files with 32 additions and 12 deletions

@ -6,6 +6,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/errors"
codeword_rpc "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/proto/codeword"
@ -37,7 +38,7 @@ func NewCodewordClient(deps CodewordClientDeps) *CodewordClient {
}
}
func (receiver *CodewordClient) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time) (*codeword_rpc.PromoActivationResp, errors.Error) {
func (receiver *CodewordClient) GetAllPromoActivations(ctx context.Context, req *codeword_rpc.Time) (map[string][]*codeword_rpc.PromoActivationResp_UserTime, errors.Error) {
connection, err := grpc.Dial(receiver.codewordServiceHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
receiver.logger.Error("failed to connect on <GetAllPromoActivations> of <CodewordClient>", zap.Error(err), zap.String("codeword host", receiver.codewordServiceHost))
@ -51,11 +52,30 @@ func (receiver *CodewordClient) GetAllPromoActivations(ctx context.Context, req
client := codeword_rpc.NewPromoCodeServiceClient(connection)
response, err := client.GetAllPromoActivations(ctx, req)
stream, err := client.GetAllPromoActivations(ctx, req)
if err != nil {
receiver.logger.Error("failed getting stats resp on <GetAllPromoActivations> of <DiscountClient>", zap.Error(err))
return nil, errors.New(fmt.Errorf("failed getting stats resp from codeword: %w", err), errors.ErrInternalError)
}
response := make(map[string][]*codeword_rpc.PromoActivationResp_UserTime)
for {
activations, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return nil, errors.New(fmt.Errorf("failed to receive response stream in codeword rpc client: %w", err), errors.ErrInternalError)
}
if activations == nil {
continue
}
response[activations.ID] = append(response[activations.ID], activations.Users...)
}
return response, nil
}

@ -378,7 +378,7 @@ func (receiver *HistoryRepository) GetHistoryByListUsers(ctx context.Context, ac
return historyMap, nil
}
func (receiver *HistoryRepository) GetPayUsersPromoHistory(ctx context.Context, codewordData *codeword_rpc.PromoActivationResp, from, to int64) (map[string]int64, error) {
func (receiver *HistoryRepository) GetPayUsersPromoHistory(ctx context.Context, codewordData map[string][]*codeword_rpc.PromoActivationResp_UserTime, from, to int64) (map[string]int64, error) {
match := utils.MatchGen(codewordData, from, to)
pipeline := []bson.M{

@ -920,8 +920,8 @@ func (api *API2) PromocodeLTV(ctx echo.Context) error {
Money int64
})
for promoID, data := range codewordData.Response {
for _, value := range data.Values {
for promoID, data := range codewordData {
for _, value := range data {
paids, ok := userSumMap[value.UserID]
if !ok {

@ -6,11 +6,11 @@ import (
"time"
)
func MatchGen(codewordData *codeword_rpc.PromoActivationResp, from, to int64) bson.M {
func MatchGen(codewordData map[string][]*codeword_rpc.PromoActivationResp_UserTime, from, to int64) bson.M {
userActivations := make(map[string]int64)
for _, values := range codewordData.Response {
for _, activation := range values.Values {
for _, values := range codewordData {
for _, activation := range values {
userActivations[activation.UserID] = activation.Time
}
}

@ -39,8 +39,8 @@ func Test_PromoLTV(t *testing.T) {
Timeout: 10 * time.Second,
})
from := int64(1712907425)
to := int64(1714117025)
from := int64(0)
to := int64(1714291104)
historyRepo := repository.NewHistoryRepository2(logger, mdb.Collection("histories"))
@ -61,8 +61,8 @@ func Test_PromoLTV(t *testing.T) {
Money int64
})
for promoID, data := range codewordData.Response {
for _, value := range data.Values {
for promoID, data := range codewordData {
for _, value := range data {
paids, ok := userSumMap[value.UserID]
if !ok {