2022-06-27 19:09:12 +00:00
|
|
|
package clickhouse
|
2021-02-20 16:15:31 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2022-06-27 19:09:12 +00:00
|
|
|
"database/sql"
|
|
|
|
"fmt"
|
2022-06-28 18:20:37 +00:00
|
|
|
"github.com/BlackBroker/trashlog/model"
|
2022-06-27 19:09:12 +00:00
|
|
|
"github.com/pioz/faker"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/themakers/bdd"
|
2021-02-20 16:15:31 +00:00
|
|
|
"github.com/themakers/hlog"
|
|
|
|
"go.uber.org/zap"
|
2022-06-27 19:09:12 +00:00
|
|
|
"testing"
|
|
|
|
"time"
|
2021-02-20 16:15:31 +00:00
|
|
|
)
|
|
|
|
|
2022-06-27 19:09:12 +00:00
|
|
|
func TestClickhouse(t *testing.T) {
|
|
|
|
faker.SetSeed(time.Now().Unix())
|
2021-02-20 16:15:31 +00:00
|
|
|
|
|
|
|
var (
|
2022-06-27 19:09:12 +00:00
|
|
|
ctx = context.Background()
|
|
|
|
newcolumns = []string{
|
2022-06-28 18:20:37 +00:00
|
|
|
faker.Pick("ctx", "key")+faker.StringWithSize(10),
|
|
|
|
faker.Pick("ctx", "key")+faker.StringWithSize(10),
|
|
|
|
faker.Pick("ctx", "key")+faker.StringWithSize(10),
|
|
|
|
faker.Pick("ctx", "key")+faker.StringWithSize(10),
|
2022-06-27 19:09:12 +00:00
|
|
|
}
|
|
|
|
types = []string{
|
|
|
|
"String",
|
|
|
|
"Int64",
|
|
|
|
"Float64",
|
2022-06-28 18:20:37 +00:00
|
|
|
"UInt8",
|
2022-06-27 19:09:12 +00:00
|
|
|
}
|
|
|
|
d *DAL
|
|
|
|
oldcolmns = map[string]string{}
|
2021-02-20 16:15:31 +00:00
|
|
|
)
|
|
|
|
|
2022-06-27 19:09:12 +00:00
|
|
|
logger, err := zap.NewDevelopment()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
dal, err := New(ctx, hlog.New(logger), "tcp://127.0.0.1:9000?debug=true")
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
d = dal
|
|
|
|
|
|
|
|
connect, err := sql.Open(
|
|
|
|
"clickhouse",
|
|
|
|
"tcp://127.0.0.1:9000?debug=true",
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatal(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
bdd.Scenario(t, "clickhouse fulfilling of statistics", func(t *testing.T, runID string) {
|
|
|
|
bdd.Act(t, "init batching", func() {
|
|
|
|
assert.NoError(t, d.Init(ctx))
|
|
|
|
})
|
|
|
|
bdd.Act(t, "get current newcolumns", func() {
|
|
|
|
oc, err := d.getCurrentColumns(ctx)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
assert.NotNil(t, oc)
|
|
|
|
oldcolmns = oc
|
2021-02-20 16:15:31 +00:00
|
|
|
})
|
|
|
|
|
2022-06-27 19:09:12 +00:00
|
|
|
bdd.Act(t, "add column", func() {
|
|
|
|
bdd.Test(t, "add one column", func() {
|
|
|
|
assert.NoError(t, d.AddColumn(ctx, map[string]string{
|
|
|
|
newcolumns[0]: faker.Pick(types...),
|
|
|
|
}))
|
|
|
|
assert.Greater(t, len(d.Schema), len(oldcolmns))
|
|
|
|
ncols, err := d.getCurrentColumns(ctx)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
has := false
|
|
|
|
for name := range ncols {
|
|
|
|
if name == newcolumns[0] {
|
|
|
|
has = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert.True(t, has)
|
|
|
|
})
|
|
|
|
bdd.Test(t, "add multicolumn", func() {
|
|
|
|
assert.NoError(t, d.AddColumn(ctx, map[string]string{
|
|
|
|
newcolumns[1]: faker.Pick(types...),
|
|
|
|
newcolumns[2]: faker.Pick(types...),
|
|
|
|
newcolumns[3]: faker.Pick(types...),
|
|
|
|
}))
|
|
|
|
assert.Greater(t, len(d.Schema), len(oldcolmns))
|
|
|
|
ncols, err := d.getCurrentColumns(ctx)
|
|
|
|
assert.NoError(t, err)
|
|
|
|
var has []bool
|
|
|
|
for name := range ncols {
|
|
|
|
for i := range newcolumns[1:] {
|
|
|
|
if name == newcolumns[i] {
|
|
|
|
has = append(has, true)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert.Len(t, has, 3)
|
|
|
|
})
|
|
|
|
})
|
2022-06-28 18:20:37 +00:00
|
|
|
|
|
|
|
bdd.Act(t, "put records to buffer", func() {
|
|
|
|
bdd.Test(t, "put records less than second to check buffer filling", func() {
|
|
|
|
recs := make([]model.Record, 5)
|
|
|
|
for i := 0; i<5;i++ {
|
|
|
|
recs[i] = model.Record{
|
|
|
|
Level: "debug",
|
|
|
|
TS: uint64(time.Now().Unix()),
|
|
|
|
Message: "test"+fmt.Sprint(i),
|
|
|
|
Module: []string{"test", "test1"},
|
|
|
|
Stacktrace: "t",
|
|
|
|
KeyFields: map[string]interface{}{},
|
|
|
|
CtxFields: map[string]interface{}{},
|
|
|
|
SvcBuildTime: 0,
|
|
|
|
SvcVersion: "1",
|
|
|
|
SvcCommit: "2",
|
|
|
|
SvcFile: "3",
|
|
|
|
SvcLine: 0,
|
|
|
|
}
|
|
|
|
d.PutRecord(recs[i])
|
|
|
|
}
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
|
|
buf := d.getBuffer()
|
|
|
|
assert.Equal(t, len(recs), len(buf))
|
|
|
|
for i,rec := range buf {
|
|
|
|
assert.Equal(t, rec, recs[i])
|
|
|
|
}
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
bdd.Act(t, "prepare data for insert", func() {
|
|
|
|
bdd.Test(t, "generate a bunch of records with new fields and prepare fields", func() {
|
|
|
|
var recs []model.Record
|
|
|
|
for i := 0; i < 100; i++ {
|
|
|
|
recs = append(recs, model.Record{
|
|
|
|
Level: "debug",
|
|
|
|
TS: 0,
|
|
|
|
Message: "",
|
|
|
|
Module: nil,
|
|
|
|
Stacktrace: "",
|
|
|
|
KeyFields: map[string]interface{}{
|
|
|
|
faker.Pick(newcolumns...): "test",
|
|
|
|
},
|
|
|
|
CtxFields: map[string]interface{}{
|
|
|
|
faker.Pick(newcolumns...): "test",
|
|
|
|
},
|
|
|
|
SvcBuildTime: 0,
|
|
|
|
SvcVersion: "",
|
|
|
|
SvcCommit: "",
|
|
|
|
SvcFile: "",
|
|
|
|
SvcLine: 0,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
onlySign := d.prepareAdditionalFields(recs)
|
|
|
|
assert.Equal(t, len(onlySign), len(newcolumns))
|
|
|
|
cntEq := 0
|
|
|
|
for _, o := range onlySign {
|
|
|
|
for _, nc := range newcolumns {
|
|
|
|
if nc == o {
|
|
|
|
cntEq ++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
assert.Equal(t, len(newcolumns), cntEq)
|
|
|
|
})
|
|
|
|
bdd.Test(t, "generate records for an absent fields", func() {
|
|
|
|
var recs []model.Record
|
|
|
|
for i := 0; i < 100; i++ {
|
|
|
|
recs = append(recs, model.Record{
|
|
|
|
Level: "debug",
|
|
|
|
TS: 0,
|
|
|
|
Message: "",
|
|
|
|
Module: nil,
|
|
|
|
Stacktrace: "",
|
|
|
|
KeyFields: map[string]interface{}{
|
|
|
|
faker.String(): "test",
|
|
|
|
},
|
|
|
|
CtxFields: map[string]interface{}{
|
|
|
|
faker.String(): "test",
|
|
|
|
},
|
|
|
|
SvcBuildTime: 0,
|
|
|
|
SvcVersion: "",
|
|
|
|
SvcCommit: "",
|
|
|
|
SvcFile: "",
|
|
|
|
SvcLine: 0,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
onlySign := d.prepareAdditionalFields(recs)
|
|
|
|
assert.Zero(t, onlySign)
|
|
|
|
})
|
|
|
|
bdd.Test(t, "generate mixed absent and real fields", func() {
|
|
|
|
var recs []model.Record
|
|
|
|
for i := 0; i < 100; i++ {
|
|
|
|
recs = append(recs, model.Record{
|
|
|
|
Level: "debug",
|
|
|
|
TS: 0,
|
|
|
|
Message: "",
|
|
|
|
Module: nil,
|
|
|
|
Stacktrace: "",
|
|
|
|
KeyFields: map[string]interface{}{
|
|
|
|
faker.Pick(newcolumns...): "test",
|
|
|
|
},
|
|
|
|
CtxFields: map[string]interface{}{
|
|
|
|
faker.String(): "test",
|
|
|
|
},
|
|
|
|
SvcBuildTime: 0,
|
|
|
|
SvcVersion: "",
|
|
|
|
SvcCommit: "",
|
|
|
|
SvcFile: "",
|
|
|
|
SvcLine: 0,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
onlySign := d.prepareAdditionalFields(recs)
|
|
|
|
assert.NotZero(t, len(onlySign))
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
2022-06-27 19:09:12 +00:00
|
|
|
bdd.Act(t, "drop created columns", func() {
|
2022-06-28 18:20:37 +00:00
|
|
|
time.Sleep(25 * time.Second)
|
2022-06-27 19:09:12 +00:00
|
|
|
for i:=0; i<len(newcolumns); i++ {
|
|
|
|
connect.QueryContext(ctx, fmt.Sprintf(`alter table statistics
|
|
|
|
drop column %s;
|
|
|
|
`, newcolumns[i]))
|
|
|
|
}
|
2021-02-20 16:15:31 +00:00
|
|
|
})
|
|
|
|
})
|
2022-06-27 19:09:12 +00:00
|
|
|
}
|