// Command main reads accounts and payment history from MongoDB and re-emits
// them as structured hlog events through a zap logger that is teed into a
// trashlog core.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"gitea.pena/PenaSide/customer/internal/interface/client"
	"gitea.pena/PenaSide/customer/internal/models"
	"gitea.pena/PenaSide/customer/internal/utils"
	"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
	"github.com/themakers/hlog"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// Run-time settings of the tool. They are plain package variables with
// hard-coded defaults.
var (
	// TrashLogHost is the address handed to zaptrashlog.NewCore.
	TrashLogHost = "localhost:7113"
	// Version, Commit and BuildTime are passed to zaptrashlog.NewCore as
	// build metadata.
	Version   = "replenishment"
	Commit    = "last"
	BuildTime = time.Now().Unix()
	// ModuleLogger is the module name attached to the hlog logger.
	ModuleLogger = "customer-replenishment"
	// AuthServiceUrl is the URL given to the auth client; it is empty by default.
	AuthServiceUrl = ""
	// MongoURL is the connection string used by connectToMongoDB.
	MongoURL = "mongodb://test:test@localhost:27020/?authSource=admin"
	// MongoDatabase is the database that holds both collections below.
	MongoDatabase = "admin"
	// AccountCollection is the collection queried for accounts.
	AccountCollection = "accounts"
	// HistoryCollection is the collection queried for history records.
	HistoryCollection = "history"
)

// main wires up logging, the auth client and the MongoDB client, then runs
// SetAccountData followed by SetPayCartData. If either step fails the error is
// logged and the program returns without running the remaining work.
func main() {
	ctx := context.Background()

	logger, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
	if err != nil {
		log.Fatalf("failed to init zap logger: %v", err)
	}

	clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, TrashLogHost, Version, Commit, BuildTime)
	if err != nil {
		// Report start-up failures the same way as the MongoDB connection
		// failure below instead of panicking.
		logger.Fatal("failed to init trashlog core", zap.Error(err))
	}

	// Every entry written through loggerForHlog goes both to the regular zap
	// core and to the trashlog core.
	loggerForHlog := logger.WithOptions(
		zap.WrapCore(func(core zapcore.Core) zapcore.Core {
			return zapcore.NewTee(core, clickHouseLogger)
		}),
		zap.AddCallerSkip(2),
	)

	loggerHlog := hlog.New(loggerForHlog).Module(ModuleLogger)
	// NOTE(review): the value returned by With is discarded. If With returns
	// a derived logger rather than mutating the receiver, this call has no
	// effect — confirm against the hlog package.
	loggerHlog.With(models.AllFields{})

	authClient := client.NewAuthClient(client.AuthClientDeps{
		Logger: logger,
		URL:    AuthServiceUrl,
	})

	mongoClient, err := connectToMongoDB(ctx, MongoURL)
	if err != nil {
		logger.Fatal("failed to connect to MongoDB", zap.Error(err))
	}
	defer func() {
		if err := mongoClient.Disconnect(ctx); err != nil {
			logger.Error("failed to disconnect MongoDB client", zap.Error(err))
		}
	}()

	err = SetAccountData(ctx, loggerHlog, authClient, mongoClient)
	if err != nil {
		logger.Error("failed set account data to clickHouse", zap.Error(err))
		return
	}

	err = SetPayCartData(ctx, loggerHlog, mongoClient)
	if err != nil {
		logger.Error("failed set pay cart data to clickHouse", zap.Error(err))
		return
	}

	// NOTE(review): presumably this pause lets the trashlog core deliver
	// buffered entries before the process exits — confirm.
	time.Sleep(15 * time.Second)
}

// connectToMongoDB opens a client for mongoURL and pings the server to verify
// that the connection works. It returns the client, or the connect/ping error.
func connectToMongoDB(ctx context.Context, mongoURL string) (*mongo.Client, error) {
	clientOpts := options.Client().ApplyURI(mongoURL)

	client, err := mongo.Connect(ctx, clientOpts)
	if err != nil {
		return nil, err
	}

	err = client.Ping(ctx, nil)
	if err != nil {
		return nil, err
	}

	return client, nil
}

