worker/privilegewc/consumer.go
2024-02-19 21:20:09 +03:00

156 lines
4.4 KiB
Go

package privilegewc
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/themakers/hlog"
"github.com/twmb/franz-go/pkg/kgo"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/wctools"
"strings"
"time"
)
// Config holds the configuration parameters of the Kafka consumer worker.
type Config struct {
// KafkaBroker is the seed broker address handed to the Kafka client.
KafkaBroker string
// KafkaTopic is the topic the worker consumes from.
KafkaTopic string
// ServiceKey is passed to wctools.IsValidMessage together with each record value.
ServiceKey string
// TickerInterval is the period of the ticker that drives polling in Start.
TickerInterval time.Duration
// Logger is the logger the worker reports through.
Logger hlog.Logger
// ErrChan is a send-only error channel; nothing in the visible part of this
// file writes to it.
ErrChan chan<- error
}
// KafkaConsumerWorker periodically polls a Kafka topic for privilege messages
// and applies them to accounts through the DAL.
type KafkaConsumerWorker struct {
// config is the configuration the worker was constructed with.
config Config
// client is the Kafka client used to poll fetches.
client *kgo.Client
// redis is used by resetStaleMessages to list and rename keys.
redis *redis.Client
// privilegeDAL gives access to AccountRepo for reading and writing privileges.
privilegeDAL *dal.DAL
}
// NewKafkaConsumerWorker builds a KafkaConsumerWorker around a freshly created
// Kafka client.
//
// The client is seeded with config.KafkaBroker, joins the consumer group
// "squiz1" (hard-coded), consumes config.KafkaTopic, and is configured with a
// reset offset placed after the moment of construction. The supplied Redis
// client and DAL are stored on the worker as-is.
//
// It returns the error from kgo.NewClient unchanged if the client cannot be
// created.
func NewKafkaConsumerWorker(config Config, redis *redis.Client, privilegeDAL *dal.DAL) (*KafkaConsumerWorker, error) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(config.KafkaBroker),
		kgo.ConsumerGroup("squiz1"),
		kgo.ConsumeTopics(config.KafkaTopic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
	}

	kafkaClient, err := kgo.NewClient(opts...)
	if err != nil {
		return nil, err
	}

	worker := &KafkaConsumerWorker{
		config:       config,
		client:       kafkaClient,
		redis:        redis,
		privilegeDAL: privilegeDAL,
	}
	return worker, nil
}
// Start runs the worker loop until ctx is cancelled.
//
// On every tick of a ticker with period config.TickerInterval it calls
// fetchMessages. When ctx is done it reports termination through the logger
// and returns; the ticker is stopped on the way out.
func (w *KafkaConsumerWorker) Start(ctx context.Context) {
	tick := time.NewTicker(w.config.TickerInterval)
	defer tick.Stop()

	// Successive calls to ctx.Done return the same channel, so it is fetched once.
	done := ctx.Done()

	for {
		// Debug trace emitted at the top of every loop iteration.
		fmt.Println("KONSUMER", w.config.TickerInterval)

		select {
		case <-done:
			w.config.Logger.Module("Kafka worker terminated")
			return
		case <-tick.C:
			w.fetchMessages(ctx)
		}
	}
}
// fetchMessages polls the Kafka client once and handles every record of the
// returned batch.
//
// Each record value is validated with wctools.IsValidMessage using the
// configured service key. Records that fail validation are reported and
// skipped; valid ones are handed to processValidMessage, whose failure is
// reported but does not stop processing of the remaining records.
//
// Nothing is written to stdout: the service key and message payloads must not
// end up in process output.
func (w *KafkaConsumerWorker) fetchMessages(ctx context.Context) {
	fetches := w.client.PollFetches(ctx)

	// Surface fetch-level errors instead of silently ignoring them. Errors
	// produced by cancellation of ctx are expected during shutdown and skipped.
	// NOTE(review): Logger.Module is the reporting call used throughout this
	// file; confirm that it actually emits a log entry.
	if ctx.Err() == nil && len(fetches.Errors()) > 0 {
		w.config.Logger.Module("Error fetching Kafka messages")
	}

	for iter := fetches.RecordIter(); !iter.Done(); {
		record := iter.Next()

		privilege, userID, err := wctools.IsValidMessage(record.Value, w.config.ServiceKey)
		if err != nil {
			w.config.Logger.Module("Error validating Kafka message")
			// The values returned alongside an error are meaningless, so an
			// invalid message must never reach processValidMessage.
			continue
		}

		if err := w.processValidMessage(ctx, privilege, userID); err != nil {
			w.config.Logger.Module("Error processing valid message")
		}
	}
}
// processValidMessage applies a validated batch of received privileges to the
// account resolved from userID.
//
// It loads the user's current privileges and account, then for every received
// privilege either adds the received amount to the matching stored privilege
// (refreshing its CreatedAt) or inserts a new privilege. Afterwards it calls
// resetStaleMessages for the account.
//
// The first failing repository or Redis call aborts processing; its error is
// returned wrapped with context. Privileges written before the failure are
// not rolled back by this function.
func (w *KafkaConsumerWorker) processValidMessage(ctx context.Context, privilege []model.PrivilegeMessage, userID string) error {
	currentPrivileges, err := w.privilegeDAL.AccountRepo.GetPrivilegesByAccountID(ctx, userID)
	if err != nil {
		return fmt.Errorf("loading privileges of user %q: %w", userID, err)
	}

	// TODO: refactor getting accountId
	account, err := w.privilegeDAL.AccountRepo.GetAccountByID(ctx, userID)
	if err != nil {
		return fmt.Errorf("loading account of user %q: %w", userID, err)
	}

	// Index stored privileges; pointers into the slice are kept so that the
	// update below modifies the loaded element itself.
	// NOTE(review): the index is keyed by PrivilegeName but looked up by
	// PrivilegeID below — confirm that these two values coincide for stored
	// privileges, otherwise existing privileges are never matched.
	currentPrivilegeMap := make(map[string]*model.ShortPrivilege, len(currentPrivileges))
	for i := range currentPrivileges {
		currentPrivilegeMap[currentPrivileges[i].PrivilegeName] = &currentPrivileges[i]
	}

	for _, received := range privilege {
		if current, found := currentPrivilegeMap[received.PrivilegeID]; found {
			current.Amount += received.Amount
			current.CreatedAt = time.Now()
			if err := w.privilegeDAL.AccountRepo.UpdatePrivilege(ctx, current, account.ID); err != nil {
				return fmt.Errorf("updating privilege %q: %w", received.PrivilegeID, err)
			}
			continue
		}

		newPrivilege := &model.ShortPrivilege{
			PrivilegeID:   received.PrivilegeID,
			PrivilegeName: wctools.FindPrivilegeName(received.PrivilegeID),
			Amount:        received.Amount,
			CreatedAt:     time.Now(),
		}
		if err := w.privilegeDAL.AccountRepo.InsertPrivilege(ctx, newPrivilege, account.ID); err != nil {
			return fmt.Errorf("inserting privilege %q: %w", received.PrivilegeID, err)
		}
	}

	// Report a failed reset to the caller instead of printing it to stdout.
	if err := w.resetStaleMessages(ctx, account.ID); err != nil {
		return fmt.Errorf("resetting stale messages: %w", err)
	}
	return nil
}
// resetStaleMessages renames every Redis key of the form "<accountID>:<rest>"
// to "<rest>", i.e. strips the account prefix from the key name.
//
// It returns the first error encountered, either from listing the keys or
// from a rename; keys renamed before the failure stay renamed.
//
// NOTE(review): KEYS walks the whole keyspace on the Redis server; consider
// SCAN if the database is large.
func (w *KafkaConsumerWorker) resetStaleMessages(ctx context.Context, accountID string) error {
	prefix := accountID + ":"

	keys, err := w.redis.Keys(ctx, prefix+"*").Result()
	if err != nil {
		return fmt.Errorf("listing redis keys with prefix %q: %w", prefix, err)
	}

	for _, key := range keys {
		// The command result is never nil; the outcome of RENAME is carried by
		// its Err method, so that is what has to be checked.
		if err := w.redis.Rename(ctx, key, strings.TrimPrefix(key, prefix)).Err(); err != nil {
			return fmt.Errorf("renaming redis key %q: %w", key, err)
		}
	}
	return nil
}