storer/internal/workers/file_cleaner/worker.go

71 lines
1.4 KiB
Go

package file_cleaner
import (
"context"
"errors"
"gitea.pena/SQuiz/storer/internal/models"
"gitea.pena/SQuiz/storer/internal/repository"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
"time"
)
type FileCleaner struct {
redisClient *redis.Client
repository *repository.S3
logger *zap.Logger
}
type Deps struct {
RedisClient *redis.Client
Repository *repository.S3
Logger *zap.Logger
}
func NewFileCleanerWC(deps Deps) *FileCleaner {
return &FileCleaner{
redisClient: deps.RedisClient,
repository: deps.Repository,
logger: deps.Logger,
}
}
func (wc *FileCleaner) Start(ctx context.Context) {
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTask(ctx)
case <-ctx.Done():
return
}
}
}
func (wc *FileCleaner) processTask(ctx context.Context) {
for {
result, err := wc.redisClient.BLPop(ctx, 5*time.Second, models.RedisQueueS3DeleterKey).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
wc.logger.Error("queue is nil return")
return
}
wc.logger.Error("error reading from queue", zap.Error(err))
return
}
if len(result) < 2 {
wc.logger.Error("result is broken len < 2")
}
url := result[1]
err = wc.repository.DeleteFileFromS3(ctx, url)
if err != nil {
wc.logger.Error("error deleting file from S3", zap.Error(err))
}
}
}