change logic for sendig recovery to mail
This commit is contained in:
parent
13147de3d2
commit
19af554a16
@ -2,8 +2,10 @@ package recovery
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"codeword/internal/models"
|
"codeword/internal/models"
|
||||||
|
"codeword/internal/repository"
|
||||||
"codeword/internal/services"
|
"codeword/internal/services"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -80,6 +82,11 @@ func (r *RecoveryController) HandleRecoveryRequest(c *fiber.Ctx) error {
|
|||||||
err = r.service.RecoveryEmailTask(c.Context(), models.RecEmailDeps{UserID: user.ID.Hex(), Email: req.Email, SignWithID: signWithID, ID: id})
|
err = r.service.RecoveryEmailTask(c.Context(), models.RecEmailDeps{UserID: user.ID.Hex(), Email: req.Email, SignWithID: signWithID, ID: id})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.logger.Error("Failed to send recovery email", zap.Error(err))
|
r.logger.Error("Failed to send recovery email", zap.Error(err))
|
||||||
|
|
||||||
|
if errors.Is(err, repository.ErrAlreadyReported) {
|
||||||
|
return c.Status(fiber.StatusAlreadyReported).JSON(fiber.Map{"error": "already reported"})
|
||||||
|
}
|
||||||
|
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal Server Error"})
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Internal Server Error"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -46,6 +46,17 @@ func (r *CodewordRepository) StoreRecoveryRecord(ctx context.Context, deps model
|
|||||||
|
|
||||||
// добавляем в очередь данные для отправки на почту в редис
|
// добавляем в очередь данные для отправки на почту в редис
|
||||||
func (r *CodewordRepository) InsertToQueue(ctx context.Context, deps models.RecEmailDeps) error {
|
func (r *CodewordRepository) InsertToQueue(ctx context.Context, deps models.RecEmailDeps) error {
|
||||||
|
sendLockKey := "email:sendLock:" + deps.Email
|
||||||
|
ttl := 5 * time.Minute
|
||||||
|
|
||||||
|
lockSuccess, err := r.rdb.SetNX(ctx, sendLockKey, "1", ttl).Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !lockSuccess {
|
||||||
|
return ErrAlreadyReported
|
||||||
|
}
|
||||||
|
|
||||||
task := models.RecoveryRecord{
|
task := models.RecoveryRecord{
|
||||||
ID: deps.ID,
|
ID: deps.ID,
|
||||||
UserID: deps.UserID,
|
UserID: deps.UserID,
|
||||||
@ -58,11 +69,7 @@ func (r *CodewordRepository) InsertToQueue(ctx context.Context, deps models.RecE
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.rdb.LPush(ctx, "recoveryQueue", taskBytes).Err(); err != nil {
|
return r.rdb.Set(ctx, "email:task:"+deps.Email, taskBytes, ttl).Err()
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// получаем данные юзера по подписи
|
// получаем данные юзера по подписи
|
||||||
|
|||||||
10
internal/repository/errors.go
Normal file
10
internal/repository/errors.go
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrPromoUserNotFound = errors.New("user not found")
|
||||||
|
ErrAlreadyReported = errors.New("already reported")
|
||||||
|
ErrDuplicateCodeword = errors.New("duplicate codeword")
|
||||||
|
ErrPromoCodeNotFound = errors.New("promo code not found")
|
||||||
|
)
|
||||||
@ -3,7 +3,6 @@ package repository
|
|||||||
import (
|
import (
|
||||||
"codeword/internal/models"
|
"codeword/internal/models"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
@ -11,11 +10,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
ErrDuplicateCodeword = errors.New("duplicate codeword")
|
|
||||||
ErrPromoCodeNotFound = errors.New("promo code not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
// структура для горутины чтобы ошибки не пропускать
|
// структура для горутины чтобы ошибки не пропускать
|
||||||
type countResult struct {
|
type countResult struct {
|
||||||
count int64
|
count int64
|
||||||
|
|||||||
@ -3,7 +3,6 @@ package repository
|
|||||||
import (
|
import (
|
||||||
"codeword/internal/models"
|
"codeword/internal/models"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
@ -18,8 +17,6 @@ type UserRepository struct {
|
|||||||
mdb *mongo.Collection
|
mdb *mongo.Collection
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrPromoUserNotFound = errors.New("user not found")
|
|
||||||
|
|
||||||
func NewUserRepository(deps Deps) *UserRepository {
|
func NewUserRepository(deps Deps) *UserRepository {
|
||||||
|
|
||||||
return &UserRepository{mdb: deps.Mdb}
|
return &UserRepository{mdb: deps.Mdb}
|
||||||
|
|||||||
@ -37,14 +37,13 @@ func NewRecoveryWC(deps Deps) *RecoveryWorker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wc *RecoveryWorker) Start(ctx context.Context) {
|
func (wc *RecoveryWorker) Start(ctx context.Context) {
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
wc.processTasks(ctx)
|
wc.processTasks(ctx)
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -52,29 +51,39 @@ func (wc *RecoveryWorker) Start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wc *RecoveryWorker) processTasks(ctx context.Context) {
|
func (wc *RecoveryWorker) processTasks(ctx context.Context) {
|
||||||
result, err := wc.redis.BRPop(ctx, 1*time.Second, "recoveryQueue").Result()
|
var cursor uint64
|
||||||
if err != nil {
|
for {
|
||||||
if err != redis.Nil {
|
var keys []string
|
||||||
wc.logger.Error("Failed to BRPop from the recovery queue", zap.Error(err))
|
var err error
|
||||||
|
keys, cursor, err = wc.redis.Scan(ctx, cursor, "email:task:*", 0).Result()
|
||||||
|
if err != nil {
|
||||||
|
wc.logger.Error("Failed to scan for email tasks", zap.Error(err))
|
||||||
|
break
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result) < 2 {
|
for _, key := range keys {
|
||||||
wc.logger.Error("Received unexpected number of elements from BRPop", zap.Strings("result", result))
|
taskBytes, err := wc.redis.GetDel(ctx, key).Result()
|
||||||
return
|
if err == redis.Nil {
|
||||||
}
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
wc.logger.Error("Failed to getdel recovery task", zap.String("key", key), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
var task models.RecoveryRecord
|
var task models.RecoveryRecord
|
||||||
if err = json.Unmarshal([]byte(result[1]), &task); err != nil {
|
if json.Unmarshal([]byte(taskBytes), &task) != nil {
|
||||||
wc.logger.Error("Failed to unmarshal recovery task", zap.String("key", result[0]), zap.Error(err))
|
wc.logger.Error("Failed to unmarshal recovery task", zap.String("key", key), zap.String("task", taskBytes))
|
||||||
return
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = wc.sendRecoveryTask(ctx, task)
|
err = wc.sendRecoveryTask(ctx, task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wc.logger.Error("Failed to send recovery task", zap.String("key", result[0]), zap.Error(err))
|
wc.logger.Error("Failed to send recovery task", zap.String("key", key), zap.Error(err))
|
||||||
return
|
}
|
||||||
|
}
|
||||||
|
if cursor == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user