Общая реактивность: единая шина #12

Open
skeris wants to merge 2 commits from event_bus into dev
Owner

нам нужно реализовать возможность отдавать сообщения для реактивности от любого сервиса через некоторый ебинфй интерфейс. даже если произошло что-то в воркере, мы должны оповестить об этом пользователя. для этого нужно

  • иметь единую шину передачи событий в обработчик sse
  • иметь едины интерфейс складывания в эту шину
  • уметь подписывать и отписывать пользователей от sse

использовать будем https://github.com/golang-queue/queue в качестве единого интерфейса
единой шиной пусть будет редис. т.е. сразу фигачим не через кольцевой буффер, а через редис
единая очередь должна реализовывать максимально простую логику:

  • продьюсеры должны в неё складывать уже сериализованные данные + айдишник того, кому отправляем сообщение.
  • консьюмер должен получить из очереди сообщение и отправить соотв пользователю.

сама работа с очередью крайне проста получается. но вот вопрос подписывания, отписывания и отправки конкретному пользователю это сложный момент. тут есть следующие подводные камни:

  • один и тот же пользователь может быть залогинен с нескольких устройств параллельно. надо отправить сообщение на все
  • мы не можем себе позволить при возникновении события разбудить все sse горутины и проверить в них. т.е. должна быть какая то единая dispatcher или fan-out сущность, которая консьюмит и будит конкретный набор горутин
  • мы не можем позволить себе искать нужную горутину в цикле, скорее всего нужна мэпа
  • мы не можем позволить себе стопать всю работу консьюмера отправкой
  • мы не можем позволить себе надолго стопать работу консьюмера регистрацией новой горутины или удалением старой
  • Данил говорил, что есть какие--то проблемы с отловом того что коннект sse отвалился в gofiber

у меня есть идеи, как это сделать, но я думаю тебе будет придумать своё решение, описать его сюда и потом сравним идеи

нам нужно реализовать возможность отдавать сообщения для реактивности от любого сервиса через некоторый ебинфй интерфейс. даже если произошло что-то в воркере, мы должны оповестить об этом пользователя. для этого нужно - иметь единую шину передачи событий в обработчик sse - иметь едины интерфейс складывания в эту шину - уметь подписывать и отписывать пользователей от sse использовать будем https://github.com/golang-queue/queue в качестве единого интерфейса единой шиной пусть будет редис. т.е. сразу фигачим не через кольцевой буффер, а через редис единая очередь должна реализовывать максимально простую логику: - продьюсеры должны в неё складывать уже сериализованные данные + айдишник того, кому отправляем сообщение. - консьюмер должен получить из очереди сообщение и отправить соотв пользователю. сама работа с очередью крайне проста получается. но вот вопрос подписывания, отписывания и отправки конкретному пользователю это сложный момент. тут есть следующие подводные камни: - один и тот же пользователь может быть залогинен с нескольких устройств параллельно. надо отправить сообщение на все - мы не можем себе позволить при возникновении события разбудить все sse горутины и проверить в них. т.е. должна быть какая то единая dispatcher или fan-out сущность, которая консьюмит и будит конкретный набор горутин - мы не можем позволить себе искать нужную горутину в цикле, скорее всего нужна мэпа - мы не можем позволить себе стопать всю работу консьюмера отправкой - мы не можем позволить себе надолго стопать работу консьюмера регистрацией новой горутины или удалением старой - Данил говорил, что есть какие--то проблемы с отловом того что коннект sse отвалился в gofiber у меня есть идеи, как это сделать, но я думаю тебе будет придумать своё решение, описать его сюда и потом сравним идеи
skeris added the
Kind/Enhancement
label 2025-06-20 20:01:29 +00:00
pasha1coil was assigned by skeris 2025-06-20 20:01:29 +00:00
Member

так я надумал, попытаюсь кратко изложить что надумал

всего я надумал 5 точек
1 самая незначительная, просто нужно упомянуть это модель sse события
туда бы положить следующие поля:

  • ид пользователя
  • тип события
  • дату в json.RawMessage
  • таймшамп

и модель sse соединения которая будет хранится в мапе

туда я думаю сложить вот это - но не уверен что этого может быть достаточно:

  • id соединения
  • id пользователя
  • device id для поддержки нескольких устройств
  • канал для передачи событий тех самых которые я чуть выше описывал - канал буфферизированный
  • ctx
  • cancel, эти два я думаю необходимы чтобы жизнью соединения управлять

вот в принципе по моделям все

некст точка - продюсер, почтальон

примерно я представляю его структуру так:

type SSEProducer struct {
	queue  *queue.Queue
	logger *zap.Logger
}

довольно нищая, но больше ничего и не нужно

тут будут методы отправки сообщения
пинга
закрытия

ну и самое важное что тут нужно будет реализовать это поключение с помощью - https://github.com/golang-queue/queue для того чтобы отправлять в очередь т.е. в редиску

второй микро чел, но побольше это консюмер
его структру вижу примерно такой:

type SSEConsumer struct {
	dispatcher *ConnectionDispatcher // это мегачел наш
	queue      *queue.Queue
	logger     *zap.Logger
}

