package clickhouse import ( "context" "database/sql" "fmt" "github.com/pioz/faker" "github.com/stretchr/testify/assert" "github.com/themakers/bdd" "github.com/themakers/hlog" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/external/trashlog.git/model" "strings" "testing" "time" ) func TestClickhouse(t *testing.T) { faker.SetSeed(time.Now().Unix()) var ( ctx = context.Background() newcolumns = []string{ faker.Pick("ctx", "key") + faker.StringWithSize(10), faker.Pick("ctx", "key") + faker.StringWithSize(10), faker.Pick("ctx", "key") + faker.StringWithSize(10), faker.Pick("ctx", "key") + faker.StringWithSize(10), } types = []string{ "String", "Int64", "Float64", "UInt8", } d *DAL oldcolmns = map[string]string{} ) logger, err := zap.NewDevelopment() if err != nil { t.Fatal(err) } dal, err := New(ctx, hlog.New(logger), "tcp://127.0.0.1:9000?debug=true") if err != nil { t.Fatal(err) } d = dal connect, err := sql.Open( "clickhouse", "tcp://127.0.0.1:9000?debug=true", ) if err != nil { t.Fatal(err) } bdd.Scenario(t, "clickhouse fulfilling of statistics", func(t *testing.T, runID string) { bdd.Act(t, "init batching", func() { assert.NoError(t, d.Init(ctx)) }) bdd.Act(t, "get current newcolumns", func() { oc, err := d.getCurrentColumns(ctx) assert.NoError(t, err) assert.NotNil(t, oc) oldcolmns = oc }) bdd.Act(t, "add column", func() { bdd.Test(t, "add one column", func() { assert.NoError(t, d.AddColumn(ctx, map[string]string{ newcolumns[0]: types[0], })) assert.Greater(t, len(d.Schema), len(oldcolmns)) ncols, err := d.getCurrentColumns(ctx) assert.NoError(t, err) has := false for name := range ncols { if name == newcolumns[0] { has = true } } assert.True(t, has) }) bdd.Test(t, "add multicolumn", func() { assert.NoError(t, d.AddColumn(ctx, map[string]string{ newcolumns[1]: types[1], newcolumns[2]: types[2], newcolumns[3]: types[3], })) assert.Greater(t, len(d.Schema), len(oldcolmns)) ncols, err := d.getCurrentColumns(ctx) assert.NoError(t, err) var has []bool for name := range ncols { for i := range newcolumns[1:] { if name == newcolumns[i] { has = append(has, true) } } } assert.Len(t, has, 3) }) }) bdd.Act(t, "put records to buffer", func() { bdd.Test(t, "put records less than second to check buffer filling", func() { recs := make([]model.Record, 5) for i := 0; i < 5; i++ { recs[i] = model.Record{ Level: "debug", TS: uint64(time.Now().Unix()), Message: "test" + fmt.Sprint(i), Module: []string{"test", "test1"}, Stacktrace: "t", KeyFields: map[string]interface{}{ faker.Pick(newcolumns...): "test", }, CtxFields: map[string]interface{}{ faker.Pick(newcolumns...): "test", }, SvcBuildTime: 0, SvcVersion: "1", SvcCommit: "2", SvcFile: "3", SvcLine: 0, } d.PutRecord(recs[i]) } time.Sleep(500 * time.Millisecond) buf := d.getBuffer() assert.Equal(t, len(recs), len(buf)) for i, rec := range buf { assert.Equal(t, rec, recs[i]) } }) }) bdd.Act(t, "prepare data for insert", func() { bdd.Test(t, "generate a bunch of records with new fields and prepare fields", func() { var recs []model.Record for i := 0; i < 100; i++ { recs = append(recs, model.Record{ Level: "debug", TS: 0, Message: "", Module: nil, Stacktrace: "", KeyFields: map[string]interface{}{ faker.Pick(newcolumns...): "test", }, CtxFields: map[string]interface{}{ faker.Pick(newcolumns...): "test", }, SvcBuildTime: 0, SvcVersion: "", SvcCommit: "", SvcFile: "", SvcLine: 0, }) } onlySign := d.prepareAdditionalFields(recs) assert.Equal(t, len(onlySign), len(newcolumns)) cntEq := 0 for _, o := range onlySign { for _, nc := range newcolumns { if nc == o { cntEq++ } } } assert.Equal(t, len(newcolumns), cntEq) }) bdd.Test(t, "generate records for an absent fields", func() { var recs []model.Record for i := 0; i < 100; i++ { recs = append(recs, model.Record{ Level: "debug", TS: 0, Message: "", Module: nil, Stacktrace: "", KeyFields: map[string]interface{}{ faker.String(): "test", }, CtxFields: map[string]interface{}{ faker.String(): "test", }, SvcBuildTime: 0, SvcVersion: "", SvcCommit: "", SvcFile: "", SvcLine: 0, }) } onlySign := d.prepareAdditionalFields(recs) assert.Zero(t, onlySign) }) bdd.Test(t, "generate mixed absent and real fields", func() { var recs []model.Record for i := 0; i < 100; i++ { recs = append(recs, model.Record{ Level: "debug", TS: 0, Message: "", Module: nil, Stacktrace: "", KeyFields: map[string]interface{}{ faker.Pick(newcolumns...): "test", }, CtxFields: map[string]interface{}{ faker.String(): "test", }, SvcBuildTime: 0, SvcVersion: "", SvcCommit: "", SvcFile: "", SvcLine: 0, }) } onlySign := d.prepareAdditionalFields(recs) assert.NotZero(t, len(onlySign)) }) }) bdd.Act(t, "check created query string", func() { bdd.Test(t, "checking that we have all new fields and no unnecessary", func() { onlySign := d.prepareAdditionalFields(d.getBuffer()) q := d.buildQueryString(onlySign) for _, nc := range newcolumns { assert.Contains(t, q, nc) } qparts := strings.Split(q, "VALUES") assert.Len(t, qparts, 2) if len(qparts) != 2 { return } qparts = strings.Split(qparts[0], "(") assert.Len(t, qparts, 2) if len(qparts) != 2 { return } qparts = strings.Split(qparts[1], ")") assert.Len(t, qparts, 2) if len(qparts) != 2 { return } qparts = strings.Split(qparts[0], ",") assert.Len(t, qparts, len(newcolumns)+11) }) }) bdd.Act(t, "insert bunch of records", func() { bdd.Test(t, "prepare bunch and insert them", func() { rs, err := d.db.QueryContext(ctx, `select count() from statistics;`) assert.NoError(t, err) rs.Next() var cntBefore int64 assert.NoError(t, rs.Scan(&cntBefore)) rs.Close() var records []model.Record for i := 0; i < 100; i++ { fstfld, sndfld := newcolumns[i%len(newcolumns)], newcolumns[(i+1)%len(newcolumns)] records = append(records, model.Record{ Level: "debug", TS: 0, Message: "", Module: []string{}, Stacktrace: "", KeyFields: map[string]any{ fstfld: sample(types[i%len(types)]), }, CtxFields: map[string]any{ sndfld: sample(types[(i+1)%len(types)]), }, SvcBuildTime: 0, SvcVersion: "", SvcCommit: "", SvcFile: "", SvcLine: 0, }) } trans, err := d.db.BeginTx(ctx, &sql.TxOptions{ Isolation: sql.LevelDefault, ReadOnly: false, }) if err != nil { t.Fatal(err) return } fields := d.prepareAdditionalFields(records) query := d.buildQueryString(fields) insertStmt, err := trans.PrepareContext(ctx, query) if err != nil { t.Fatal(err) return } defer insertStmt.Close() for _, record := range records { if err := d.recordInserter(ctx, insertStmt, record, fields, time.Now()); err != nil { assert.NoError(t, err) continue } } assert.NoError(t, trans.Commit()) rs, err = d.db.QueryContext(ctx, `select count() from statistics;`) assert.NoError(t, err) rs.Next() var cntAfter int64 assert.NoError(t, rs.Scan(&cntAfter)) rs.Close() assert.Equal(t, cntAfter-cntBefore, int64(len(records))) }) }) bdd.Act(t, "drop created columns", func() { time.Sleep(25 * time.Second) for i := 0; i < len(newcolumns); i++ { connect.QueryContext(ctx, fmt.Sprintf(`alter table statistics drop column %s; `, newcolumns[i])) } }) }) } func sample(tstr string) any { switch tstr { case "String": return faker.String() case "Int64": return faker.Int64() case "Float64": return faker.Float64() case "UInt8": return faker.Uint8() } return nil }