add storing

Skeris 2021-02-21 21:13:13 +03:00
parent b4e3d156a2
commit fef877924c
5 changed files with 241 additions and 15 deletions

@@ -123,7 +123,7 @@ func New(ctx context.Context, opts interface{}) (appInit.CommonApp, error) {
return nil, err
}
sinkSvc := sink.New(log)
sinkSvc := sink.New(log, logStore)
var grpcOpts []grpc.ServerOption
grpcServer := grpc.NewServer(grpcOpts...)
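// Sketch (not part of this commit): how the logStore handed to sink.New above is
// presumably created and initialized earlier in this constructor. clickhouse.New,
// (*DAL).Init and sink.New are taken from the diff below; the DSN is the one used
// in the package test and is only an example.
//
//	logStore, err := clickhouse.New(ctx, log, "tcp://127.0.0.1:9000?debug=true")
//	if err != nil {
//		return nil, err
//	}
//	if err := logStore.Init(ctx); err != nil { // creates the table and starts the batcher
//		return nil, err
//	}
//	sinkSvc := sink.New(log, logStore)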

@@ -3,18 +3,26 @@ package clickhouse
import (
"context"
"database/sql"
"fmt"
"github.com/BlackBroker/trashlog/model"
"github.com/ClickHouse/clickhouse-go"
"github.com/themakers/hlog"
"strings"
"time"
)
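// DAL buffers incoming log records in memory and writes them to the ClickHouse statistics table in periodic batches.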
type DAL struct {
logger hlog.Logger
db *sql.DB
db *sql.DB
putRecordStmt *sql.Stmt
buffer []model.Record
schema []map[string]*string
}
type ErrorPingException struct {
Code int32
Message string
Code int32
Message string
StackTrace string
}
@@ -22,7 +30,7 @@ func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error)
connect, err := sql.Open(
"clickhouse",
credentials,
)
)
if err != nil {
return nil, err
}
@@ -30,8 +38,8 @@ func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error)
if err := connect.PingContext(ctx); err != nil {
if exception, ok := err.(*clickhouse.Exception); ok {
log.Emit(ErrorPingException{
Code: exception.Code,
Message: exception.Message,
Code: exception.Code,
Message: exception.Message,
StackTrace: exception.StackTrace,
})
}
@@ -39,19 +47,28 @@ func New(ctx context.Context, log hlog.Logger, credentials string) (*DAL, error)
}
return &DAL{
db: connect,
db: connect,
logger: log.Module("clickhouse"),
}, nil
}
type ErrorCreateTable struct {
Err error
Err error
Query string
}
type ErrorCreateStatement struct {
Err error
Query string
}
type ErrorGetTableSchema struct {
Err error
}
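// Init creates the statistics table if it does not exist, caches its column layout from DESC statistics, and starts the background batcher goroutine.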
func (d *DAL) Init(ctx context.Context) error {
query := `
CREATE TABLE IF NOT EXISTS statistics ENGINE (
CREATE TABLE IF NOT EXISTS statistics (
event_level Enum('Info' = 0, 'Warn' = 1, 'Error' = 2, 'Debug' = 3),
message String,
stacktrace String,
@@ -63,13 +80,13 @@ CREATE TABLE IF NOT EXISTS statistics ENGINE (
svc_module Array(String),
event_time DateTime,
create_time DateTime
) = MergeTree()
ORDER BY event_time
) ENGINE = MergeTree()
ORDER BY (svc_version, event_level, svc_build_time,event_time)
PARTITION BY toStartOfDay(event_time)
PRIMARY KEY (create_time, svc_module, svc_version, level, svc_build_time)
PRIMARY KEY (svc_version, event_level, svc_build_time)
`
if _, err := d.db.ExecContext(ctx,query); err != nil {
if _, err := d.db.ExecContext(ctx, query); err != nil {
d.logger.Emit(ErrorCreateTable{
Err: err,
Query: query,
@@ -77,5 +94,163 @@ PRIMARY KEY (create_time, svc_module, svc_version, level, svc_build_time)
return err
}
result, err := d.db.QueryContext(ctx, `DESC statistics`)
if err != nil {
d.logger.Emit(ErrorGetTableSchema{
Err: err,
})
return err
}
cols, err := result.Columns()
if err != nil {
d.logger.Emit(ErrorGetTableSchema{
Err: err,
})
return err
}
var schema []map[string]*string
for result.Next() {
data := map[string]*string{}
for _, fieldName := range cols {
var str string
data[fieldName] = &str
}
if err := result.Scan(
data[cols[0]],
data[cols[1]],
data[cols[2]],
data[cols[3]],
data[cols[4]],
data[cols[5]],
data[cols[6]],
); err != nil {
d.logger.Emit(ErrorGetTableSchema{
Err: err,
})
return err
}
schema = append(schema, data)
}
if err := result.Close(); err != nil {
return err
}
d.schema = schema
go d.batchRecords(ctx)
return nil
}
type ErrorPanicBatchRecords struct {
Err error
}
type WarnPanicBatch struct {
Message string
}
type ErrorInsertTransaction struct {
Err error
}
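// batchRecords flushes the buffered records once per second until ctx is cancelled.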
func (d *DAL) batchRecords(ctx context.Context) {
ticker := time.NewTicker(time.Second)
for {
select {
case t := <-ticker.C:
d.inserter(ctx, t)
case <-ctx.Done():
ticker.Stop()
return
}
}
}
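// inserter writes everything currently in the buffer into statistics within one transaction; any failure panics and is recovered and logged by the deferred handler.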
func (d *DAL) inserter(ctx context.Context, t time.Time) {
defer func() {
val := recover()
if err, ok := val.(error); ok {
d.logger.Emit(ErrorPanicBatchRecords{Err: err})
} else {
d.logger.Emit(WarnPanicBatch{Message: fmt.Sprint(val)})
}
}()
trans, err := d.db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelDefault,
ReadOnly: false,
})
if err != nil {
d.logger.Emit(ErrorInsertTransaction{Err: err})
panic(err)
}
query := `
INSERT INTO statistics (%s) VALUE (%s)
`
var fields, placeholders []string
for _, field := range d.schema {
fields = append(fields, *field["name"])
placeholders = append(placeholders, "?")
}
query = fmt.Sprintf(
query,
strings.Join(fields, ","),
strings.Join(placeholders, ","),
)
insertStmt, err := trans.PrepareContext(ctx, query)
if err != nil {
d.logger.Emit(ErrorCreateStatement{
Err: err,
Query: query,
})
panic(err)
}
for _, record := range d.buffer{
if _, err := insertStmt.ExecContext(
ctx,
record.Level,
record.Message,
record.Stacktrace,
record.SvcBuildTime,
record.SvcVersion,
record.SvcCommit,
record.SvcFile,
record.SvcLine,
clickhouse.Array(record.Module),
record.TS,
t,
); err != nil {
panic(err)
}
}
defer func() {
if err := insertStmt.Close(); err != nil {
d.logger.Emit(ErrorCreateStatement{Err: err})
panic(err)
}
}()
if err := trans.Commit(); err != nil {
d.logger.Emit(ErrorInsertTransaction{Err: err})
panic(err)
}
}
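// PutRecord buffers a record in memory for the next flush by batchRecords.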
func (d *DAL) PutRecord(record model.Record) {
d.buffer = append(d.buffer, record)
return
}
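// Usage sketch (not part of this commit): records appended via PutRecord stay in the
// in-memory buffer until the one-second ticker started by Init fires and inserter
// flushes them into the statistics table inside a single transaction.
//
//	store, err := clickhouse.New(ctx, log, "tcp://127.0.0.1:9000?debug=true")
//	if err != nil {
//		return err
//	}
//	if err := store.Init(ctx); err != nil { // CREATE TABLE, DESC statistics, go batchRecords
//		return err
//	}
//	store.PutRecord(model.Record{
//		Level:   "Info",
//		Message: "hello",
//		Module:  []string{"example"},
//		TS:      uint64(time.Now().Unix()),
//	})
//	// written on the next tick of batchRecords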

@@ -21,6 +21,19 @@ var _ = Describe("Clickhouse", func() {
log := hlog.New(logger)
//AfterSuite(func() {
// connect, err := sql.Open(
// "clickhouse",
// "tcp://127.0.0.1:9000?debug=true",
// )
// if err != nil {
// panic(err)
// }
// if _, err := connect.Exec(`DROP TABLE statistics`); err != nil {
// panic(err)
// }
//})
Context("Create", func() {
It("Connection", func() {
store, err = clickhouse.New(

model/model.go Normal file

@@ -0,0 +1,17 @@
package model
type Record struct {
Level string
TS uint64
Message string
Module []string
Stacktrace string
KeyFields map[string]interface{}
CtxFields map[string]interface{}
SvcBuildTime uint64
SvcVersion string
SvcCommit string
SvcFile string
SvcLine uint64
}

@@ -2,6 +2,8 @@ package sink
import (
"fmt"
"github.com/BlackBroker/trashlog/dal/clickhouse"
"github.com/BlackBroker/trashlog/model"
trashlogProto "github.com/BlackBroker/trashlog/proto/generated"
"github.com/themakers/hlog"
"io"
@@ -9,11 +11,13 @@ import (
type Sink struct {
logger hlog.Logger
store *clickhouse.DAL
}
func New(log hlog.Logger) *Sink {
func New(log hlog.Logger, store *clickhouse.DAL) *Sink {
return &Sink{
logger: log.Module("sink"),
store: store,
}
}
@@ -28,8 +32,25 @@ func (s *Sink) Valve(stream trashlogProto.Trashlog_ValveServer) error {
return err
}
s.store.PutRecord(dto2daoRecord(record))
fmt.Println("GotIT!", record)
}
return nil
}
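// dto2daoRecord maps an incoming protobuf record onto the storage model.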
func dto2daoRecord(in *trashlogProto.Record) model.Record {
return model.Record{
Level: in.Level,
TS: in.TS,
Message: in.Message,
Module: in.Module,
Stacktrace: in.Stacktrace,
SvcBuildTime: in.SvcFields.BuildTime,
SvcVersion: in.SvcFields.Version,
SvcCommit: in.SvcFields.Commit,
SvcFile: in.SvcFields.File,
SvcLine: in.SvcFields.Line,
}
}