package main import ( "context" "database/sql" "fmt" "gitea.pena/PenaSide/common/mongo" "gitea.pena/PenaSide/hlog" "gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog" _ "github.com/ClickHouse/clickhouse-go" _ "github.com/lib/pq" "go.uber.org/zap" "go.uber.org/zap/zapcore" "log" "os" "os/signal" "syscall" "time" "transfer_to_clickhouse/db" "transfer_to_clickhouse/models" "transfer_to_clickhouse/transfer" ) const pgCred = "host=10.8.0.5 port=5433 user=squiz password=Redalert2 dbname=squiz sslmode=disable" const mongoURLCodeword = "mongodb://mongodb.pena:27017/" const mongoDatabaseNameCodeword = "auth" const mongoURLCustomer = "mongodb://mongodb.pena:27017/" const mongoDatabaseNameCustomer = "customer" const trashLogHost1 = ":7113" const moduleLogger1 = "transfer" const authServiceURL = "http://10.7.0.4:59300/user" func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stopChan := make(chan os.Signal, 1) signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM) logger1, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel)) if err != nil { log.Fatalf("failed to init zap logger 1: %v", err) } sqlDB, err := db.NewSqlDatabase(ctx, pgCred) if err != nil { log.Fatalf("Failed init sql db %v", err) } mongoDBCodeword, err := mongo.Connect(ctx, &mongo.ConnectDeps{ Configuration: &mongo.Configuration{ URL: mongoURLCodeword, DatabaseName: mongoDatabaseNameCodeword, }, Timeout: 10 * time.Second, }) if err != nil { log.Fatalf("Failed initialize mongo codeword: %v", err) } mongoDBCustomer, err := mongo.Connect(ctx, &mongo.ConnectDeps{ Configuration: &mongo.Configuration{ URL: mongoURLCustomer, DatabaseName: mongoDatabaseNameCustomer, }, Timeout: 10 * time.Second, }) if err != nil { log.Fatalf("Failed initialize mongo customer: %v", err) } //clickHouse1, err := newClickHousePool(ctx, clickHouseCred1) //if err != nil { // log.Fatalf("Failed initialize clickhouse 1: %v", err) //} // //clickHouse2, err := newClickHousePool(ctx, clickHouseCred2) //if err != nil { // log.Fatalf("Failed initialize clickhouse 2: %v", err) //} hlogForCh1, err := newHlogger(ctx, logger1, trashLogHost1, moduleLogger1) if err != nil { log.Fatalf("Failed initialize hlogForCh1:%v", err) } codewordTransfer := transfer.NewCodeword(hlogForCh1, mongoDBCodeword) customerTransfer := transfer.NewCustomer(hlogForCh1, mongoDBCustomer, authServiceURL) quizCoreTransfer := transfer.NewQuizCore(sqlDB, hlogForCh1) quizAnswererTransfer := transfer.NewQuizAnswerer(sqlDB, hlogForCh1) err = codewordTransfer.ActivatedEvents(ctx) if err != nil { log.Fatalf("Invalid activate events in codeword: %v", err) } err = codewordTransfer.EventCreate(ctx) if err != nil { log.Fatalf("invalid create events in codeword: %v", err) } err = codewordTransfer.DeletedEvents(ctx) if err != nil { log.Fatalf("invalid delete events in codeword: %v", err) } err = customerTransfer.SetAccountData(ctx) if err != nil { log.Fatalf("invalid set account data in customer: %v", err) } err = customerTransfer.SetPayCartData(ctx) if err != nil { log.Fatalf("invalid set pay cart data in customer: %v", err) } err = quizCoreTransfer.QuizEvents(ctx) if err != nil { log.Fatalf("invalid set quiz events in quiz core: %v", err) } err = quizCoreTransfer.AccountEvents(ctx) if err != nil { log.Fatalf("invalid set account events in quiz core: %v", err) } err = quizCoreTransfer.QuestionEvents(ctx) if err != nil { log.Fatalf("invalid set question events in quiz core: %v", err) } err = quizAnswererTransfer.AnswererEvents(ctx) if err != nil { log.Fatalf("invalid set answerer events in quiz answerer: %v", err) } <-stopChan } func newHlogger(ctx context.Context, logger *zap.Logger, trashLogHost, moduleLogger string) (hlog.Logger, error) { clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, trashLogHost, "", "", time.Now().Unix()) if err != nil { return nil, err } loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { return zapcore.NewTee(core, clickHouseLogger) }), zap.AddCallerSkip(2)) loggerHlog := hlog.New(loggerForHlog).Module(moduleLogger) loggerHlog.With(models.AllFields{}) return loggerHlog, nil } func newClickHousePool(ctx context.Context, cred string) (*sql.DB, error) { conn, err := sql.Open("clickhouse", cred) if err != nil { return nil, fmt.Errorf("error open database connection: %w", err) } timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() if err := conn.PingContext(timeoutCtx); err != nil { return nil, fmt.Errorf("error ping database: %w", err) } return conn, nil }