// SetAccountData loads every account with isDeleted == false and emits, per
// account, an InfoGetAccount event, one InfoAddToCart event for each cart item
// and an InfoCreateAccount event enriched with user data from the auth
// service.
//
// An account whose user cannot be fetched from the auth service is reported on
// stdout and skipped; its InfoGetAccount and InfoAddToCart events have already
// been emitted by then. Query, decode and cursor errors abort the function and
// are returned.
func SetAccountData(ctx context.Context, logger hlog.Logger, authClient *client.AuthClient, mongoClient *mongo.Client) error {
	cl := mongoClient.Database(MongoDatabase).Collection(AccountCollection)

	filter := bson.M{"isDeleted": false}
	opts := options.Find()

	cursor, err := cl.Find(ctx, filter, opts)
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	// All accounts are read into memory before any event is emitted.
	var accounts []models.Account
	for cursor.Next(ctx) {
		var account models.Account
		if err := cursor.Decode(&account); err != nil {
			return err
		}
		accounts = append(accounts, account)
	}
	if err := cursor.Err(); err != nil {
		return err
	}

	for _, account := range accounts {
		logger.Emit(models.InfoGetAccount{
			CtxUserID:    account.UserID,
			CtxAccountID: account.ID,
		})

		for _, item := range account.Cart {
			logger.Emit(models.InfoAddToCart{
				CtxUserID:    account.UserID,
				CtxAccountID: account.ID,
				CtxTariffID:  item,
			})
		}

		userData, err := authClient.GetUser(ctx, account.UserID)
		if err != nil {
			// Trailing newline keeps consecutive diagnostics on separate lines.
			fmt.Printf("error getting user from auth service,scip him, userid:%s, error: %s\n", account.UserID, err.Error())
			continue
		}

		// A non-empty From marks the account source as "quiz".
		quiz := ""
		if account.From != "" {
			quiz = "quiz"
		}

		logger.Emit(models.InfoCreateAccount{
			CtxUserID:      account.UserID,
			CtxAccountID:   account.ID,
			KeyFromSource:  quiz,
			KeyFromID:      account.From,
			KeyFromPartner: account.Partner,
			CtxLogin:       userData.Login,
			CtxEmail:       userData.Email,
			CtxPhone:       userData.PhoneNumber,
			KeyCurrency:    account.Wallet.Currency,
		})
	}

	return nil
}

// SetPayCartData loads every history record with isDeleted == false and emits
// an InfoPayCart event (with KeySuccess set to 1) for each record whose user
// has a non-deleted account.
//
// Records whose account lookup fails, or whose account does not exist, are
// skipped. Query, decode and cursor errors on the history collection abort the
// function and are returned.
func SetPayCartData(ctx context.Context, logger hlog.Logger, mongoClient *mongo.Client) error {
	clHistory := mongoClient.Database(MongoDatabase).Collection(HistoryCollection)
	clAccount := mongoClient.Database(MongoDatabase).Collection(AccountCollection)

	filter := bson.M{"isDeleted": false}
	opts := options.Find()

	cursor, err := clHistory.Find(ctx, filter, opts)
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	// All history records are read into memory before any event is emitted.
	var histories []models.History
	for cursor.Next(ctx) {
		var history models.History
		if err := cursor.Decode(&history); err != nil {
			return err
		}
		histories = append(histories, history)
	}
	if err := cursor.Err(); err != nil {
		return err
	}

	for _, history := range histories {
		currentAccount, err := findByUserID(ctx, clAccount, history.UserID)
		if err != nil {
			fmt.Printf("Error getting account by userID:%s\n", err.Error())
			continue
		}
		if currentAccount == nil {
			// findByUserID reports a missing account as (nil, nil) and has
			// already printed a message; dereferencing it would panic.
			continue
		}

		logger.Emit(models.InfoPayCart{
			CtxUserID:    history.UserID,
			CtxAccountID: currentAccount.ID,
			KeySuccess:   uint8(1),
			CtxPrice:     history.RawDetails.Price,
			//CtxTariff: strings.Join(account.Cart, ","),
			//CtxDiscount: strings.Join(utils.GetAppliedDiscountsIDs(discountResponse.AppliedDiscounts), ","),
			//CtxRowPrice: int64(tariffsAmount),
			CtxRowData: utils.MarshalRawDetails(history.RawDetails),
		})
	}

	return nil
}

// findByUserID looks up the non-deleted account whose "userId" field equals
// userID.
//
// It returns (nil, nil) when no such account exists, after printing a notice
// to stdout, so callers must check the returned pointer for nil. Any other
// lookup or decode error is returned as is.
func findByUserID(ctx context.Context, clAccount *mongo.Collection, userID string) (*models.Account, error) {
	filter := bson.M{
		"userId":    userID,
		"isDeleted": false,
	}

	var account models.Account
	err := clAccount.FindOne(ctx, filter).Decode(&account)
	if err != nil {
		if errors.Is(err, mongo.ErrNoDocuments) {
			fmt.Printf("No account found for userID: %s\n", userID)
			return nil, nil
		}
		return nil, err
	}

	return &account, nil
}