тут в теории будет мало кода:

  • самый большой это будет конструктор - тк нам нужно определить правильно queue чтобы читала правильно и как бы тут будет основная логика обработки сообщений

  • по дефолту Start Stop

  • в теории можно еще добавить что в продюсер что в консюмер такие штучки как хелсчеки и "статистику" для прометеуса ( но это на будущее, пока хз надо ли оно нам, просто как предложение)

в принципе тут все

и последний мега чел это ConnectionDispatcher - это будет управляющий соединениями пользователей
думал структурку для него примерно вот это надумал:

type ConnectionDispatcher struct {
	connections map[string]map[string]models.SSEConnection // userID - deviceID - connection
	mu          sync.RWMutex // для мапы, но возможно sync.Map юзать - но очень не уверен
	logger      *zap.Logger
}

тут будут методы регистрации соединения, удаления зарегистрированного соединения, отправка юзеру события (и даже нескольким) и воркер который будет следить за контекстом соединений по контексту из первой структуры и удалять погибшие соединения

вроде все ключевое описал... довольно тяжко слов не хватает все это описать более подробно

так я надумал, попытаюсь кратко изложить что надумал всего я надумал 5 точек 1 самая незначительная, просто нужно упомянуть это модель sse события туда бы положить следующие поля: - ид пользователя - тип события - дату в json.RawMessage - таймшамп и модель sse соединения которая будет хранится в мапе туда я думаю сложить вот это - но не уверен что этого может быть достаточно: - id соединения - id пользователя - device id для поддержки нескольких устройств - канал для передачи событий тех самых которые я чуть выше описывал - канал буфферизированный - ctx - cancel, эти два я думаю необходимы чтобы жизнью соединения управлять вот в принципе по моделям все некст точка - продюсер, почтальон примерно я представляю его структуру так: ```go type SSEProducer struct { queue *queue.Queue logger *zap.Logger } ``` довольно нищая, но больше ничего и не нужно тут будут методы отправки сообщения пинга закрытия ну и самое важное что тут нужно будет реализовать это поключение с помощью - https://github.com/golang-queue/queue для того чтобы отправлять в очередь т.е. в редиску второй микро чел, но побольше это консюмер его структру вижу примерно такой: ```go type SSEConsumer struct { dispatcher *ConnectionDispatcher // это мегачел наш queue *queue.Queue logger *zap.Logger } ``` тут в теории будет мало кода: - самый большой это будет конструктор - тк нам нужно определить правильно queue чтобы читала правильно и как бы тут будет основная логика обработки сообщений - по дефолту Start Stop - в теории можно еще добавить что в продюсер что в консюмер такие штучки как хелсчеки и "статистику" для прометеуса ( но это на будущее, пока хз надо ли оно нам, просто как предложение) в принципе тут все и последний мега чел это ConnectionDispatcher - это будет управляющий соединениями пользователей думал структурку для него примерно вот это надумал: ```go type ConnectionDispatcher struct { connections map[string]map[string]models.SSEConnection // userID - deviceID - connection mu sync.RWMutex // для мапы, но возможно sync.Map юзать - но очень не уверен logger *zap.Logger } ``` тут будут методы регистрации соединения, удаления зарегистрированного соединения, отправка юзеру события (и даже нескольким) и воркер который будет следить за контекстом соединений по контексту из первой структуры и удалять погибшие соединения вроде все ключевое описал... довольно тяжко слов не хватает все это описать более подробно
Author
Owner

ctx и cancel в структуры не складываем. управлять соединением должен fiber, с нашей стороны не должно быть инструментов для его обрыва, мы лишь должны узнать, что оно оборвалось

что будет ключем этой мэпы? с подключениями? можем ли мы обойтись без мьютекса в сценарии, когда добавляется ещё один коннект для одного и того же пользователя?

насчет метода пинга я не понял. ты предлагаешь пинг гонять через очередь?

а для чего в ConnectDispatcher мэпа мэп?

ctx и cancel в структуры не складываем. управлять соединением должен fiber, с нашей стороны не должно быть инструментов для его обрыва, мы лишь должны узнать, что оно оборвалось что будет ключем этой мэпы? с подключениями? можем ли мы обойтись без мьютекса в сценарии, когда добавляется ещё один коннект для одного и того же пользователя? насчет метода пинга я не понял. ты предлагаешь пинг гонять через очередь? а для чего в ConnectDispatcher мэпа мэп?
Member

ctx и cancel в структуры не складываем. управлять соединением должен fiber, с нашей стороны не должно быть инструментов для его обрыва, мы лишь должны узнать, что оно оборвалось

окей

что будет ключем этой мэпы? с подключениями? можем ли мы обойтись без мьютекса в сценарии, когда добавляется ещё один коннект для одного и того же пользователя?

мапа будет примерно такая - [key1-userid] = [key2-deviceid]SSEConnection
без мьютекса думаю не получится, велик шанс гонки данных, надо подумать может как то подругому это реализовать есть способ...

а для чего в ConnectDispatcher мэпа мэп?

для того чтобы отправлять на каждое подключенное устройство пользователя, достаточно будет иметь только его id тогда можем потом ренжить по мапе с девайс-конекшндата

