diff --git a/dal/dal.go b/dal/dal.go index 9217abe..cf2ad7d 100644 --- a/dal/dal.go +++ b/dal/dal.go @@ -197,9 +197,12 @@ func NewClickHouseDAL(ctx context.Context, cred string) (*ClickHouseDAL, error) return nil, fmt.Errorf("error ping database: %w", err) } - statsClickRepo := statistics.NewClickStatistic(statistics.DepsClick{ + statsClickRepo, err := statistics.NewClickStatistic(ctx, statistics.DepsClick{ Conn: conn, }) + if err != nil { + return nil, err + } return &ClickHouseDAL{ conn: conn, diff --git a/repository/statistics/click_statistics.go b/repository/statistics/click_statistics.go index df2774b..3622e56 100644 --- a/repository/statistics/click_statistics.go +++ b/repository/statistics/click_statistics.go @@ -1,6 +1,10 @@ package statistics -import "database/sql" +import ( + "context" + "database/sql" + "fmt" +) type DepsClick struct { Conn *sql.DB @@ -10,8 +14,94 @@ type StatisticClick struct { conn *sql.DB } -func NewClickStatistic(deps DepsClick) *StatisticClick { - return &StatisticClick{ +func NewClickStatistic(ctx context.Context, deps DepsClick) (*StatisticClick, error) { + s := &StatisticClick{ conn: deps.Conn, } + + err := s.checkMW(ctx) + if err != nil { + fmt.Println("error check material view existing") + return nil, err + } + + return s, nil +} + +func (s *StatisticClick) checkMW(ctx context.Context) error { + query := ` +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_last_answers_events +ENGINE = MergeTree() +PARTITION BY toStartOfDay(event_time) +ORDER BY (ctxsession, event_time) AS +SELECT + event_time,ctxsession,ctxquizid,ctxquestionid,ctxidint,message,keyos,keydevice,keydevicetype, + keybrowser,ctxuserip,ctxuserport,keydomain,keypath,ctxquiz,ctxreferrer +FROM statistics WHERE message IN ('InfoQuizOpen', 'InfoAnswer', 'InfoResult') + AND event_level = 'info' AND create_time = (SELECT max(create_time) FROM statistics AS inner_table + WHERE inner_table.ctxsession = statistics.ctxsession); +` + _, err := s.conn.ExecContext(ctx, query) + if err != nil { + return err + } + return nil +} + +type PipeLineStatsResp [][]struct { + Count int64 + QuestionID int64 +} + +func (s *StatisticClick) GetPipelinesStatistics(ctx context.Context, from uint64, to uint64) ([]PipeLineStatsResp, error) { + query := ` +SELECT + ctxsession,ctxquestionid,count(*) as session_count +FROM mv_last_answers_events WHERE event_time BETWEEN ? AND ? +GROUP BY ctxsession, ctxquestionid ORDER BY ctxsession, ctxquestionid +` + rows, err := s.conn.QueryContext(ctx, query, from, to) + if err != nil { + return nil, err + } + defer rows.Close() + + var pipelines []PipeLineStatsResp + var currentPipeline PipeLineStatsResp + var lastSession string + + for rows.Next() { + var session string + var questionID int64 + var count int64 + err := rows.Scan(&session, &questionID, &count) + if err != nil { + return nil, err + } + + // новая сессия - новая воронка + if session != lastSession && lastSession != "" { + pipelines = append(pipelines, currentPipeline) + currentPipeline = PipeLineStatsResp{} + } + + // текущая статистика в текущую воронку + currentPipeline = append(currentPipeline, []struct { + Count int64 + QuestionID int64 + }{{Count: count, QuestionID: questionID}}) + + lastSession = session + } + + // последня воронка если есть то добавляем + if len(currentPipeline) > 0 { + pipelines = append(pipelines, currentPipeline) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return pipelines, nil }