package clickhouse

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go"
	"github.com/themakers/hlog"

	"penahub.gitlab.yandexcloud.net/external/trashlog.git/dal/bbolt"
	"penahub.gitlab.yandexcloud.net/external/trashlog.git/model"
)

type DAL struct {
	logger        hlog.Logger
	db            *sql.DB
	putRecordStmt *sql.Stmt
	buffer        []model.Record
	Schema        map[string]string
	schemaM, bufM sync.Mutex
	recordsStream chan model.Record
	recoveryDB    *bbolt.DAL
}

const dbPath = "./recover.bolt"

type ErrorPingException struct {
	Code       int32
	Message    string
	StackTrace string
}

func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error) {
	connect, err := sql.Open("clickhouse", credentials)
	if err != nil {
		return nil, err
	}

	recoverStore, err := bbolt.New(dbPath)
	if err != nil {
		return nil, err
	}

	if err := connect.PingContext(ctx); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Emit(ErrorPingException{
				Code:       exception.Code,
				Message:    exception.Message,
				StackTrace: exception.StackTrace,
			})
		}
		return nil, err
	}

	return &DAL{
		db:            connect,
		logger:        log.Module("clickhouse"),
		recoveryDB:    recoverStore,
		Schema:        map[string]string{},
		recordsStream: make(chan model.Record, 1000),
	}, nil
}

type ErrorCreateTable struct {
	Err   error
	Query string
}

type ErrorCreateStatement struct {
	Err   error
	Query string
}

type ErrorGetTableSchema struct {
	Err error
}

func (d *DAL) Init(ctx context.Context) error {
	query := `
		CREATE TABLE IF NOT EXISTS statistics (
			event_level Enum('info' = 0, 'warn' = 1, 'error' = 2, 'debug' = 3),
			message String,
			stacktrace String,
			svc_build_time DateTime,
			svc_version String,
			svc_commit String,
			svc_file String,
			svc_line UInt64,
			svc_module Array(String),
			event_time DateTime,
			create_time DateTime
		) ENGINE = MergeTree()
		ORDER BY (svc_version, event_level, svc_build_time, event_time)
		PARTITION BY toStartOfDay(event_time)
		PRIMARY KEY (svc_version, event_level, svc_build_time)
	`
	if _, err := d.db.ExecContext(ctx, query); err != nil {
		d.logger.Emit(ErrorCreateTable{
			Err:   err,
			Query: query,
		})
		return err
	}

	if _, err := d.db.ExecContext(ctx, "set idle_connection_timeout = 86400;"); err != nil {
		return err
	}

	result, err := d.getCurrentColumns(ctx)
	if err != nil {
		return err
	}
	d.Schema = result

	go d.batchRecords(ctx)

	return nil
}

func (d *DAL) getCurrentColumns(ctx context.Context) (map[string]string, error) {
	descriptionRows, err := d.db.QueryContext(ctx, `DESC statistics`)
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{Err: err})
		return nil, err
	}
	defer descriptionRows.Close()

	descriptionColumns, err := descriptionRows.Columns()
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{Err: err})
		return nil, err
	}

	// DESC returns one row per column; only the first two fields (column name
	// and type) are kept. Scan targets are built from the actual column count
	// rather than a hardcoded arity, so a driver-side change in the DESC
	// result shape will not break the scan.
	var (
		row    = make([]string, len(descriptionColumns))
		dest   = make([]any, len(descriptionColumns))
		result = map[string]string{}
	)
	for i := range row {
		dest[i] = &row[i]
	}

	for descriptionRows.Next() {
		if err := descriptionRows.Scan(dest...); err != nil {
			return nil, err
		}
		result[row[0]] = row[1]
	}

	return result, descriptionRows.Err()
}

type ErrorPanicBatchRecords struct {
	Err error
}

type WarnPanicBatch struct {
	Message string
}

type ErrorInsertTransaction struct {
	Err error
}

func (d *DAL) batchRecords(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case t := <-ticker.C:
			d.inserter(ctx, t)
		case r := <-d.recordsStream:
			d.bufM.Lock()
			d.buffer = append(d.buffer, r)
			d.bufM.Unlock()
		case <-ctx.Done():
			ticker.Stop()
			return
		}
	}
}
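// Flow note (added): PutRecord pushes records onto recordsStream (buffered to
// 1000 entries, so producers block only if the batcher falls behind);
// batchRecords appends them to buffer; and every one-second tick hands the
// accumulated batch to inserter, which writes it within a single transaction.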
func (d *DAL) inserter(ctx context.Context, t time.Time) {
	// Swap the shared buffer for an empty one under the lock (including the
	// emptiness check, so the length read does not race with batchRecords);
	// new records can keep arriving while this batch is written.
	d.bufM.Lock()
	if len(d.buffer) == 0 {
		d.bufM.Unlock()
		return
	}
	buffer := d.buffer
	d.buffer = []model.Record{}
	d.bufM.Unlock()

	defer func() {
		if val := recover(); val != nil {
			if err, ok := val.(error); ok {
				d.logger.Emit(ErrorPanicBatchRecords{Err: err})
			} else {
				d.logger.Emit(WarnPanicBatch{Message: fmt.Sprint(val)})
			}
		}
	}()

	trans, err := d.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelDefault,
		ReadOnly:  false,
	})
	if err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
		return
	}

	fields := d.prepareAdditionalFields(buffer)
	query := d.buildQueryString(fields)

	insertStmt, err := trans.PrepareContext(ctx, query)
	if err != nil {
		d.logger.Emit(ErrorCreateStatement{
			Err:   err,
			Query: query,
		})
		_ = trans.Rollback()
		return
	}
	defer func() {
		if err := insertStmt.Close(); err != nil {
			d.logger.Emit(ErrorCreateStatement{Err: err})
		}
	}()

	for _, record := range buffer {
		if err := d.recordInserter(ctx, insertStmt, record, fields, t); err != nil {
			// recordInserter returns an error only when a failed insert could
			// not be parked in the recovery store either.
			d.logger.Emit(WarnYieldRecovery{Err: err})
			continue
		}
	}

	// Retry records previously parked in the bbolt recovery store.
	if err := d.recoveryDB.YieldRecovery(func(record model.Record) error {
		return d.recordInserter(ctx, insertStmt, record, fields, t)
	}); err != nil {
		d.logger.Emit(WarnYieldRecovery{Err: err})
	}

	if err := trans.Commit(); err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
	}
}

type WarnYieldRecovery struct {
	Err error
}

func (d *DAL) recordInserter(
	ctx context.Context,
	insertStmt *sql.Stmt,
	record model.Record,
	fields []string,
	t time.Time,
) error {
	svcData := []any{
		record.Level,
		record.Message,
		record.Stacktrace,
		record.SvcBuildTime,
		record.SvcVersion,
		record.SvcCommit,
		record.SvcFile,
		record.SvcLine,
		clickhouse.Array(record.Module),
		record.TS,
		t,
	}
	svcLen := len(svcData)

	dataToInsert := make([]any, len(fields)+svcLen)
	for i := range dataToInsert {
		if i < svcLen {
			dataToInsert[i] = svcData[i]
			continue
		}
		// Dynamic columns: prefer the record's own key fields, then its
		// context fields, and fall back to the column type's zero value.
		field := fields[i-svcLen]
		if v, ok := record.KeyFields[field]; ok {
			dataToInsert[i] = v
		} else if v, ok := record.CtxFields[field]; ok {
			dataToInsert[i] = v
		} else {
			d.schemaM.Lock()
			switch d.Schema[field] {
			case "String":
				dataToInsert[i] = ""
			case "Int64":
				dataToInsert[i] = int64(0)
			case "Float64":
				dataToInsert[i] = float64(0)
			case "UInt8":
				dataToInsert[i] = uint8(0)
			}
			d.schemaM.Unlock()
		}
	}

	if _, err := insertStmt.ExecContext(ctx, dataToInsert...); err != nil {
		d.logger.Emit(ErrorExecClickhouse{Error: err.Error()})
		// Park the record in the recovery store so it can be retried on a
		// later flush instead of being lost.
		if err := d.recoveryDB.PutRecord(record); err != nil {
			return err
		}
	}

	return nil
}

const unpreparedQuery = `
	INSERT INTO statistics (event_level, message, stacktrace, svc_build_time,
		svc_version, svc_commit, svc_file, svc_line, svc_module, event_time,
		create_time%s)
	VALUES (?,?,?,?,?,?,?,?,?,?,?%s);
`

func (d *DAL) buildQueryString(fields []string) string {
	var fldsStr, phStr string
	if len(fields) > 0 {
		fldsStr = "," + strings.Join(fields, ",")
		phStr = strings.Repeat(",?", len(fields))
	}
	return fmt.Sprintf(unpreparedQuery, fldsStr, phStr)
}

func (d *DAL) prepareAdditionalFields(buf []model.Record) (r []string) {
	exist := map[string]bool{}

	// A field joins the insert column list at most once, and only if the
	// statistics table already has a matching column.
	prepare := func(field string) {
		if _, ok := exist[field]; !ok {
			d.schemaM.Lock()
			defer d.schemaM.Unlock()
			if _, ok := d.Schema[field]; ok {
				r = append(r, field)
				exist[field] = true
			}
		}
	}

	for _, rec := range buf {
		for field := range rec.KeyFields {
			prepare(field)
		}
		for field := range rec.CtxFields {
			prepare(field)
		}
	}

	return
}

type ErrorExecClickhouse struct {
	Error string
}

type DebugNeedMutex struct{}

func (d *DAL) PutRecord(record model.Record) {
	d.recordsStream <- record
}
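// Illustrative note (added): for a batch whose records carry a "user_id" key
// field that already exists as a column (see AddColumn below), buildQueryString
// yields roughly:
//
//	INSERT INTO statistics (event_level, ..., create_time, user_id)
//	VALUES (?,?,?,?,?,?,?,?,?,?,?,?);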
// AddColumn extends the statistics table with the given column name to type
// pairs. Names and types are interpolated into the DDL as-is, so they must
// come from trusted input.
func (d *DAL) AddColumn(ctx context.Context, columns map[string]string) error {
	var colStatements []string
	for name, t := range columns {
		colStatements = append(colStatements, fmt.Sprintf(
			"ADD COLUMN IF NOT EXISTS %s %s",
			name, t,
		))
	}

	if len(colStatements) > 0 {
		if _, err := d.db.ExecContext(ctx, fmt.Sprintf(
			`ALTER TABLE statistics %s;`,
			strings.Join(colStatements, ","),
		)); err != nil {
			return err
		}
	}

	d.schemaM.Lock()
	for name, t := range columns {
		d.Schema[name] = t
	}
	d.schemaM.Unlock()

	return nil
}

// getBuffer is used only by tests; it returns a copy of the current buffer.
// The lock keeps the read from racing with batchRecords and inserter.
func (d *DAL) getBuffer() []model.Record {
	d.bufM.Lock()
	defer d.bufM.Unlock()
	r := make([]model.Record, len(d.buffer))
	copy(r, d.buffer)
	return r
}
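// Usage sketch (added for illustration, not part of the original file):
// minimal wiring of the DAL. The DSN is a placeholder, the logger is assumed
// to be constructed elsewhere via the hlog package, and the model.Record
// field values shown are illustrative; field types may differ.
//
//	func run(ctx context.Context, logger hlog.Logger) error {
//		dal, err := New(ctx, logger, "tcp://127.0.0.1:9000?database=default")
//		if err != nil {
//			return err
//		}
//		// Init creates the statistics table and starts the batching goroutine.
//		if err := dal.Init(ctx); err != nil {
//			return err
//		}
//		// Records are buffered and flushed to ClickHouse once per second.
//		dal.PutRecord(model.Record{
//			Level:   "info",
//			Message: "service started",
//			TS:      time.Now(),
//		})
//		return nil
//	}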