tools/transfer_to_clickhouse/main.go
2025-02-14 18:08:50 +03:00

172 lines
4.6 KiB
Go

package main
import (
"context"
"database/sql"
"fmt"
"gitea.pena/PenaSide/common/mongo"
"gitea.pena/PenaSide/hlog"
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/lib/pq"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"log"
"os"
"os/signal"
"syscall"
"time"
"transfer_to_clickhouse/db"
"transfer_to_clickhouse/models"
"transfer_to_clickhouse/transfer"
)
// Connection settings and endpoints consumed by main and newHlogger.
//
// NOTE(review): pgCred carries a plaintext password and every host below is
// fixed at compile time — consider loading these from the environment or a
// config file rather than keeping them in the source tree.
const (
	// pgCred is the connection string passed to db.NewSqlDatabase.
	pgCred = "host=10.8.0.5 port=5433 user=squiz password=Redalert2 dbname=squiz sslmode=disable"

	// mongoURLCodeword and mongoDatabaseNameCodeword configure the Mongo
	// connection handed to transfer.NewCodeword.
	mongoURLCodeword = "mongodb://mongodb.pena:27017/"

	mongoDatabaseNameCodeword = "auth"

	// mongoURLCustomer and mongoDatabaseNameCustomer configure the Mongo
	// connection handed to transfer.NewCustomer.
	mongoURLCustomer = "mongodb://mongodb.pena:27017/"

	mongoDatabaseNameCustomer = "customer"

	// trashLogHost1 is the host argument given to zaptrashlog.NewCore via
	// newHlogger.
	trashLogHost1 = ":7113"

	// moduleLogger1 is the module name set on the hlog logger.
	moduleLogger1 = "transfer"

	// authServiceURL is passed to transfer.NewCustomer.
	authServiceURL = "http://10.7.0.4:59300/user"
)
// main connects to Postgres and the two Mongo databases, builds the hlog
// logger, runs every transfer step once in a fixed order, and then blocks
// until SIGINT or SIGTERM is received.
//
// Any initialization or transfer error terminates the process via
// log.Fatalf. The first termination signal cancels the root context so that
// connects and transfers which honor the context can stop early; a second
// signal falls through to the default handler and kills the process.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	stopChan := make(chan os.Signal, 1)
	signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)

	// Without this goroutine the signal would sit in stopChan, unnoticed,
	// until every transfer had finished, making the tool impossible to
	// interrupt. The goroutine ends on the first signal or when ctx is done.
	go func() {
		select {
		case <-stopChan:
			// Restore default signal handling so a repeated Ctrl-C can
			// force-terminate a step that ignores the context.
			signal.Stop(stopChan)
			cancel()
		case <-ctx.Done():
		}
	}()

	logger1, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
	if err != nil {
		log.Fatalf("failed to init zap logger 1: %v", err)
	}

	sqlDB, err := db.NewSqlDatabase(ctx, pgCred)
	if err != nil {
		log.Fatalf("Failed init sql db %v", err)
	}

	mongoDBCodeword, err := mongo.Connect(ctx, &mongo.ConnectDeps{
		Configuration: &mongo.Configuration{
			URL:          mongoURLCodeword,
			DatabaseName: mongoDatabaseNameCodeword,
		},
		Timeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed initialize mongo codeword: %v", err)
	}

	mongoDBCustomer, err := mongo.Connect(ctx, &mongo.ConnectDeps{
		Configuration: &mongo.Configuration{
			URL:          mongoURLCustomer,
			DatabaseName: mongoDatabaseNameCustomer,
		},
		Timeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed initialize mongo customer: %v", err)
	}

	// Direct ClickHouse pools are currently disabled; kept for reference.
	//clickHouse1, err := newClickHousePool(ctx, clickHouseCred1)
	//if err != nil {
	//	log.Fatalf("Failed initialize clickhouse 1: %v", err)
	//}
	//
	//clickHouse2, err := newClickHousePool(ctx, clickHouseCred2)
	//if err != nil {
	//	log.Fatalf("Failed initialize clickhouse 2: %v", err)
	//}

	hlogForCh1, err := newHlogger(ctx, logger1, trashLogHost1, moduleLogger1)
	if err != nil {
		log.Fatalf("Failed initialize hlogForCh1:%v", err)
	}

	codewordTransfer := transfer.NewCodeword(hlogForCh1, mongoDBCodeword)
	customerTransfer := transfer.NewCustomer(hlogForCh1, mongoDBCustomer, authServiceURL)
	quizCoreTransfer := transfer.NewQuizCore(sqlDB, hlogForCh1)
	quizAnswererTransfer := transfer.NewQuizAnswerer(sqlDB, hlogForCh1)

	// Transfer steps, executed strictly in this order. failure is the exact
	// message prefix logged before the error when the step fails.
	steps := []struct {
		failure string
		run     func() error
	}{
		{"Invalid activate events in codeword", func() error { return codewordTransfer.ActivatedEvents(ctx) }},
		{"invalid create events in codeword", func() error { return codewordTransfer.EventCreate(ctx) }},
		{"invalid delete events in codeword", func() error { return codewordTransfer.DeletedEvents(ctx) }},
		{"invalid set account data in customer", func() error { return customerTransfer.SetAccountData(ctx) }},
		{"invalid set pay cart data in customer", func() error { return customerTransfer.SetPayCartData(ctx) }},
		{"invalid set quiz events in quiz core", func() error { return quizCoreTransfer.QuizEvents(ctx) }},
		{"invalid set account events in quiz core", func() error { return quizCoreTransfer.AccountEvents(ctx) }},
		{"invalid set question events in quiz core", func() error { return quizCoreTransfer.QuestionEvents(ctx) }},
		{"invalid set answerer events in quiz answerer", func() error { return quizAnswererTransfer.AnswererEvents(ctx) }},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			log.Fatalf("%s: %v", step.failure, err)
		}
	}

	// Stay alive until a termination signal cancels ctx (as the original
	// did by blocking on stopChan after the last step).
	<-ctx.Done()
}
// newHlogger builds an hlog logger on top of the given zap logger.
//
// It creates a zaptrashlog core for trashLogHost at Info level, tees it with
// the core of logger so every entry reaches both, adds a caller skip of 2,
// and tags the resulting hlog logger with moduleLogger as its module name.
//
// It returns an error only when the zaptrashlog core cannot be created.
func newHlogger(ctx context.Context, logger *zap.Logger, trashLogHost, moduleLogger string) (hlog.Logger, error) {
	nowUnix := time.Now().Unix()
	trashCore, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, trashLogHost, "", "", nowUnix)
	if err != nil {
		return nil, err
	}

	// Duplicate every entry into the trashlog core alongside the base core.
	teeWithTrashCore := func(base zapcore.Core) zapcore.Core {
		return zapcore.NewTee(base, trashCore)
	}
	teedLogger := logger.WithOptions(
		zap.WrapCore(teeWithTrashCore),
		zap.AddCallerSkip(2),
	)

	result := hlog.New(teedLogger).Module(moduleLogger)
	// NOTE(review): the value returned by With (if any) is discarded here —
	// confirm against the hlog API whether With mutates the receiver or
	// returns a derived logger that should be used instead.
	result.With(models.AllFields{})
	return result, nil
}
// newClickHousePool opens a database/sql pool using the "clickhouse" driver
// and verifies connectivity with a ping bounded by a 5-second timeout derived
// from ctx.
//
// cred is passed to sql.Open unchanged as the data source name. On success
// the caller owns the returned *sql.DB and is responsible for closing it. On
// failure the returned pool is nil and the error wraps the underlying cause.
func newClickHousePool(ctx context.Context, cred string) (*sql.DB, error) {
	conn, err := sql.Open("clickhouse", cred)
	if err != nil {
		return nil, fmt.Errorf("error open database connection: %w", err)
	}

	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := conn.PingContext(timeoutCtx); err != nil {
		// The pool is not handed to the caller on this path, so release it
		// here; otherwise it and any connection it opened would leak. The
		// ping error is the one worth reporting, so Close's error is dropped.
		_ = conn.Close()
		return nil, fmt.Errorf("error ping database: %w", err)
	}
	return conn, nil
}