From 1665d40f27f427b3bc413ae9c8fe89e9d2c260f0 Mon Sep 17 00:00:00 2001 From: Pavel Date: Thu, 11 Jul 2024 14:09:17 +0300 Subject: [PATCH] start building tg worker --- workers/tg_worker.go | 64 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/workers/tg_worker.go b/workers/tg_worker.go index 1b57cef..df45145 100644 --- a/workers/tg_worker.go +++ b/workers/tg_worker.go @@ -2,28 +2,35 @@ package workers import ( "context" + "encoding/json" + "fmt" "github.com/go-redis/redis/v8" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" + "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" + "penahub.gitlab.yandexcloud.net/backend/quiz/core/clients/telegram" "time" ) type Deps struct { - BotID int64 - Redis *redis.Client - Dal *dal.DAL + BotID int64 + Redis *redis.Client + Dal *dal.DAL + TgClient *telegram.TelegramClient } type TgListenerWorker struct { - botID int64 - redis *redis.Client - dal *dal.DAL + botID int64 + redis *redis.Client + dal *dal.DAL + tgClient *telegram.TelegramClient } func NewTgListenerWC(deps Deps) *TgListenerWorker { return &TgListenerWorker{ - botID: deps.BotID, - redis: deps.Redis, - dal: deps.Dal, + botID: deps.BotID, + redis: deps.Redis, + dal: deps.Dal, + tgClient: deps.TgClient, } } @@ -42,5 +49,44 @@ func (wc *TgListenerWorker) Start(ctx context.Context) { } func (wc *TgListenerWorker) processTasks(ctx context.Context) { + var cursor uint64 + for { + var keys []string + var err error + keys, cursor, err = wc.redis.Scan(ctx, cursor, "telegram_task:*", 0).Result() + if err != nil { + fmt.Println("Failed scan for telegram tasks:", err) + break + } + for _, key := range keys { + func() { + taskBytes, err := wc.redis.GetDel(ctx, key).Result() + if err == redis.Nil { + return + } else if err != nil { + fmt.Println("Failed getdel telegram task:", err) + return + } + + defer func() { + + }() + + var task model.TgRedisTask + if json.Unmarshal([]byte(taskBytes), &task) != nil { + fmt.Println("Failed unmarshal telegram task:", err) + return + } + + inviteLink, chatID, err := wc.tgClient.CreateChannel(task.Name, wc.botID) + if err != nil { + fmt.Println("Failed create tg channel:", err) + } + }() + } + if cursor == 0 { + break + } + } }