add first try test with mat view

This commit is contained in:
Pavel 2024-06-13 18:37:58 +03:00
parent 8eb1473d64
commit ec052f5f15
2 changed files with 97 additions and 4 deletions

@ -197,9 +197,12 @@ func NewClickHouseDAL(ctx context.Context, cred string) (*ClickHouseDAL, error)
return nil, fmt.Errorf("error ping database: %w", err)
}
statsClickRepo := statistics.NewClickStatistic(statistics.DepsClick{
statsClickRepo, err := statistics.NewClickStatistic(ctx, statistics.DepsClick{
Conn: conn,
})
if err != nil {
return nil, err
}
return &ClickHouseDAL{
conn: conn,

@ -1,6 +1,10 @@
package statistics
import "database/sql"
import (
"context"
"database/sql"
"fmt"
)
type DepsClick struct {
Conn *sql.DB
@ -10,8 +14,94 @@ type StatisticClick struct {
conn *sql.DB
}
func NewClickStatistic(deps DepsClick) *StatisticClick {
return &StatisticClick{
func NewClickStatistic(ctx context.Context, deps DepsClick) (*StatisticClick, error) {
s := &StatisticClick{
conn: deps.Conn,
}
err := s.checkMW(ctx)
if err != nil {
fmt.Println("error check material view existing")
return nil, err
}
return s, nil
}
func (s *StatisticClick) checkMW(ctx context.Context) error {
query := `
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_last_answers_events
ENGINE = MergeTree()
PARTITION BY toStartOfDay(event_time)
ORDER BY (ctxsession, event_time) AS
SELECT
event_time,ctxsession,ctxquizid,ctxquestionid,ctxidint,message,keyos,keydevice,keydevicetype,
keybrowser,ctxuserip,ctxuserport,keydomain,keypath,ctxquiz,ctxreferrer
FROM statistics WHERE message IN ('InfoQuizOpen', 'InfoAnswer', 'InfoResult')
AND event_level = 'info' AND create_time = (SELECT max(create_time) FROM statistics AS inner_table
WHERE inner_table.ctxsession = statistics.ctxsession);
`
_, err := s.conn.ExecContext(ctx, query)
if err != nil {
return err
}
return nil
}
type PipeLineStatsResp [][]struct {
Count int64
QuestionID int64
}
func (s *StatisticClick) GetPipelinesStatistics(ctx context.Context, from uint64, to uint64) ([]PipeLineStatsResp, error) {
query := `
SELECT
ctxsession,ctxquestionid,count(*) as session_count
FROM mv_last_answers_events WHERE event_time BETWEEN ? AND ?
GROUP BY ctxsession, ctxquestionid ORDER BY ctxsession, ctxquestionid
`
rows, err := s.conn.QueryContext(ctx, query, from, to)
if err != nil {
return nil, err
}
defer rows.Close()
var pipelines []PipeLineStatsResp
var currentPipeline PipeLineStatsResp
var lastSession string
for rows.Next() {
var session string
var questionID int64
var count int64
err := rows.Scan(&session, &questionID, &count)
if err != nil {
return nil, err
}
// новая сессия - новая воронка
if session != lastSession && lastSession != "" {
pipelines = append(pipelines, currentPipeline)
currentPipeline = PipeLineStatsResp{}
}
// текущая статистика в текущую воронку
currentPipeline = append(currentPipeline, []struct {
Count int64
QuestionID int64
}{{Count: count, QuestionID: questionID}})
lastSession = session
}
// последня воронка если есть то добавляем
if len(currentPipeline) > 0 {
pipelines = append(pipelines, currentPipeline)
}
if err := rows.Err(); err != nil {
return nil, err
}
return pipelines, nil
}