trashlog/dal/clickhouse/clickhouse.go

package clickhouse

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"github.com/BlackBroker/trashlog/dal/bbolt"
	"github.com/BlackBroker/trashlog/model"
	"github.com/ClickHouse/clickhouse-go"
	"github.com/themakers/hlog"
)

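// DAL provides access to the ClickHouse statistics table. It keeps an
// in-memory buffer of records that a background goroutine flushes in batches,
// and a bbolt-backed store used to recover records that failed to insert.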
type DAL struct {
	logger        hlog.Logger
	db            *sql.DB
	putRecordStmt *sql.Stmt
	buffer        []model.Record
	schema        []map[string]*string
	recoveryDB    *bbolt.DAL
}

const dbPath = "./recover.bolt"
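
// ErrorPingException is emitted when the initial ping to ClickHouse fails with
// a *clickhouse.Exception.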
type ErrorPingException struct {
	Code       int32
	Message    string
	StackTrace string
}

func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error) {
	connect, err := sql.Open("clickhouse", credentials)
	if err != nil {
		return nil, err
	}
	recoverStore, err := bbolt.New(dbPath)
	if err != nil {
		return nil, err
	}
	if err := connect.PingContext(ctx); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Emit(ErrorPingException{
				Code:       exception.Code,
				Message:    exception.Message,
				StackTrace: exception.StackTrace,
			})
		}
		return nil, err
	}
	return &DAL{
		db:         connect,
		logger:     log.Module("clickhouse"),
		recoveryDB: recoverStore,
	}, nil
}

type ErrorCreateTable struct {
	Err   error
	Query string
}

type ErrorCreateStatement struct {
	Err   error
	Query string
}

type ErrorGetTableSchema struct {
	Err error
}

func (d *DAL) Init(ctx context.Context) error {
	query := `
		CREATE TABLE IF NOT EXISTS statistics (
			event_level Enum('info' = 0, 'warn' = 1, 'error' = 2, 'debug' = 3),
			message String,
			stacktrace String,
			svc_build_time DateTime,
			svc_version String,
			svc_commit String,
			svc_file String,
			svc_line UInt64,
			svc_module Array(String),
			event_time DateTime,
			create_time DateTime
		) ENGINE = MergeTree()
		ORDER BY (svc_version, event_level, svc_build_time, event_time)
		PARTITION BY toStartOfDay(event_time)
		PRIMARY KEY (svc_version, event_level, svc_build_time)
	`
	if _, err := d.db.ExecContext(ctx, query); err != nil {
		d.logger.Emit(ErrorCreateTable{
			Err:   err,
			Query: query,
		})
		return err
	}
	// Cache the table schema; the scan below assumes DESC statistics returns
	// exactly seven columns per field.
	result, err := d.db.QueryContext(ctx, `DESC statistics`)
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{
			Err: err,
		})
		return err
	}
	cols, err := result.Columns()
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{
			Err: err,
		})
		return err
	}
	var schema []map[string]*string
	for result.Next() {
		data := map[string]*string{}
		for _, fieldName := range cols {
			var str string
			data[fieldName] = &str
		}
		if err := result.Scan(
			data[cols[0]],
			data[cols[1]],
			data[cols[2]],
			data[cols[3]],
			data[cols[4]],
			data[cols[5]],
			data[cols[6]],
		); err != nil {
			d.logger.Emit(ErrorGetTableSchema{
				Err: err,
			})
			return err
		}
		schema = append(schema, data)
	}
	if err := result.Close(); err != nil {
		return err
	}
	d.schema = schema
	// Flush buffered records to ClickHouse once per second in the background.
	go d.batchRecords(ctx)
	return nil
}

type ErrorPanicBatchRecords struct {
	Err error
}

type WarnPanicBatch struct {
	Message string
}

type ErrorInsertTransaction struct {
	Err error
}

func (d *DAL) batchRecords(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case t := <-ticker.C:
			d.inserter(ctx, t)
		case <-ctx.Done():
			ticker.Stop()
			return
		}
	}
}

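// inserter writes all buffered records, plus any records replayed from the
// bbolt recovery store, to ClickHouse inside a single transaction. t becomes
// the create_time of every row in the batch.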
func (d *DAL) inserter(ctx context.Context, t time.Time) {
	if len(d.buffer) == 0 {
		return
	}
	defer func() {
		val := recover()
		if val != nil {
			if err, ok := val.(error); ok {
				d.logger.Emit(ErrorPanicBatchRecords{Err: err})
			} else {
				d.logger.Emit(WarnPanicBatch{Message: fmt.Sprint(val)})
			}
		}
	}()
	trans, err := d.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelDefault,
		ReadOnly:  false,
	})
	if err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
		return
	}
	// Build the INSERT column list and placeholders from the schema cached by Init.
	query := `
		INSERT INTO statistics (%s) VALUES (%s);
	`
	var fields, placeholders []string
	for _, field := range d.schema {
		fields = append(fields, *field["name"])
		placeholders = append(placeholders, "?")
	}
	query = fmt.Sprintf(
		query,
		strings.Join(fields, ","),
		strings.Join(placeholders, ","),
	)
	insertStmt, err := trans.PrepareContext(ctx, query)
	if err != nil {
		d.logger.Emit(ErrorCreateStatement{
			Err:   err,
			Query: query,
		})
		return
	}
	for _, record := range d.buffer {
		if err := d.recordInserter(ctx, insertStmt, record, t); err != nil {
			continue
		}
	}
	// Replay records that previously failed to insert and were persisted to bbolt.
	if err := d.recoveryDB.YieldRecovery(func(record model.Record) error {
		if err := d.recordInserter(ctx, insertStmt, record, t); err != nil {
			return err
		}
		return nil
	}); err != nil {
		d.logger.Emit(WarnYieldRecovery{
			Err: err,
		})
	}
	d.buffer = []model.Record{}
	defer func() {
		if err := insertStmt.Close(); err != nil {
			d.logger.Emit(ErrorCreateStatement{Err: err})
			panic(err)
		}
	}()
	if err := trans.Commit(); err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
		panic(err)
	}
}

type WarnYieldRecovery struct {
	Err error
}

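// recordInserter executes the prepared INSERT for a single record. If the
// insert fails, the record is written to the bbolt recovery store so it can be
// replayed on a later flush.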
func (d *DAL) recordInserter(
	ctx context.Context,
	insertStmt *sql.Stmt,
	record model.Record,
	t time.Time,
) error {
	if _, err := insertStmt.ExecContext(
		ctx,
		record.Level,
		record.Message,
		record.Stacktrace,
		record.SvcBuildTime,
		record.SvcVersion,
		record.SvcCommit,
		record.SvcFile,
		record.SvcLine,
		clickhouse.Array(record.Module),
		record.TS,
		t,
	); err != nil {
		if err := d.recoveryDB.PutRecord(record); err != nil {
			return err
		}
	}
	return nil
}

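// DebugNeedMutex is emitted when appending to the shared buffer panics,
// signalling that concurrent PutRecord calls are not synchronised.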
type DebugNeedMutex struct{}

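// PutRecord appends a record to the in-memory buffer; it is flushed to
// ClickHouse on the next inserter tick.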
func (d *DAL) PutRecord(record model.Record) {
	defer func() {
		val := recover()
		if val != nil {
			d.logger.Emit(DebugNeedMutex{})
		}
	}()
	d.buffer = append(d.buffer, record)
}

func (d *DAL) AddColumn(ctx context.Context, name, fieldType string) error {
	if _, err := d.db.ExecContext(ctx, fmt.Sprintf(`
		ALTER TABLE statistics ADD COLUMN IF NOT EXISTS %s %s
	`, name, fieldType)); err != nil {
		return err
	}
	return nil
}