package privilegewc import ( "context" "fmt" "github.com/go-redis/redis/v8" "github.com/twmb/franz-go/pkg/kgo" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model" "penahub.gitlab.yandexcloud.net/backend/quiz/worker/wctools" "strings" "time" ) // Config содержит параметры конфигурации. type Config struct { KafkaBroker string KafkaTopic string ServiceKey string TickerInterval time.Duration ErrChan chan<- error } type KafkaConsumerWorker struct { config Config client *kgo.Client redis *redis.Client privilegeDAL *dal.DAL } // NewKafkaConsumerWorker создает новый экземпляр KafkaConsumerWorker. func NewKafkaConsumerWorker(config Config, redis *redis.Client, privilegeDAL *dal.DAL) (*KafkaConsumerWorker, error) { client, err := kgo.NewClient( kgo.SeedBrokers(config.KafkaBroker), kgo.ConsumerGroup("squiz1"), kgo.ConsumeTopics(config.KafkaTopic), kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())), ) if err != nil { return nil, err } return &KafkaConsumerWorker{ config: config, client: client, privilegeDAL: privilegeDAL, redis: redis, }, nil } // Start запускает. func (w *KafkaConsumerWorker) Start(ctx context.Context) { ticker := time.NewTicker(w.config.TickerInterval) defer ticker.Stop() for { fmt.Println("KONSUMER", w.config.TickerInterval) select { case <-ticker.C: w.fetchMessages(ctx) case <-ctx.Done(): fmt.Println("Kafka worker terminated") return } } } // fetchAndProcessMessages извлекает сообщения из темы Kafka и обрабатывает их. func (w *KafkaConsumerWorker) fetchMessages(ctx context.Context) { fetches := w.client.PollFetches(ctx) iter := fetches.RecordIter() fmt.Println("KONSUMER1", fetches, w.config.ServiceKey) for !iter.Done() { record := iter.Next() privilege, userID, err := wctools.IsValidMessage(record.Value, w.config.ServiceKey) fmt.Println("KONSUMER2", err, userID) if err != nil { fmt.Println("Error validating Kafka message", err) } err = w.processValidMessage(ctx, privilege, userID) if err != nil { fmt.Println("Error processing valid message", err) } } } // processValidMessage обрабатывает валидное сообщение. func (w *KafkaConsumerWorker) processValidMessage(ctx context.Context, privilege []model.PrivilegeMessage, userID string) error { // TODO: refactor getting accountId accountId, err := w.privilegeDAL.AccountRepo.GetAccountByID(ctx, userID) fmt.Println("KONSUMEROOO", userID, accountId, err, privilege) if err != nil { return err } currentPrivilegeMap := make(map[string]model.ShortPrivilege) for i := range accountId.Privileges { currentPrivilegeMap[accountId.Privileges[i].PrivilegeID] = accountId.Privileges[i] } for _, receivedPrivilege := range privilege { fmt.Println("KONSUMERl", privilege, receivedPrivilege.PrivilegeID, wctools.FindPrivilegeName(receivedPrivilege.PrivilegeID)) if matchingCurrentPrivilege, found := currentPrivilegeMap[receivedPrivilege.PrivilegeID]; found { matchingCurrentPrivilege.Amount += receivedPrivilege.Amount matchingCurrentPrivilege.CreatedAt = time.Now() err := w.privilegeDAL.AccountRepo.UpdatePrivilege(ctx, &matchingCurrentPrivilege, accountId.ID) if err != nil { return err } } else { newPrivilege := &model.ShortPrivilege{ PrivilegeID: receivedPrivilege.PrivilegeID, PrivilegeName: wctools.FindPrivilegeName(receivedPrivilege.PrivilegeID), Amount: receivedPrivilege.Amount, CreatedAt: time.Now(), } err := w.privilegeDAL.AccountRepo.InsertPrivilege(ctx, newPrivilege, accountId.ID) if err != nil { return err } } } fmt.Println("RESET STALE", w.resetStaleMessages(ctx, accountId.ID)) return nil } func (w *KafkaConsumerWorker) resetStaleMessages(ctx context.Context, accountID string) error { keys, err := w.redis.Keys(ctx, accountID+":*").Result() if err != nil { return err } for _, key := range keys { renameRes := w.redis.Rename(ctx, key, strings.TrimPrefix(key, accountID+":")) if renameRes == nil { return renameRes.Err() } } return nil }