telegram/instance_worker/worker.v
2024-08-27 11:48:39 +03:00

300 lines
8.3 KiB
V
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

module instance_worker
import time
import models
import dariotarantini.vgram
import repository
import bot_manager
import tools
pub struct InstanceWorker {
pub mut:
instance_id int
tg_sender vgram.Bot
repo repository.Repo
bot_manager bot_manager.BotManager
stop_ch chan int
}
pub fn (mut worker InstanceWorker) start() {
mut max_checkout := worker.get_max_checkout()or {
eprintln(err)
return
}
now := time.now().unix()
if now>max_checkout{
d := now - max_checkout
if d >=10{
max_checkout = now
}else{
max_checkout = now+10
}
}else{
max_checkout +=10
}
max_checkout += 5*60
new_instance := models.TelegramIntegrationInstance{
checkout: max_checkout
limit: models.bot_limit
amount: 0
}
worker.register_instance(new_instance) or {
eprintln(err)
return
}
mut count := 0
mut ticker := tools.new_ticker(models.check_interval)
defer {
ticker.stop()
}
wait_time := max_checkout - time.now().unix()
if wait_time > 0 {
time.sleep(wait_time * time.second)
}
for {
select {
_ := <-ticker.c{
// начинаем транзакцию
worker.repo.pg_db.exec('BEGIN') or { panic(err) }
// блокируем таблицы
worker.repo.pg_db.exec('LOCK TABLE telegram_integration IN ACCESS EXCLUSIVE MODE') or { panic(err) }
worker.repo.pg_db.exec('LOCK TABLE telegram_integration_instance IN ACCESS EXCLUSIVE MODE') or { panic(err) }
// обновляем только тогда когда прошла хотябы 1 итерация
if count>0{
worker.update_instance_checkout() or {
eprintln(err)
worker.repo.pg_db.exec('ROLLBACK') or { panic(err)}
return
}
}
// ищем и удаляем устаревшие инстансы
worker.remove_stale_instances() or {
eprintln(err)
worker.repo.pg_db.exec('ROLLBACK') or { panic(err)}
return
}
current_bots := worker.manage_bots()or{
eprintln(err)
worker.repo.pg_db.exec('ROLLBACK') or { panic(err)}
return
}
// если все успешно комитим транзакцию
worker.repo.pg_db.exec('COMMIT') or { panic(err) }
// обновляем количество итераций
count += 1
// запускаем ботов через манагера
worker.bot_manager.manage_bots(current_bots)or{
eprintln(err)
return
}
// ищем удаленных или стопнутых
deleted_stopped := worker.get_stopped_deleted() or{
eprintln(err)
return
}
// удаляем удаленных или стопнутых
worker.bot_manager.deleted_bot(deleted_stopped)or{
eprintln(err)
return
}
// оновляем инстансы у удаленных на 0
worker.update_instance(deleted_stopped)or{
eprintln(err)
return
}
}
_ := <-worker.stop_ch{
worker.cleanup() or {
println('$err')
}
return
}
}
}
}
fn (mut worker InstanceWorker) register_instance(data models.TelegramIntegrationInstance)! {
instance_id := sql worker.repo.pg_db {
insert data into models.TelegramIntegrationInstance
} or {
return error('Failed register telegram integration instance: $err')
}
worker.instance_id = instance_id
}
fn (mut worker InstanceWorker) get_max_checkout() !i64 {
selected_members := sql worker.repo.pg_db {
select from models.TelegramIntegrationInstance order by checkout desc
}or {
return error('Failed to get max checkout ${err}')
}
if selected_members.len == 0{
return 0
}
return selected_members[0].checkout
}
fn (mut worker InstanceWorker)update_instance_checkout()!{
now := time.now().unix()
sql worker.repo.pg_db {
update models.TelegramIntegrationInstance
set checkout = now where id == worker.instance_id
}or {
return error('Failed update instance checkout: $err')
}
}
fn (mut worker InstanceWorker) remove_stale_instances() ! {
expired := time.now().unix() - 10 * 60 // 10 минут назад
data_expired := sql worker.repo.pg_db {
select from models.TelegramIntegrationInstance where checkout < expired
}or{
return error('Failed get expired data: $err')
}
sql worker.repo.pg_db {
delete from models.TelegramIntegrationInstance where checkout < expired
}or{
return error('Failed remove stale instances: $err')
}
for _,data in data_expired{
sql worker.repo.pg_db{
update models.TelegramIntegration set instance_id = 0 where instance_id == data.id
} or{
return error('Failed update integration table: $err')
}
}
}
fn (mut worker InstanceWorker) manage_bots() !map[i64]models.TelegramIntegration {
mut current_instance_bots := map[i64]models.TelegramIntegration{}
// получаем количество ботов которые еще не назначены ни одному инстансу
mut free_bots := sql worker.repo.pg_db {
select from models.TelegramIntegration where status == models.active_status.str() && deleted == false && instance_id == 0
}or{
return error('Failed fetch free bots: $err')
}
// получаем все "живые" инстансы
all_active_instance:= sql worker.repo.pg_db{
select from models.TelegramIntegrationInstance
}or {
return error('Failed get current instance: $err')
}
// считаем общее количество слотов
mut total_slots := 0
for instance in all_active_instance {
total_slots += instance.limit - instance.amount
}
// количество свободных ботов
bots_assign := free_bots.len
// если количество ботов больше чем суммарное количество слотов всех инстансов отправляем оповещение
if bots_assign > total_slots {
msg := 'Осторожно: Свободных ботов больше чем инстансы могут взять в работу.'
worker.tg_sender.send_message(vgram.ConfigSendMessage{
chat_id: models.telegram_channel_id,
text: msg
})
}
// распределяем ботов по инстансам
mut bot_index := 0
mut assigned_bots := 0
for instance in all_active_instance {
free_slots := instance.limit - instance.amount
mut bots_for_instance := bots_assign / all_active_instance.len
// если доступных слотов больше то используем минимальное значение
if bots_for_instance > free_slots {
bots_for_instance = free_slots
}
sql worker.repo.pg_db {
update models.TelegramIntegrationInstance set amount = amount + bots_for_instance where id == instance.id
} or {
return error('Failed update instance amount: $err')
}
// назначаем ботов инстансу
for i := 0; i < bots_for_instance; i++ {
if bot_index >= free_bots.len {
break
}
bot := free_bots[bot_index]
if instance.id == worker.instance_id{
current_instance_bots[bot.id] = bot
}
sql worker.repo.pg_db {
update models.TelegramIntegration set instance_id = instance.id where id == bot.id
} or {
return error('Failed update bot instance_id: $err')
}
bot_index++
assigned_bots++
}
// если все боты распределены выходим
if assigned_bots >= bots_assign {
break
}
}
return current_instance_bots
}
fn (mut worker InstanceWorker) get_stopped_deleted() ![]models.TelegramIntegration {
mut results := sql worker.repo.pg_db {
select from models.TelegramIntegration where (status == models.stopped_status.str() || deleted == true) && instance_id == worker.instance_id
}or{
return error('Failed fetch stopped deleted bots: $err')
}
return results
}
fn (mut worker InstanceWorker)update_instance(bots []models.TelegramIntegration)!{
for bot in bots{
sql worker.repo.pg_db{
update models.TelegramIntegration set instance_id = 0 where id == bot.id
}or{
return error('Failed update instance for bots: $err')
}
}
}
pub fn (mut worker InstanceWorker) stop(){
worker.stop_ch <- 1
}
fn (mut worker InstanceWorker) cleanup() !{
worker.bot_manager.stop_all_bots()
sql worker.repo.pg_db {
delete from models.TelegramIntegrationInstance where id == worker.instance_id
} or {
return error('Error deleting instance in cleanup: $err')
}
sql worker.repo.pg_db {
update models.TelegramIntegration set instance_id = 0 where instance_id == worker.instance_id
}or{
return error('Error updating instance_id in cleanup: $err')
}
}