trashlog/dal/clickhouse/clickhouse.go

package clickhouse

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/ClickHouse/clickhouse-go"
	"github.com/themakers/hlog"

	"gitea.pena/PenaSide/trashlog/dal/bbolt"
	"gitea.pena/PenaSide/trashlog/model"
)
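
// DAL wraps the ClickHouse connection together with the bbolt-backed
// recovery store and an in-memory copy of the statistics table schema.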
type DAL struct {
	logger     hlog.Logger
	db         *sql.DB
	Schema     map[string]string
	schemaM    sync.Mutex
	recoveryDB *bbolt.DAL
}

const dbPath = "./recover.bolt"
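
// ErrorPingException is the log event emitted when the initial ping to
// ClickHouse fails with a driver-level exception.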
type ErrorPingException struct {
	Code       int32
	Message    string
	StackTrace string
}
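
// New opens a ClickHouse connection with the given credentials string,
// verifies it with a ping and opens the bbolt recovery store at dbPath.
//
// A minimal usage sketch; the DSN and the logger below are assumptions,
// not taken from this package:
//
//	dal, err := New(ctx, logger, "tcp://127.0.0.1:9000?database=default")
//	if err != nil {
//		// handle error
//	}
//	if err := dal.Init(ctx); err != nil {
//		// handle error
//	}
//	dal.PutRecord(ctx, record)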
func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error) {
	connect, err := sql.Open(
		"clickhouse",
		credentials,
	)
	if err != nil {
		return nil, err
	}
	recoverStore, err := bbolt.New(dbPath)
	if err != nil {
		return nil, err
	}
	if err := connect.PingContext(ctx); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			log.Emit(ErrorPingException{
				Code:       exception.Code,
				Message:    exception.Message,
				StackTrace: exception.StackTrace,
			})
		}
		return nil, err
	}
	return &DAL{
		db:         connect,
		logger:     log.Module("clickhouse"),
		recoveryDB: recoverStore,
		Schema:     map[string]string{},
		schemaM:    sync.Mutex{},
	}, nil
}

type ErrorCreateTable struct {
	Err   error
	Query string
}

type ErrorCreateStatement struct {
	Err   error
	Query string
}

type ErrorGetTableSchema struct {
	Err error
}
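
// Init creates the statistics MergeTree table and the buffer_statistics
// Buffer table in front of it, then caches the current column set in Schema.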
func (d *DAL) Init(ctx context.Context) error {
	query := `
		CREATE TABLE IF NOT EXISTS statistics (
			event_level Enum('info' = 0, 'warn' = 1, 'error' = 2, 'debug' = 3),
			message String,
			stacktrace String,
			svc_build_time DateTime,
			svc_version String,
			svc_commit String,
			svc_file String,
			svc_line UInt64,
			svc_module Array(String),
			event_time DateTime,
			create_time DateTime
		) ENGINE = MergeTree()
		ORDER BY (svc_version, event_level, svc_build_time, event_time)
		PARTITION BY toStartOfDay(event_time)
		PRIMARY KEY (svc_version, event_level, svc_build_time)
	`
	if _, err := d.db.ExecContext(ctx, query); err != nil {
		d.logger.Emit(ErrorCreateTable{
			Err:   err,
			Query: query,
		})
		return err
	}
	bufferQuery := `
		CREATE TABLE IF NOT EXISTS buffer_statistics
		AS statistics ENGINE = Buffer(default, statistics, 16, 600, 600, 1000, 1000, 104857600, 104857600);
	`
	if _, err := d.db.ExecContext(ctx, bufferQuery); err != nil {
		d.logger.Emit(ErrorCreateTable{
			Err:   err,
			Query: bufferQuery,
		})
		return err
	}
	if _, err := d.db.ExecContext(ctx, "set idle_connection_timeout = 86400;"); err != nil {
		return err
	}
	result, err := d.getCurrentColumns(ctx)
	if err != nil {
		return err
	}
	d.Schema = result
	return nil
}
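
// getCurrentColumns reads DESC statistics and returns a map of column name
// to column type.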
func (d *DAL) getCurrentColumns(ctx context.Context) (map[string]string, error) {
	descriptionRows, err := d.db.QueryContext(ctx, `DESC statistics`)
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{
			Err: err,
		})
		return nil, err
	}
	defer descriptionRows.Close()
	descriptionColumns, err := descriptionRows.Columns()
	if err != nil {
		d.logger.Emit(ErrorGetTableSchema{
			Err: err,
		})
		return nil, err
	}
	var (
		row    = make([]string, len(descriptionColumns))
		result = map[string]string{}
	)
	for descriptionRows.Next() {
		if err := descriptionRows.Scan(&row[0], &row[1], &row[2], &row[3], &row[4], &row[5], &row[6]); err != nil {
			return nil, err
		}
		result[row[0]] = row[1]
	}
	return result, nil
}
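
// Log events emitted while writing records to ClickHouse.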
type ErrorPanicBatchRecords struct {
	Err error
}

type WarnPanicBatch struct {
	Message string
}

type ErrorInsertTransaction struct {
	Err error
}

type WarnYieldRecovery struct {
	Err error
}
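
// recordInserter executes the prepared INSERT for a single record. Columns
// that are present in neither KeyFields nor CtxFields are filled with a zero
// value matching the cached schema type. If the insert fails, the record is
// saved to the bbolt recovery store instead.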
func (d *DAL) recordInserter(
	ctx context.Context,
	insertStmt *sql.Stmt,
	record model.Record,
	fields []string,
	t time.Time,
) error {
	svcData := []any{
		record.Level,
		record.Message,
		record.Stacktrace,
		record.SvcBuildTime,
		record.SvcVersion,
		record.SvcCommit,
		record.SvcFile,
		record.SvcLine,
		clickhouse.Array(record.Module),
		record.TS,
		t,
	}
	svcLen := len(svcData)
	dataToInsert := make([]any, len(fields)+svcLen)
	for i := range dataToInsert {
		if i < svcLen {
			dataToInsert[i] = svcData[i]
		} else {
			if v, ok := record.KeyFields[fields[i-svcLen]]; ok {
				dataToInsert[i] = v
			} else if v, ok := record.CtxFields[fields[i-svcLen]]; ok {
				dataToInsert[i] = v
			} else {
				d.schemaM.Lock()
				switch d.Schema[fields[i-svcLen]] {
				case "String":
					dataToInsert[i] = ""
				case "Int64":
					dataToInsert[i] = int64(0)
				case "Float64":
					dataToInsert[i] = float64(0)
				case "UInt8":
					dataToInsert[i] = uint8(0)
				}
				d.schemaM.Unlock()
			}
		}
	}
	if _, err := insertStmt.ExecContext(
		ctx,
		dataToInsert...,
	); err != nil {
		d.logger.Emit(ErrorExecClickhouse{Error: err.Error()})
		if err := d.recoveryDB.PutRecord(record); err != nil {
			return err
		}
	}
	return nil
}
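
// unpreparedQuery is the INSERT template; the %s verbs receive the extra
// column names and a matching number of ? placeholders.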
const unpreparedQuery = `
	INSERT INTO buffer_statistics (event_level, message, stacktrace, svc_build_time, svc_version, svc_commit, svc_file,
	svc_line, svc_module, event_time, create_time%s) VALUES (?,?,?,?,?,?,?,?,?,?,?%s);
`
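
// buildQueryString expands unpreparedQuery with the additional field names
// and their placeholders.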
func (d *DAL) buildQueryString(fields []string) string {
	var fldsStr, phStr string
	if len(fields) > 0 {
		fldsStr = "," + strings.Join(fields, ",")
		phStr = strings.Repeat(",?", len(fields))
	}
	return fmt.Sprintf(
		unpreparedQuery,
		fldsStr,
		phStr,
	)
}
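
// prepareAdditionalFields collects, across the given records, the KeyFields
// and CtxFields names that are already present in the cached schema.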
func (d *DAL) prepareAdditionalFields(buf []model.Record) (r []string) {
	exist := map[string]bool{}
	prepare := func(field string) {
		if _, ok := exist[field]; !ok {
			d.schemaM.Lock()
			defer d.schemaM.Unlock()
			if _, ok := d.Schema[field]; ok {
				r = append(r, field)
				exist[field] = true
			}
		}
	}
	for _, rec := range buf {
		for field := range rec.KeyFields {
			prepare(field)
		}
		for field := range rec.CtxFields {
			prepare(field)
		}
	}
	return
}

type ErrorExecClickhouse struct {
	Error string
}

type DebugNeedMutex struct{}
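
// PutRecord inserts a single record into buffer_statistics inside a
// transaction and then retries any records previously parked in the bbolt
// recovery store. Panics are recovered and reported as log events.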
func (d *DAL) PutRecord(ctx context.Context, record model.Record) {
	defer func() {
		val := recover()
		if val != nil {
			if err, ok := val.(error); ok {
				d.logger.Emit(ErrorPanicBatchRecords{Err: err})
			} else {
				d.logger.Emit(WarnPanicBatch{Message: fmt.Sprint(val)})
			}
		}
	}()
	t := time.Now()
	trans, err := d.db.BeginTx(ctx, &sql.TxOptions{
		Isolation: sql.LevelDefault,
		ReadOnly:  false,
	})
	if err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
		return
	}
	// Roll back on every early return; after a successful Commit this is a
	// no-op that returns sql.ErrTxDone, which we deliberately ignore.
	defer func() {
		_ = trans.Rollback()
	}()
	fields := d.prepareAdditionalFields([]model.Record{record})
	query := d.buildQueryString(fields)
	insertStmt, err := trans.PrepareContext(ctx, query)
	if err != nil {
		d.logger.Emit(ErrorCreateStatement{
			Err:   err,
			Query: query,
		})
		return
	}
	defer func() {
		if err := insertStmt.Close(); err != nil {
			d.logger.Emit(ErrorCreateStatement{Err: err})
			return
		}
	}()
	if err := d.recordInserter(ctx, insertStmt, record, fields, t); err != nil {
		return
	}
	if err := d.recoveryDB.YieldRecovery(func(record model.Record) error {
		return d.recordInserter(ctx, insertStmt, record, fields, t)
	}); err != nil {
		d.logger.Emit(WarnYieldRecovery{
			Err: err,
		})
	}
	if err := trans.Commit(); err != nil {
		d.logger.Emit(ErrorInsertTransaction{Err: err})
		return
	}
}

// AddColumn adds the given columns (name to ClickHouse type) to both the
// statistics and buffer_statistics tables and records them in the cached schema.
func (d *DAL) AddColumn(ctx context.Context, columns map[string]string) error {
	var colStatements []string
	for name, t := range columns {
		colStatements = append(colStatements, fmt.Sprintf("ADD COLUMN IF NOT EXISTS %s %s", name, t))
	}
	if len(colStatements) > 0 {
		for _, table := range []string{"statistics", "buffer_statistics"} {
			query := fmt.Sprintf(`ALTER TABLE %s %s`, table, strings.Join(colStatements, ","))
			if _, err := d.db.ExecContext(ctx, query); err != nil {
				return err
			}
		}
	}
	d.schemaM.Lock()
	defer d.schemaM.Unlock()
	for name, t := range columns {
		d.Schema[name] = t
	}
	return nil
}