customer/internal/interface/sse/consumer.go

65 lines
1.4 KiB
Go

package sse
import (
"context"
"encoding/json"
"fmt"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"github.com/golang-queue/redisdb"
)
// SSEConsumer ties a task queue to a ConnectionDispatcher: events taken from
// the queue are handed to the dispatcher for delivery to users.
type SSEConsumer struct {
	// dispatcher is the target that events are sent to via SendToUser.
	dispatcher *ConnectionDispatcher
	// queue is the worker queue controlled by Start and Stop.
	queue *queue.Queue
}
// NewSSEConsumer builds an SSEConsumer backed by a Redis worker configured
// with the RedisKeySSEEvents channel. Each received task payload is decoded
// as an SSEEvent and passed to dispatcher.SendToUser.
//
// Parameters:
//   - redisAddr, redisPassword, redisDB: connection settings handed to the
//     Redis worker.
//   - dispatcher: receives every decoded event; must not be nil.
//
// It returns an error when dispatcher is nil or when the queue cannot be
// created. The queue is not started here; call Start to begin processing.
func NewSSEConsumer(redisAddr, redisPassword string, redisDB int, dispatcher *ConnectionDispatcher) (*SSEConsumer, error) {
	// Fail at construction time instead of inside a worker goroutine when the
	// first message arrives.
	if dispatcher == nil {
		return nil, fmt.Errorf("sse consumer: dispatcher is nil")
	}

	// workerCount is the number of workers the queue is configured with.
	const workerCount = 10

	w := redisdb.NewWorker(
		redisdb.WithAddr(redisAddr),
		redisdb.WithDB(redisDB),
		redisdb.WithPassword(redisPassword),
		redisdb.WithChannel(RedisKeySSEEvents),
		redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
			var sseMessage SSEEvent
			if err := json.Unmarshal(m.Payload(), &sseMessage); err != nil {
				return fmt.Errorf("failed to unmarshal SSE message: %w", err)
			}

			// Only these four fields are forwarded.
			// NOTE(review): if SSEEvent declares other fields they are dropped
			// here — confirm whether that is intended.
			event := SSEEvent{
				UserID:    sseMessage.UserID,
				EventType: sseMessage.EventType,
				Data:      sseMessage.Data,
				Timestamp: sseMessage.Timestamp,
			}

			if err := dispatcher.SendToUser(event.UserID, event); err != nil {
				return fmt.Errorf("dispatching SSE event: %w", err)
			}
			return nil
		}),
	)

	q, err := queue.NewQueue(
		queue.WithWorkerCount(workerCount),
		queue.WithWorker(w),
	)
	if err != nil {
		// Best-effort cleanup so the worker created above is not left behind;
		// its shutdown error is ignored because the construction failure is
		// the error worth reporting to the caller.
		_ = w.Shutdown()
		return nil, fmt.Errorf("creating SSE queue: %w", err)
	}

	return &SSEConsumer{
		dispatcher: dispatcher,
		queue:      q,
	}, nil
}
// Start starts the underlying queue so that its workers begin handling tasks.
func (r *SSEConsumer) Start() {
	r.queue.Start()
}
// Stop releases the underlying queue by calling its Release method.
// NOTE(review): Release is presumably the queue's graceful shutdown (stop and
// wait for workers) — confirm against the golang-queue documentation.
func (r *SSEConsumer) Stop() {
	r.queue.Release()
}