ref code, add purgeWC and base doc
This commit is contained in:
parent
487dc8bc2d
commit
b767702160
6
.env
6
.env
@ -21,7 +21,9 @@ PUBLIC_CURVE_KEY="-----BEGIN PUBLIC KEY-----\nMCowBQYDK2VwAyEAEbnIvjIMle4rqVol6K
|
||||
|
||||
PRIVATE_CURVE_KEY="-----BEGIN PRIVATE KEY-----\nMC4CAQAwBQYDK2VwBCIEIKn0BKwF3vZvODgWAnUIwQhd8de5oZhY48gc23EWfrfs\n-----END PRIVATE KEY-----"
|
||||
|
||||
SIGN_SECRET=group
|
||||
# SIGN_SECRET="group"
|
||||
|
||||
SIGN_SECRET="secret"
|
||||
|
||||
# SMTP settings
|
||||
SMTP_API_URL="https://api.smtp.bz/v1/smtp/send"
|
||||
@ -34,4 +36,4 @@ SMTP_SENDER="noreply@mailing.pena.digital"
|
||||
|
||||
# URL settings
|
||||
DEFAULT_REDIRECTION_URL = "def.url"
|
||||
AUTH_REFRESH_URL = "http://localhost:8000/auth/exchange"
|
||||
AUTH_EXCHANGE_URL = "http://localhost:8000/auth/exchange"
|
@ -0,0 +1,87 @@
|
||||
openapi: 3.0.0
|
||||
info:
|
||||
title: Codeword Recovery Service API
|
||||
version: 1.0.0
|
||||
description: API for handling password recovery for the Codeword service.
|
||||
|
||||
|
||||
paths:
|
||||
/liveness:
|
||||
get:
|
||||
summary: Роут проверки активности
|
||||
responses:
|
||||
'200':
|
||||
description: Успех – сервис запущен
|
||||
|
||||
/readiness:
|
||||
get:
|
||||
summary: Роут проверки базы данных
|
||||
responses:
|
||||
'200':
|
||||
description: Успех — сервис готов и соединение с БД живо
|
||||
'503':
|
||||
description: Служба недоступна — не удалось выполнить проверку связи с БД
|
||||
|
||||
/recover:
|
||||
post:
|
||||
summary: Запустите процесс восстановления пароля
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/x-www-form-urlencoded:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
email:
|
||||
type: string
|
||||
format: email
|
||||
description: Электронная почта, на которую нужно отправить инструкции по восстановлению
|
||||
Referrer:
|
||||
type: string
|
||||
description: URL-адрес referral, если он доступен
|
||||
RedirectionURL:
|
||||
type: string
|
||||
description: URL-адрес, на который перенаправляется пользователь после отправки электронного письма
|
||||
|
||||
responses:
|
||||
'200':
|
||||
description: Запрос на восстановление принят, и возвращен идентификатор записи восстановления
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
description: Идентификатор запроса на восстановление
|
||||
'404':
|
||||
description: Пользователь не найден по электронной почте
|
||||
'500':
|
||||
description: Внутренняя ошибка сервера – разные причины
|
||||
|
||||
/recover/{sign}:
|
||||
get:
|
||||
summary: Обработать ссылку восстановления, в которой содержится подпись и обменять ее на токены
|
||||
parameters:
|
||||
- in: path
|
||||
name: sign
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
description: Подпись восстановления как часть URL-адреса восстановления
|
||||
responses:
|
||||
'200':
|
||||
description: Восстановление успешно, информация для обмена токенов возвращена
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
accessToken:
|
||||
type: string
|
||||
refreshToken:
|
||||
type: string
|
||||
'406':
|
||||
description: NotAcceptable - срок действия ссылки для восстановления истек или она недействительна
|
||||
'500':
|
||||
description: Внутренняя ошибка сервера – разные причины
|
@ -41,13 +41,11 @@ func (a *AuthClient) RefreshAuthToken(userID, signature string) (*models.Refresh
|
||||
|
||||
agent := a.deps.FiberClient.Post(a.deps.AuthUrl)
|
||||
agent.Set("Content-Type", "application/json").Body(bodyBytes)
|
||||
//todo надо что-то придумать с авторизаиционными токенами
|
||||
agent.Set("Authorization", "Bearer "+"123")
|
||||
|
||||
statusCode, resBody, errs := agent.Bytes()
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
a.deps.Logger.Error("Error in refresh auth token request", zap.Error(err))
|
||||
a.deps.Logger.Error("Error in exchange auth token request", zap.Error(err))
|
||||
}
|
||||
return nil, fmt.Errorf("request failed: %v", errs)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"codeword/internal/repository"
|
||||
httpserver "codeword/internal/server/http"
|
||||
"codeword/internal/services"
|
||||
"codeword/internal/worker/purge_worker"
|
||||
"codeword/internal/worker/recovery_worker"
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
@ -26,8 +27,8 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
||||
encrypt := initialize.InitializeEncrypt(cfg)
|
||||
codewordRepo := repository.NewCodewordRepository(repository.Deps{Rdb: rdb, Mdb: mdb.Collection("codeword")})
|
||||
userRepo := repository.NewUserRepository(repository.Deps{Rdb: nil, Mdb: mdb.Collection("users")})
|
||||
recoveryEmailSender := initialize.InitializeRecoveryEmailSender(cfg, logger)
|
||||
authClient := initialize.InitializeAuthClient(cfg, logger)
|
||||
recoveryEmailSender := initialize.RecoveryEmailSender(cfg, logger)
|
||||
authClient := initialize.AuthClient(cfg, logger)
|
||||
|
||||
recoveryService := services.NewRecoveryService(services.Deps{
|
||||
Logger: logger,
|
||||
@ -46,7 +47,13 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
||||
Mongo: mdb.Collection("codeword"),
|
||||
})
|
||||
|
||||
purgeWC := purge_worker.NewRecoveryWC(purge_worker.Deps{
|
||||
Logger: logger,
|
||||
Mongo: mdb.Collection("codeword"),
|
||||
})
|
||||
|
||||
go recoveryWC.Start(ctx)
|
||||
go purgeWC.Start(ctx)
|
||||
|
||||
server := httpserver.NewServer(httpserver.ServerConfig{
|
||||
Logger: logger,
|
||||
@ -55,7 +62,7 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
||||
|
||||
go func() {
|
||||
if err := server.Start(cfg.HTTPHost + ":" + cfg.HTTPPort); err != nil {
|
||||
logger.Error("Ошибка запуска сервера", zap.Error(err))
|
||||
logger.Error("Server startup error", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
@ -64,7 +71,7 @@ func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error {
|
||||
if err := shutdownApp(server, mdb, logger); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Info("Приложение остановлено")
|
||||
logger.Info("The application has stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -87,7 +94,7 @@ func shutdownHTTPServer(server *httpserver.Server, logger *zap.Logger) error {
|
||||
defer cancel()
|
||||
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
logger.Error("Ошибка при остановке HTTP-сервера", zap.Error(err))
|
||||
logger.Error("Error stopping HTTP server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@ -98,7 +105,7 @@ func shutdownMongoDB(mdb *mongo.Database, logger *zap.Logger) error {
|
||||
defer cancel()
|
||||
|
||||
if err := mdb.Client().Disconnect(ctx); err != nil {
|
||||
logger.Error("Ошибка при закрытии соединения с MongoDB", zap.Error(err))
|
||||
logger.Error("Error when closing MongoDB connection", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
@ -39,18 +39,18 @@ func (r *RecoveryController) HandleRecoveryRequest(c *fiber.Ctx) error {
|
||||
redirectionURL = r.defaultURL
|
||||
}
|
||||
|
||||
key, err := r.service.GenerateKey()
|
||||
if err != nil {
|
||||
r.logger.Error("Failed to generate key", zap.Error(err))
|
||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal Server Error"})
|
||||
}
|
||||
|
||||
user, err := r.service.FindUserByEmail(c.Context(), email)
|
||||
if err != nil || user == nil {
|
||||
r.logger.Error("Failed to find user by email", zap.Error(err))
|
||||
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "User not found"})
|
||||
}
|
||||
|
||||
key, err := r.service.GenerateKey()
|
||||
if err != nil {
|
||||
r.logger.Error("Failed to generate key", zap.Error(err))
|
||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal Server Error"})
|
||||
}
|
||||
|
||||
signUrl := redirectionURL + base64.URLEncoding.EncodeToString(key)
|
||||
sign := base64.URLEncoding.EncodeToString(key)
|
||||
|
||||
@ -71,6 +71,8 @@ func (r *RecoveryController) HandleRecoveryRequest(c *fiber.Ctx) error {
|
||||
})
|
||||
}
|
||||
|
||||
// todo тут скорее всего помимо подписи будет передаваться еще что-то, например email пользователя от фронта для поиска в бд
|
||||
|
||||
// HandleRecoveryLink обрабатывает ссылку восстановления и обменивает ее на токены
|
||||
func (r *RecoveryController) HandleRecoveryLink(c *fiber.Ctx) error {
|
||||
key := c.Params("sign")
|
||||
@ -83,7 +85,7 @@ func (r *RecoveryController) HandleRecoveryLink(c *fiber.Ctx) error {
|
||||
|
||||
if time.Since(record.CreatedAt) > 15*time.Minute {
|
||||
r.logger.Error("Recovery link expired", zap.String("signature", key))
|
||||
return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "Recovery link expired"})
|
||||
return c.Status(fiber.StatusNotAcceptable).JSON(fiber.Map{"error": "Recovery link expired"})
|
||||
}
|
||||
|
||||
tokens, err := r.service.ExchangeForTokens(record.UserID, record.Sign)
|
||||
|
@ -1 +1,3 @@
|
||||
package errors
|
||||
|
||||
// пока не нужен
|
||||
|
@ -6,7 +6,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func InitializeRecoveryEmailSender(cfg Config, logger *zap.Logger) *client.RecoveryEmailSender {
|
||||
func RecoveryEmailSender(cfg Config, logger *zap.Logger) *client.RecoveryEmailSender {
|
||||
return client.NewRecoveryEmailSender(client.RecoveryEmailSenderDeps{
|
||||
SmtpApiUrl: cfg.SmtpApiUrl,
|
||||
SmtpHost: cfg.SmtpHost,
|
||||
@ -22,7 +22,7 @@ func InitializeRecoveryEmailSender(cfg Config, logger *zap.Logger) *client.Recov
|
||||
})
|
||||
}
|
||||
|
||||
func InitializeAuthClient(cfg Config, logger *zap.Logger) *client.AuthClient {
|
||||
func AuthClient(cfg Config, logger *zap.Logger) *client.AuthClient {
|
||||
return client.NewAuthClient(client.AuthClientDeps{
|
||||
AuthUrl: cfg.AuthURL,
|
||||
Logger: logger,
|
||||
|
@ -30,7 +30,7 @@ type Config struct {
|
||||
SmtpApiKey string `env:"SMTP_API_KEY"`
|
||||
SmtpSender string `env:"SMTP_SENDER"`
|
||||
DefaultRedirectionURL string `env:"DEFAULT_REDIRECTION_URL"`
|
||||
AuthURL string `env:"AUTH_REFRESH_URL"`
|
||||
AuthURL string `env:"AUTH_EXCHANGE_URL"`
|
||||
}
|
||||
|
||||
func LoadConfig() (*Config, error) {
|
||||
|
83
internal/repository/codeword_repository.go
Normal file
83
internal/repository/codeword_repository.go
Normal file
@ -0,0 +1,83 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"codeword/internal/models"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
"time"
|
||||
)
|
||||
|
||||
type codewordRepository struct {
|
||||
mdb *mongo.Collection
|
||||
rdb *redis.Client
|
||||
}
|
||||
|
||||
func NewCodewordRepository(deps Deps) *codewordRepository {
|
||||
|
||||
return &codewordRepository{mdb: deps.Mdb, rdb: deps.Rdb}
|
||||
}
|
||||
|
||||
// сохраняем полученные данные о пользователе и подписи в бд
|
||||
func (r *codewordRepository) StoreRecoveryRecord(ctx context.Context, userID, email, key, url string) (string, error) {
|
||||
newID := primitive.NewObjectID()
|
||||
record := models.RestoreRequest{
|
||||
ID: newID,
|
||||
UserID: userID,
|
||||
Email: email,
|
||||
Sign: key,
|
||||
SignUrl: url,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
_, err := r.mdb.InsertOne(ctx, record)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return newID.Hex(), nil
|
||||
}
|
||||
|
||||
// добавляем в очередь данные для отправки на почту в редис
|
||||
func (r *codewordRepository) InsertToQueue(ctx context.Context, userID string, email string, key []byte, id string) error {
|
||||
task := models.RecoveryRecord{
|
||||
ID: id,
|
||||
UserID: userID,
|
||||
Email: email,
|
||||
Key: key,
|
||||
}
|
||||
|
||||
taskBytes, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.rdb.LPush(ctx, "recoveryQueue", taskBytes).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// получаем данные юзера по подписи
|
||||
func (r *codewordRepository) GetRecoveryRecord(ctx context.Context, key string) (*models.RestoreRequest, error) {
|
||||
var restoreRequest models.RestoreRequest
|
||||
|
||||
filter := bson.M{"sign": key}
|
||||
|
||||
err := r.mdb.FindOne(ctx, filter).Decode(&restoreRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &restoreRequest, nil
|
||||
}
|
||||
|
||||
// пингует в монгу чтобы проверить подключение
|
||||
func (r *codewordRepository) Ping(ctx context.Context) error {
|
||||
return r.mdb.Database().Client().Ping(ctx, readpref.Primary())
|
||||
}
|
@ -3,13 +3,9 @@ package repository
|
||||
import (
|
||||
"codeword/internal/models"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Deps struct {
|
||||
@ -17,10 +13,8 @@ type Deps struct {
|
||||
Rdb *redis.Client
|
||||
}
|
||||
|
||||
type codewordRepository struct {
|
||||
mdb *mongo.Collection
|
||||
rdb *redis.Client
|
||||
}
|
||||
// todo: возможно стоит разделить два репозитория в одном файле на два файла чтобы не было путаницы,
|
||||
// а deps структуру оставить одну дабы избежать путаницу
|
||||
|
||||
type userRepository struct {
|
||||
mdb *mongo.Collection
|
||||
@ -31,11 +25,7 @@ func NewUserRepository(deps Deps) *userRepository {
|
||||
return &userRepository{mdb: deps.Mdb}
|
||||
}
|
||||
|
||||
func NewCodewordRepository(deps Deps) *codewordRepository {
|
||||
|
||||
return &codewordRepository{mdb: deps.Mdb, rdb: deps.Rdb}
|
||||
}
|
||||
|
||||
// ищем пользователя по мейлу в коллекции users
|
||||
func (r *userRepository) FindByEmail(ctx context.Context, email string) (*models.User, error) {
|
||||
var user models.User
|
||||
|
||||
@ -48,59 +38,3 @@ func (r *userRepository) FindByEmail(ctx context.Context, email string) (*models
|
||||
}
|
||||
return &user, nil
|
||||
}
|
||||
|
||||
func (r *codewordRepository) StoreRecoveryRecord(ctx context.Context, userID, email, key, url string) (string, error) {
|
||||
newID := primitive.NewObjectID()
|
||||
record := models.RestoreRequest{
|
||||
ID: newID,
|
||||
UserID: userID,
|
||||
Email: email,
|
||||
Sign: key,
|
||||
SignUrl: url,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
_, err := r.mdb.InsertOne(ctx, record)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return newID.Hex(), nil
|
||||
}
|
||||
|
||||
func (r *codewordRepository) InsertToQueue(ctx context.Context, userID string, email string, key []byte, id string) error {
|
||||
task := models.RecoveryRecord{
|
||||
ID: id,
|
||||
UserID: userID,
|
||||
Email: email,
|
||||
Key: key,
|
||||
}
|
||||
|
||||
taskBytes, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.rdb.LPush(ctx, "recoveryQueue", taskBytes).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *codewordRepository) GetRecoveryRecord(ctx context.Context, key string) (*models.RestoreRequest, error) {
|
||||
var restoreRequest models.RestoreRequest
|
||||
|
||||
filter := bson.M{"sign": key}
|
||||
|
||||
err := r.mdb.FindOne(ctx, filter).Decode(&restoreRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &restoreRequest, nil
|
||||
}
|
||||
|
||||
func (r *codewordRepository) Ping(ctx context.Context) error {
|
||||
return r.mdb.Database().Client().Ping(ctx, readpref.Primary())
|
||||
}
|
||||
|
@ -56,6 +56,7 @@ func (s *RecoveryService) GenerateKey() ([]byte, error) {
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// вызывает пингование в бд
|
||||
func (s *RecoveryService) Ping(ctx context.Context) error {
|
||||
err := s.repositoryCodeword.Ping(ctx)
|
||||
if err != nil {
|
||||
@ -89,7 +90,7 @@ func (s *RecoveryService) StoreRecoveryRecord(ctx context.Context, userID, email
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// SendRecoveryEmail посылает письмо для восстановления доступа пользователю
|
||||
// RecoveryEmailTask посылает письмо для восстановления доступа пользователю
|
||||
func (s *RecoveryService) RecoveryEmailTask(ctx context.Context, userID string, email string, key []byte, id string) error {
|
||||
err := s.repositoryCodeword.InsertToQueue(ctx, userID, email, key, id)
|
||||
if err != nil {
|
||||
@ -107,6 +108,7 @@ func (s *RecoveryService) GetRecoveryRecord(ctx context.Context, key string) (*m
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// сомнительный вариант но как я думаю верный, что false==err
|
||||
result, err := s.encrypt.VerifySignature(byteKey)
|
||||
if err != nil || result == false {
|
||||
s.logger.Error("Failed to verify signature", zap.String("signature", key), zap.Error(err))
|
||||
@ -121,6 +123,7 @@ func (s *RecoveryService) GetRecoveryRecord(ctx context.Context, key string) (*m
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// меняет подпись на токены идя в auth сервис
|
||||
func (s *RecoveryService) ExchangeForTokens(userID string, signature string) (map[string]string, error) {
|
||||
tokens, err := s.authClient.RefreshAuthToken(userID, signature)
|
||||
if err != nil {
|
||||
|
@ -53,6 +53,7 @@ func (receiver *Encrypt) VerifySignature(signature []byte) (isValid bool, err er
|
||||
return ed25519.Verify(publicKey, []byte(receiver.signSecret), signature), nil
|
||||
}
|
||||
|
||||
// TODO подумать над тем чтобы подпись генерилась каждый раз разгая
|
||||
func (receiver *Encrypt) SignCommonSecret() (signature []byte, err error) {
|
||||
defer func() {
|
||||
if recovered := recover(); recovered != nil {
|
||||
|
56
internal/worker/purge_worker/purge_worker.go
Normal file
56
internal/worker/purge_worker/purge_worker.go
Normal file
@ -0,0 +1,56 @@
|
||||
package purge_worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Deps struct {
|
||||
Logger *zap.Logger
|
||||
Mongo *mongo.Collection
|
||||
}
|
||||
|
||||
type purgeWorker struct {
|
||||
logger *zap.Logger
|
||||
mongo *mongo.Collection
|
||||
}
|
||||
|
||||
func NewRecoveryWC(deps Deps) *purgeWorker {
|
||||
return &purgeWorker{
|
||||
logger: deps.Logger,
|
||||
mongo: deps.Mongo,
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *purgeWorker) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(1 * time.Hour)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
wc.processTasks(ctx)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *purgeWorker) processTasks(ctx context.Context) {
|
||||
wc.logger.Info("Checking cleaning records")
|
||||
|
||||
oneHourAgo := time.Now().Add(-1 * time.Hour)
|
||||
|
||||
filter := bson.M{"created_at": bson.M{"$lt": oneHourAgo}}
|
||||
|
||||
result, err := wc.mongo.DeleteMany(ctx, filter)
|
||||
if err != nil {
|
||||
wc.logger.Error("Error when trying to delete old entries", zap.Error(err))
|
||||
} else {
|
||||
wc.logger.Info("Deleted documents", zap.Int64("count", result.DeletedCount))
|
||||
}
|
||||
}
|
@ -20,15 +20,15 @@ type Deps struct {
|
||||
Mongo *mongo.Collection
|
||||
}
|
||||
|
||||
type recoveryWorker struct {
|
||||
type RecoveryWorker struct {
|
||||
logger *zap.Logger
|
||||
redis *redis.Client
|
||||
emailSender *client.RecoveryEmailSender
|
||||
mongo *mongo.Collection
|
||||
}
|
||||
|
||||
func NewRecoveryWC(deps Deps) *recoveryWorker {
|
||||
return &recoveryWorker{
|
||||
func NewRecoveryWC(deps Deps) *RecoveryWorker {
|
||||
return &RecoveryWorker{
|
||||
logger: deps.Logger,
|
||||
redis: deps.Redis,
|
||||
emailSender: deps.EmailSender,
|
||||
@ -36,7 +36,7 @@ func NewRecoveryWC(deps Deps) *recoveryWorker {
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *recoveryWorker) Start(ctx context.Context) {
|
||||
func (wc *RecoveryWorker) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
@ -51,7 +51,7 @@ func (wc *recoveryWorker) Start(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *recoveryWorker) processTasks(ctx context.Context) {
|
||||
func (wc *RecoveryWorker) processTasks(ctx context.Context) {
|
||||
result, err := wc.redis.BRPop(ctx, 1*time.Second, "recoveryQueue").Result()
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
@ -78,7 +78,7 @@ func (wc *recoveryWorker) processTasks(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (wc *recoveryWorker) sendRecoveryTask(ctx context.Context, task models.RecoveryRecord) error {
|
||||
func (wc *RecoveryWorker) sendRecoveryTask(ctx context.Context, task models.RecoveryRecord) error {
|
||||
err := wc.emailSender.SendRecoveryEmail(task.Email, task.Key)
|
||||
if err != nil {
|
||||
wc.logger.Error("Failed to send recovery email", zap.Error(err))
|
||||
|
Loading…
Reference in New Issue
Block a user