added new cmd refresh tokens

This commit is contained in:
Pasha 2024-12-02 23:41:46 +03:00 committed by skeris
parent 0e13df1f83
commit 54b558636c
4 changed files with 190 additions and 80 deletions

94
cmd/tokens/main.go Normal file

@ -0,0 +1,94 @@
package main
import (
"amocrm/cmd/tokens/refresh_wc"
"amocrm/internal/initialize"
"amocrm/internal/workers/limiter"
"amocrm/pkg/amoClient"
"amocrm/pkg/closer"
"context"
"errors"
"fmt"
"go.uber.org/zap"
"os"
"os/signal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"syscall"
"time"
)
func main() {
logger, err := zap.NewProduction()
if err != nil {
fmt.Printf("Failed to initialize logger: %v\n", err)
os.Exit(1)
}
logger = logger.Named("REFRESH_WC")
config, err := initialize.LoadConfig()
if err != nil {
logger.Fatal("Failed to load config", zap.Error(err))
}
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
if err = run(ctx, logger, *config); err != nil {
logger.Fatal("App exited with error", zap.Error(err))
}
}
func run(ctx context.Context, logger *zap.Logger, cfg initialize.Config) error {
defer func() {
if r := recover(); r != nil {
logger.Error("Recovered in app from a panic", zap.Any("error", r))
}
}()
shutdownGroup := closer.NewCloserGroup()
amoRepo, err := dal.NewAmoDal(ctx, cfg.PostgresCredentials)
if err != nil {
logger.Error("error init amo repo", zap.Error(err))
return err
}
rateLimiter := limiter.NewRateLimiter(ctx, 6, 1500*time.Millisecond)
amoCl := amoClient.NewAmoClient(amoClient.AmoDeps{
Logger: logger,
RedirectionURL: cfg.ReturnURL,
IntegrationID: cfg.IntegrationID,
IntegrationSecret: cfg.IntegrationSecret,
RateLimiter: rateLimiter,
})
refreshWC := refresh_wc.NewRefreshWC(refresh_wc.Deps{
Logger: logger,
AmoClient: amoCl,
Repo: amoRepo,
})
go refreshWC.Start(ctx)
shutdownGroup.Add(closer.CloserFunc(amoRepo.Close))
shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop))
shutdownGroup.Add(closer.CloserFunc(refreshWC.Stop))
<-ctx.Done()
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer timeoutCancel()
if err := shutdownGroup.Call(timeoutCtx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
logger.Error("Shutdown timed out", zap.Error(err))
} else {
logger.Error("Failed to shutdown services gracefully", zap.Error(err))
}
return err
}
logger.Info("Application has stopped")
return nil
}

@ -0,0 +1,94 @@
package refresh_wc
import (
"amocrm/internal/models"
"amocrm/pkg/amoClient"
"amocrm/pkg/timer"
"context"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
Repo *dal.AmoDal
Logger *zap.Logger
}
type WebHookUpdater struct {
amoClient *amoClient.Amo
repo *dal.AmoDal
logger *zap.Logger
}
func NewRefreshWC(deps Deps) *WebHookUpdater {
return &WebHookUpdater{
amoClient: deps.AmoClient,
repo: deps.Repo,
logger: deps.Logger,
}
}
func (wc *WebHookUpdater) Start(ctx context.Context) {
nextStart := timer.CalculateTime(3)
ticker := time.NewTicker(time.Nanosecond * time.Duration(nextStart))
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
nextStart = timer.CalculateTime(3)
ticker.Reset(time.Nanosecond * time.Duration(nextStart))
case <-ctx.Done():
return
}
}
}
// todo если так нормально будет, то предлагаю батчами сделать, батч на 1000 к примеру,
// делим количество всего токенов на размер батча чтобы было без остатка и строим вейт группу вокруг этого
func (wc *WebHookUpdater) processTasks(ctx context.Context) {
currentTokens, err := wc.repo.AmoRepo.GetAllTokens(ctx)
if err != nil {
wc.logger.Error("Failed to get all tokens", zap.Error(err))
return
}
for _, token := range currentTokens {
user, err := wc.repo.AmoRepo.GetCurrentAccount(ctx, token.AccountID)
if err != nil {
wc.logger.Error("error getting account by id", zap.Error(err))
continue
}
req := models.UpdateWebHookReq{
GrantType: "refresh_token",
RefreshToken: token.RefreshToken,
}
resp, err := wc.amoClient.CreateWebHook(&req, user.Subdomain)
if err != nil {
wc.logger.Error("error create webhook", zap.Error(err))
continue
}
err = wc.repo.AmoRepo.WebhookUpdate(ctx, model.Token{
AccountID: token.AccountID,
RefreshToken: resp.RefreshToken,
AccessToken: resp.AccessToken,
Expiration: time.Now().Unix() + resp.ExpiresIn,
CreatedAt: time.Now().Unix(),
})
if err != nil {
wc.logger.Error("error update token in db", zap.Error(err))
return
}
}
}
func (wc *WebHookUpdater) Stop(_ context.Context) error {
return nil
}

@ -1,78 +0,0 @@
package tokens
import (
"amocrm/pkg/amoClient"
"context"
"go.uber.org/zap"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"time"
)
type Deps struct {
AmoClient *amoClient.Amo
Repo *dal.AmoDal
Logger *zap.Logger
}
type Token struct {
amoClient *amoClient.Amo
repo *dal.AmoDal
logger *zap.Logger
}
func NewRefreshWC(deps Deps) *Token {
return &Token{
amoClient: deps.AmoClient,
repo: deps.Repo,
logger: deps.Logger,
}
}
func (wc *Token) Start(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
wc.processTasks(ctx)
case <-ctx.Done():
return
}
}
}
func (wc *Token) processTasks(ctx context.Context) {
//tokens, err := wc.repo.AmoRepo.CheckExpired(ctx)
//if err != nil {
// wc.logger.Error("error fetch expired tokens in mongo", zap.Error(err))
// return
//}
//for _, token := range tokens {
// req := models.UpdateWebHookReq{
// GrantType: "refresh_token",
// RefreshToken: token.RefreshToken,
// }
// newTokens, err := wc.amoClient.CreateWebHook(&req)
// if err != nil {
// wc.logger.Error("error create webhook for update tokens", zap.Error(err))
// continue
// }
// err = wc.repo.AmoRepo.WebhookUpdate(ctx, model.Token{
// AccountID: token.AccountID,
// RefreshToken: newTokens.RefreshToken,
// AccessToken: newTokens.AccessToken,
// Expiration: time.Now().Unix() + newTokens.ExpiresIn,
// CreatedAt: time.Now().Unix(),
// })
// if err != nil {
// wc.logger.Error("error update new tokens in mongo", zap.Error(err))
// continue
// }
//}
}
func (wc *Token) Stop(_ context.Context) error {
return nil
}

@ -4,10 +4,10 @@ import (
"time"
)
func calculateTime() int64 {
func CalculateTime(hour int) int64 {
now := time.Now()
targetTime := time.Date(now.Year(), now.Month(), now.Day(), 4, 0, 0, 0, now.Location())
targetTime := time.Date(now.Year(), now.Month(), now.Day(), hour, 0, 0, 0, now.Location())
if now.After(targetTime) {
targetTime = targetTime.AddDate(0, 0, 1)
}