worker/internal/privilegewc/consumer.go
2025-02-24 20:58:46 +03:00

150 lines
4.2 KiB
Go

package privilegewc
import (
"context"
"fmt"
"gitea.pena/SQuiz/common/dal"
"gitea.pena/SQuiz/common/model"
"gitea.pena/SQuiz/worker/internal/wctools"
"github.com/go-redis/redis/v8"
"github.com/twmb/franz-go/pkg/kgo"
"strings"
"time"
)
// Config содержит параметры конфигурации.
type Config struct {
KafkaBroker string
KafkaTopic string
ServiceKey string
TickerInterval time.Duration
ErrChan chan<- error
}
type KafkaConsumerWorker struct {
config Config
client *kgo.Client
redis *redis.Client
privilegeDAL *dal.DAL
}
// NewKafkaConsumerWorker создает новый экземпляр KafkaConsumerWorker.
func NewKafkaConsumerWorker(config Config, redis *redis.Client, privilegeDAL *dal.DAL) (*KafkaConsumerWorker, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers(config.KafkaBroker),
kgo.ConsumerGroup("squiz1"),
kgo.ConsumeTopics(config.KafkaTopic),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
)
if err != nil {
return nil, err
}
return &KafkaConsumerWorker{
config: config,
client: client,
privilegeDAL: privilegeDAL,
redis: redis,
}, nil
}
// Start запускает.
func (w *KafkaConsumerWorker) Start(ctx context.Context) {
ticker := time.NewTicker(w.config.TickerInterval)
defer ticker.Stop()
for {
fmt.Println("KONSUMER", w.config.TickerInterval)
select {
case <-ticker.C:
w.fetchMessages(ctx)
case <-ctx.Done():
fmt.Println("Kafka worker terminated")
return
}
}
}
// fetchAndProcessMessages извлекает сообщения из темы Kafka и обрабатывает их.
func (w *KafkaConsumerWorker) fetchMessages(ctx context.Context) {
fetches := w.client.PollFetches(ctx)
iter := fetches.RecordIter()
fmt.Println("KONSUMER1", fetches, w.config.ServiceKey)
for !iter.Done() {
record := iter.Next()
privilege, userID, err := wctools.IsValidMessage(record.Value, w.config.ServiceKey)
fmt.Println("KONSUMER2", err, userID)
if err != nil {
fmt.Println("Error validating Kafka message", err)
}
err = w.processValidMessage(ctx, privilege, userID)
if err != nil {
fmt.Println("Error processing valid message", err)
}
}
}
// processValidMessage обрабатывает валидное сообщение.
func (w *KafkaConsumerWorker) processValidMessage(ctx context.Context, privilege []model.PrivilegeMessage, userID string) error {
// TODO: refactor getting accountId
accountId, err := w.privilegeDAL.AccountRepo.GetAccountByID(ctx, userID)
fmt.Println("KONSUMEROOO", userID, accountId, err, privilege)
if err != nil {
return err
}
currentPrivilegeMap := make(map[string]model.ShortPrivilege)
for i := range accountId.Privileges {
currentPrivilegeMap[accountId.Privileges[i].PrivilegeID] = accountId.Privileges[i]
}
for _, receivedPrivilege := range privilege {
fmt.Println("KONSUMERl", privilege, receivedPrivilege.PrivilegeID, wctools.FindPrivilegeName(receivedPrivilege.PrivilegeID))
if matchingCurrentPrivilege, found := currentPrivilegeMap[receivedPrivilege.PrivilegeID]; found {
matchingCurrentPrivilege.Amount += receivedPrivilege.Amount
matchingCurrentPrivilege.CreatedAt = time.Now()
err := w.privilegeDAL.AccountRepo.UpdatePrivilege(ctx, &matchingCurrentPrivilege, accountId.ID)
if err != nil {
return err
}
} else {
newPrivilege := &model.ShortPrivilege{
PrivilegeID: receivedPrivilege.PrivilegeID,
PrivilegeName: wctools.FindPrivilegeName(receivedPrivilege.PrivilegeID),
Amount: receivedPrivilege.Amount,
CreatedAt: time.Now(),
}
err := w.privilegeDAL.AccountRepo.InsertPrivilege(ctx, newPrivilege, accountId.ID)
if err != nil {
return err
}
}
}
fmt.Println("RESET STALE", w.resetStaleMessages(ctx, accountId.ID))
return nil
}
func (w *KafkaConsumerWorker) resetStaleMessages(ctx context.Context, accountID string) error {
keys, err := w.redis.Keys(ctx, accountID+":*").Result()
if err != nil {
return err
}
for _, key := range keys {
renameRes := w.redis.Rename(ctx, key, strings.TrimPrefix(key, accountID+":"))
if renameRes == nil {
return renameRes.Err()
}
}
return nil
}