customer/scripts/data_to_clickhouse/main.go
2024-11-20 15:32:18 +03:00

223 lines
5.7 KiB
Go

package main
import (
"context"
"fmt"
"gitea.pena/PenaSide/customer/internal/interface/client"
"gitea.pena/PenaSide/customer/internal/models"
"gitea.pena/PenaSide/customer/internal/utils"
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
"github.com/themakers/hlog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"time"
)
var TrashLogHost = "localhost:7113"
var Version = "replenishment"
var Commit = "last"
var BuildTime = time.Now().Unix()
var ModuleLogger = "customer-replenishment"
var AuthServiceUrl = ""
var MongoURL = "mongodb://test:test@localhost:27020/?authSource=admin"
var MongoDatabase = "admin"
var AccountCollection = "accounts"
var HistoryCollection = "history"
func main() {
ctx := context.Background()
logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
if err != nil {
log.Fatalf("failed to init zap logger: %v", err)
}
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, TrashLogHost, Version, Commit, BuildTime)
if err != nil {
panic(err)
}
loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, clickHouseLogger)
}), zap.AddCallerSkip(2))
loggerHlog := hlog.New(loggerForHlog).Module(ModuleLogger)
loggerHlog.With(models.AllFields{})
authClient := client.NewAuthClient(client.AuthClientDeps{
Logger: logger,
URLs: &models.AuthMicroserviceURL{
User: AuthServiceUrl,
},
})
mongoClient, err := connectToMongoDB(ctx, MongoURL)
if err != nil {
logger.Fatal("failed to connect to MongoDB", zap.Error(err))
}
defer func() {
if err := mongoClient.Disconnect(ctx); err != nil {
logger.Error("failed to disconnect MongoDB client", zap.Error(err))
}
}()
err = SetAccountData(ctx, loggerHlog, authClient, mongoClient)
if err != nil {
logger.Error("failed set account data to clickHouse", zap.Error(err))
return
}
err = SetPayCartData(ctx, loggerHlog, mongoClient)
if err != nil {
logger.Error("failed set pay cart data to clickHouse", zap.Error(err))
return
}
time.Sleep(15 * time.Second)
}
func connectToMongoDB(ctx context.Context, mongoURL string) (*mongo.Client, error) {
clientOpts := options.Client().ApplyURI(mongoURL)
client, err := mongo.Connect(ctx, clientOpts)
if err != nil {
return nil, err
}
err = client.Ping(ctx, nil)
if err != nil {
return nil, err
}
return client, nil
}
func SetAccountData(ctx context.Context, logger hlog.Logger, authClient *client.AuthClient, mongoClient *mongo.Client) error {
cl := mongoClient.Database(MongoDatabase).Collection(AccountCollection)
filter := bson.M{"isDeleted": false}
opts := options.Find()
cursor, err := cl.Find(ctx, filter, opts)
if err != nil {
return err
}
defer cursor.Close(ctx)
var accounts []models.Account
for cursor.Next(ctx) {
var account models.Account
if err := cursor.Decode(&account); err != nil {
return err
}
accounts = append(accounts, account)
}
if err := cursor.Err(); err != nil {
return err
}
for _, account := range accounts {
logger.Emit(models.InfoGetAccount{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
})
for _, item := range account.Cart {
logger.Emit(models.InfoAddToCart{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
CtxTariffID: item,
})
}
userData, err := authClient.GetUser(ctx, account.UserID)
if err != nil {
fmt.Printf("error getting user from auth service,scip him, userid:%s, error: %s", account.UserID, err.Error())
continue
}
quiz := ""
if account.From != "" {
quiz = "quiz"
}
logger.Emit(models.InfoCreateAccount{
CtxUserID: account.UserID,
CtxAccountID: account.ID,
KeyFromSource: quiz,
KeyFromID: account.From,
KeyFromPartner: account.Partner,
CtxLogin: userData.Login,
CtxEmail: userData.Email,
CtxPhone: userData.PhoneNumber,
KeyCurrency: account.Wallet.Currency,
})
}
return nil
}
func SetPayCartData(ctx context.Context, logger hlog.Logger, mongoClient *mongo.Client) error {
clHistory := mongoClient.Database(MongoDatabase).Collection(HistoryCollection)
clAccount := mongoClient.Database(MongoDatabase).Collection(AccountCollection)
filter := bson.M{"isDeleted": false}
opts := options.Find()
cursor, err := clHistory.Find(ctx, filter, opts)
if err != nil {
return err
}
defer cursor.Close(ctx)
var histories []models.History
for cursor.Next(ctx) {
var history models.History
if err := cursor.Decode(&history); err != nil {
return err
}
histories = append(histories, history)
}
if err := cursor.Err(); err != nil {
return err
}
for _, history := range histories {
currentAccount, err := findByUserID(ctx, clAccount, history.UserID)
if err != nil {
fmt.Printf("Error getting account by userID:%s", err.Error())
continue
}
logger.Emit(models.InfoPayCart{
CtxUserID: history.UserID,
CtxAccountID: currentAccount.ID,
KeySuccess: uint8(1),
CtxPrice: history.RawDetails.Price,
//CtxTariff: strings.Join(account.Cart, ","),
//CtxDiscount: strings.Join(utils.GetAppliedDiscountsIDs(discountResponse.AppliedDiscounts), ","),
//CtxRowPrice: int64(tariffsAmount),
CtxRowData: utils.MarshalRawDetails(history.RawDetails),
})
}
return nil
}
func findByUserID(ctx context.Context, clAccount *mongo.Collection, userID string) (*models.Account, error) {
filter := bson.M{
"userId": userID,
"isDeleted": false,
}
var account models.Account
err := clAccount.FindOne(ctx, filter).Decode(&account)
if err != nil {
if err == mongo.ErrNoDocuments {
fmt.Printf("No account found for userID: %s", userID)
return nil, nil
}
return nil, err
}
return &account, nil
}