Merge branch 'dataLake' into 'main'

Data lake

See merge request backend/quiz/common!28
This commit is contained in:
Mikhail 2024-06-20 19:37:27 +00:00
commit f8a59de8ed
5 changed files with 268 additions and 0 deletions

@ -5,6 +5,8 @@ import (
"database/sql"
_ "embed"
"errors"
"fmt"
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/lib/pq"
"github.com/minio/minio-go/v7"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal/sqlcgen"
@ -184,3 +186,38 @@ func (d *AmoDal) Close(ctx context.Context) error {
}
return nil
}
type ClickHouseDAL struct {
conn *sql.DB
StatisticClickRepo *statistics.StatisticClick
}
func NewClickHouseDAL(ctx context.Context, cred string) (*ClickHouseDAL, error) {
conn, err := sql.Open("clickhouse", cred)
if err != nil {
return nil, fmt.Errorf("error open database connection: %w", err)
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := conn.PingContext(timeoutCtx); err != nil {
return nil, fmt.Errorf("error ping database: %w", err)
}
statsClickRepo, err := statistics.NewClickStatistic(ctx, statistics.DepsClick{
Conn: conn,
})
if err != nil {
return nil, err
}
return &ClickHouseDAL{
conn: conn,
StatisticClickRepo: statsClickRepo,
}, nil
}
func (d *ClickHouseDAL) Close(ctx context.Context) error {
return d.conn.Close()
}

2
go.mod

@ -18,7 +18,9 @@ require (
)
require (
github.com/ClickHouse/clickhouse-go v1.5.4 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect

10
go.sum

@ -1,12 +1,18 @@
github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0=
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg=
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/gofiber/fiber/v2 v2.52.0 h1:S+qXi7y+/Pgvqq4DrSmREGiFwtB7Bu6+QFLuIHYw/UE=
github.com/gofiber/fiber/v2 v2.52.0/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ=
github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw=
@ -20,6 +26,7 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI=
@ -32,6 +39,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@ -41,6 +49,7 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
github.com/minio/minio-go/v7 v7.0.69 h1:l8AnsQFyY1xiwa/DaQskY4NXSLA2yrGsW5iD9nRPVS0=
@ -52,6 +61,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

@ -0,0 +1,199 @@
package statistics
import (
"context"
"database/sql"
"fmt"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/utils"
"sort"
"strings"
)
type DepsClick struct {
Conn *sql.DB
}
type StatisticClick struct {
conn *sql.DB
}
func NewClickStatistic(ctx context.Context, deps DepsClick) (*StatisticClick, error) {
s := &StatisticClick{
conn: deps.Conn,
}
err := s.checkMW(ctx)
if err != nil {
fmt.Println("error check material view existing", err)
return nil, err
}
return s, nil
}
func (s *StatisticClick) checkMW(ctx context.Context) error {
query := `
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_last_answers_events
ENGINE = MergeTree()
ORDER BY (ctxsession, event_time) POPULATE AS
SELECT
event_time, ctxsession, ctxquizid, ctxquestionid, ctxidint, message,ctxquiz
FROM statistics
WHERE message IN ('InfoQuizOpen', 'InfoAnswer', 'InfoResult') AND event_level = 'info';
`
_, err := s.conn.ExecContext(ctx, query)
if err != nil {
return err
}
return nil
}
type Statistic struct {
Count int64
QuestionID int64
}
type PipeLineStatsResp map[int64][]Statistic
// пример:
//"[0, 116783, 116810]"
//"[0, 116783, 116798]"
//"[0, 116783, 116798, 116831]"
//"[0, 116783, 116810, 116849]"
//[0]
//"[0, 116783]"
//"[0, 116783, 116810, 116843]"
//SELECT DISTINCT last_que, reversed
//FROM ( SELECT groupArray(ctxquestionid) AS reversed, arraySlice(arrayReverse(groupArray(ctxquestionid)), 1, 1)[1] AS last_que
//FROM statistics WHERE ctxquizid = 26276 GROUP BY ctxsession ) AS sub;
func (s *StatisticClick) getFunnel(ctx context.Context, quizID int64, from uint64, to uint64) (map[int64][]int64, error) {
query := `
SELECT DISTINCT last_que, reversed
FROM ( SELECT groupUniqArray(ctxquestionid) AS reversed, arraySlice(arrayReverse(groupArray(ctxquestionid)), 1, 1)[1] AS last_que
FROM mv_last_answers_events WHERE ctxquizid = ? AND event_time BETWEEN ? AND ? GROUP BY ctxsession ) AS sub;
`
rows, err := s.conn.QueryContext(ctx, query, quizID, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
funnel := make(map[int64][]int64)
for rows.Next() {
var lastQue int64
var reversed []int64
if err := rows.Scan(&lastQue, &reversed); err != nil {
return nil, err
}
funnel[lastQue] = reversed
}
if err := rows.Err(); err != nil {
return nil, err
}
result := make(map[int64][]int64)
keys := make([]int64, 0, len(funnel))
for key := range funnel {
keys = append(keys, key)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
for _, lastQue := range keys {
reversed := funnel[lastQue]
found := false
for _, otherLastQue := range keys {
if otherLastQue != lastQue {
otherReversed := funnel[otherLastQue]
sort.Slice(otherReversed, func(i, j int) bool {
return otherReversed[i] < otherReversed[j]
})
index := utils.BinarySearch(lastQue, otherReversed)
if index {
found = true
break
}
}
}
if !found {
result[lastQue] = reversed
}
}
return result, nil
}
func (s *StatisticClick) GetPipelinesStatistics(ctx context.Context, quizID int64, from uint64, to uint64) (PipeLineStatsResp, error) {
pipelines := make(PipeLineStatsResp)
funnel, err := s.getFunnel(ctx, quizID, from, to)
if err != nil {
return nil, err
}
for lastQue, idS := range funnel {
sesCount, err := s.countSession(ctx, quizID, from, to, idS)
if err != nil {
return nil, err
}
for _, queID := range idS {
if sessionCount, ok := sesCount[queID]; ok {
pipeline := Statistic{
QuestionID: queID,
Count: sessionCount,
}
pipelines[lastQue] = append(pipelines[lastQue], pipeline)
}
}
}
return pipelines, nil
}
func (s *StatisticClick) countSession(ctx context.Context, quizID int64, from uint64, to uint64, questionIDs []int64) (map[int64]int64, error) {
placeholders := make([]string, len(questionIDs))
args := make([]interface{}, len(questionIDs)+3)
args[0] = quizID
for i, id := range questionIDs {
placeholders[i] = "?"
args[i+1] = id
}
args[len(args)-2] = from
args[len(args)-1] = to
query := fmt.Sprintf(`
SELECT ctxquestionid, COUNT(DISTINCT ctxsession) AS session_count
FROM statistics
WHERE ctxquizid = ? AND ctxquestionid IN (%s) AND event_time BETWEEN ? AND ?
GROUP BY ctxquestionid;
`, strings.Join(placeholders, ","))
rows, err := s.conn.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
counts := make(map[int64]int64)
for rows.Next() {
var questionID int64
var count int64
err := rows.Scan(&questionID, &count)
if err != nil {
return nil, err
}
counts[questionID] = count
}
if err := rows.Err(); err != nil {
return nil, err
}
return counts, nil
}

20
utils/binary_search.go Normal file

@ -0,0 +1,20 @@
package utils
func BinarySearch(target int64, array []int64) bool {
left := 0
right := len(array) - 1
for left <= right {
mid := (left + right) / 2
if array[mid] == target {
return true
} else if array[mid] < target {
left = mid + 1
} else if array[mid] > target {
right = mid - 1
}
}
return false
}