насчет метода пинга я не понял. ты предлагаешь пинг гонять через очередь?

эээ, это я что то чиканулся видимо когда писал, вообще про очередь не подумал - думал прежде чем подключать "пакет очередь" делать пинг в редис, чтобы его проверить что работает, но наверное лишнее

> ctx и cancel в структуры не складываем. управлять соединением должен fiber, с нашей стороны не должно быть инструментов для его обрыва, мы лишь должны узнать, что оно оборвалось окей > что будет ключем этой мэпы? с подключениями? можем ли мы обойтись без мьютекса в сценарии, когда добавляется ещё один коннект для одного и того же пользователя? мапа будет примерно такая - [key1-userid] = [key2-deviceid]SSEConnection без мьютекса думаю не получится, велик шанс гонки данных, надо подумать может как то подругому это реализовать есть способ... > а для чего в ConnectDispatcher мэпа мэп? для того чтобы отправлять на каждое подключенное устройство пользователя, достаточно будет иметь только его id тогда можем потом ренжить по мапе с девайс-конекшндата > насчет метода пинга я не понял. ты предлагаешь пинг гонять через очередь? эээ, это я что то чиканулся видимо когда писал, вообще про очередь не подумал - думал прежде чем подключать "пакет очередь" делать пинг в редис, чтобы его проверить что работает, но наверное лишнее
Author
Owner

у нас не будет необходимости послать сообщение на конкретное устройство пользователя. гарантирую. поэтому мэпа тма лишняя, достаточно слайса. и слайс будет не сильно большой кроме случаев, когда нас будут хотеть досить. так что пришло сообщение - в цикле разослал по всем элементам слайса. т.е. не мэпа мэп, а мэпа слайсов

в случае с мэпой в целом довольно сложно избежать гонки. но мне кажутся два варианта довольно рабочими в случае слайса:

  • в случае если мы добавляем или убавляем слайсы, т.е. изменяем коннекты одного юзера, мы должны в цикл рассылки собщений получать нечто типа копии слайса, чтобы иметь возможность заменить один слайс другим без гонки. при изменении слайса надо делать копию имеющегося и заменять слайс только тогда. это всё ещё не решает проблему с заменой значения в мэпе
  • мэпа почему гонку вызывает - потому что у неё внутри массивы. и от этого никуда не убежать. и чем шире будет мэпа, тем на дольше надо будет стопить корутину. моя идея такая: во-первых, это должна быть не мэпа, а указатель на мэпу, чтобы можно было атомарно заменять. во-вторых, хранить указатели на мэпу в кольцевом буффере, чтобы реализовывать логика что ты не стопаешь мир на то чтобы заменить значение в мэпе, а заменяешь значение в следующей по кольцу мэпе и заменяшь мэпу. да, потребляет больше памяти, но позволяет не стопать мир почти никак
у нас не будет необходимости послать сообщение на конкретное устройство пользователя. гарантирую. поэтому мэпа тма лишняя, достаточно слайса. и слайс будет не сильно большой кроме случаев, когда нас будут хотеть досить. так что пришло сообщение - в цикле разослал по всем элементам слайса. т.е. не мэпа мэп, а мэпа слайсов в случае с мэпой в целом довольно сложно избежать гонки. но мне кажутся два варианта довольно рабочими в случае слайса: - в случае если мы добавляем или убавляем слайсы, т.е. изменяем коннекты одного юзера, мы должны в цикл рассылки собщений получать нечто типа копии слайса, чтобы иметь возможность заменить один слайс другим без гонки. при изменении слайса надо делать копию имеющегося и заменять слайс только тогда. это всё ещё не решает проблему с заменой значения в мэпе - мэпа почему гонку вызывает - потому что у неё внутри массивы. и от этого никуда не убежать. и чем шире будет мэпа, тем на дольше надо будет стопить корутину. моя идея такая: во-первых, это должна быть не мэпа, а указатель на мэпу, чтобы можно было атомарно заменять. во-вторых, хранить указатели на мэпу в кольцевом буффере, чтобы реализовывать логика что ты не стопаешь мир на то чтобы заменить значение в мэпе, а заменяешь значение в следующей по кольцу мэпе и заменяшь мэпу. да, потребляет больше памяти, но позволяет не стопать мир почти никак
Member

окей, понял
попробую с мэпой слайсов сделать
вроде годно звучит

окей, понял попробую с мэпой слайсов сделать вроде годно звучит
pasha1coil added 1 commit 2025-06-25 14:07:00 +00:00
pasha1coil added 1 commit 2025-06-26 10:35:27 +00:00
This pull request can be merged automatically.
You are not authorized to merge this pull request.

Checkout

From your project repository, check out a new branch and test the changes.
git fetch -u origin event_bus:event_bus
git checkout event_bus
Sign in to join this conversation.
No reviewers
No Milestone
No project
No Assignees
2 Participants
Notifications
Due Date
The due date is invalid or out of range. Please use the format 'yyyy-mm-dd'.

No due date set.

Dependencies

No dependencies set.

Reference: PenaSide/customer#12
No description provided.