172 lines
4.6 KiB
Go
172 lines
4.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"gitea.pena/PenaSide/common/mongo"
|
|
"gitea.pena/PenaSide/hlog"
|
|
"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
|
|
_ "github.com/ClickHouse/clickhouse-go"
|
|
_ "github.com/lib/pq"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
"transfer_to_clickhouse/db"
|
|
"transfer_to_clickhouse/models"
|
|
"transfer_to_clickhouse/transfer"
|
|
)
|
|
|
|
const pgCred = "host=10.8.0.5 port=5433 user=squiz password=Redalert2 dbname=squiz sslmode=disable"
|
|
const mongoURLCodeword = "mongodb://mongodb.pena:27017/"
|
|
const mongoDatabaseNameCodeword = "auth"
|
|
const mongoURLCustomer = "mongodb://mongodb.pena:27017/"
|
|
const mongoDatabaseNameCustomer = "customer"
|
|
const trashLogHost1 = ":7113"
|
|
const moduleLogger1 = "transfer"
|
|
const authServiceURL = "http://10.7.0.4:59300/user"
|
|
|
|
func main() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
stopChan := make(chan os.Signal, 1)
|
|
signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)
|
|
|
|
logger1, err := zap.NewProduction(zap.AddStacktrace(zap.DPanicLevel))
|
|
if err != nil {
|
|
log.Fatalf("failed to init zap logger 1: %v", err)
|
|
}
|
|
|
|
sqlDB, err := db.NewSqlDatabase(ctx, pgCred)
|
|
if err != nil {
|
|
log.Fatalf("Failed init sql db %v", err)
|
|
}
|
|
|
|
mongoDBCodeword, err := mongo.Connect(ctx, &mongo.ConnectDeps{
|
|
Configuration: &mongo.Configuration{
|
|
URL: mongoURLCodeword,
|
|
DatabaseName: mongoDatabaseNameCodeword,
|
|
},
|
|
Timeout: 10 * time.Second,
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("Failed initialize mongo codeword: %v", err)
|
|
}
|
|
|
|
mongoDBCustomer, err := mongo.Connect(ctx, &mongo.ConnectDeps{
|
|
Configuration: &mongo.Configuration{
|
|
URL: mongoURLCustomer,
|
|
DatabaseName: mongoDatabaseNameCustomer,
|
|
},
|
|
Timeout: 10 * time.Second,
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("Failed initialize mongo customer: %v", err)
|
|
}
|
|
|
|
//clickHouse1, err := newClickHousePool(ctx, clickHouseCred1)
|
|
//if err != nil {
|
|
// log.Fatalf("Failed initialize clickhouse 1: %v", err)
|
|
//}
|
|
//
|
|
//clickHouse2, err := newClickHousePool(ctx, clickHouseCred2)
|
|
//if err != nil {
|
|
// log.Fatalf("Failed initialize clickhouse 2: %v", err)
|
|
//}
|
|
|
|
hlogForCh1, err := newHlogger(ctx, logger1, trashLogHost1, moduleLogger1)
|
|
if err != nil {
|
|
log.Fatalf("Failed initialize hlogForCh1:%v", err)
|
|
}
|
|
|
|
codewordTransfer := transfer.NewCodeword(hlogForCh1, mongoDBCodeword)
|
|
|
|
customerTransfer := transfer.NewCustomer(hlogForCh1, mongoDBCustomer, authServiceURL)
|
|
|
|
quizCoreTransfer := transfer.NewQuizCore(sqlDB, hlogForCh1)
|
|
|
|
quizAnswererTransfer := transfer.NewQuizAnswerer(sqlDB, hlogForCh1)
|
|
|
|
err = codewordTransfer.ActivatedEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("Invalid activate events in codeword: %v", err)
|
|
}
|
|
err = codewordTransfer.EventCreate(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid create events in codeword: %v", err)
|
|
}
|
|
err = codewordTransfer.DeletedEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid delete events in codeword: %v", err)
|
|
}
|
|
|
|
err = customerTransfer.SetAccountData(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set account data in customer: %v", err)
|
|
}
|
|
|
|
err = customerTransfer.SetPayCartData(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set pay cart data in customer: %v", err)
|
|
}
|
|
|
|
err = quizCoreTransfer.QuizEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set quiz events in quiz core: %v", err)
|
|
}
|
|
|
|
err = quizCoreTransfer.AccountEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set account events in quiz core: %v", err)
|
|
}
|
|
|
|
err = quizCoreTransfer.QuestionEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set question events in quiz core: %v", err)
|
|
}
|
|
|
|
err = quizAnswererTransfer.AnswererEvents(ctx)
|
|
if err != nil {
|
|
log.Fatalf("invalid set answerer events in quiz answerer: %v", err)
|
|
}
|
|
|
|
<-stopChan
|
|
}
|
|
|
|
func newHlogger(ctx context.Context, logger *zap.Logger, trashLogHost, moduleLogger string) (hlog.Logger, error) {
|
|
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, trashLogHost, "", "", time.Now().Unix())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
|
return zapcore.NewTee(core, clickHouseLogger)
|
|
}), zap.AddCallerSkip(2))
|
|
|
|
loggerHlog := hlog.New(loggerForHlog).Module(moduleLogger)
|
|
loggerHlog.With(models.AllFields{})
|
|
|
|
return loggerHlog, nil
|
|
}
|
|
|
|
func newClickHousePool(ctx context.Context, cred string) (*sql.DB, error) {
|
|
conn, err := sql.Open("clickhouse", cred)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error open database connection: %w", err)
|
|
}
|
|
|
|
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := conn.PingContext(timeoutCtx); err != nil {
|
|
return nil, fmt.Errorf("error ping database: %w", err)
|
|
}
|
|
|
|
return conn, nil
|
|
}
|