182 lines
4.7 KiB
Go
182 lines
4.7 KiB
Go
package app
|
|
|
|
import (
|
|
"amocrm/internal/brokers"
|
|
"amocrm/internal/controllers"
|
|
"amocrm/internal/initialize"
|
|
"amocrm/internal/repository"
|
|
"amocrm/internal/server/http"
|
|
"amocrm/internal/service"
|
|
"amocrm/internal/tools"
|
|
"amocrm/internal/workers/data_updater"
|
|
"amocrm/internal/workers/limiter"
|
|
"amocrm/internal/workers/post_deals_worker"
|
|
"amocrm/internal/workers/post_fields_worker"
|
|
"amocrm/internal/workers/queueUpdater"
|
|
"amocrm/internal/workers_methods"
|
|
"amocrm/pkg/amoClient"
|
|
"amocrm/pkg/closer"
|
|
pena_social_auth "amocrm/pkg/pena-social-auth"
|
|
"context"
|
|
"errors"
|
|
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) error {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logger.Error("Recovered from a panic", zap.Any("error", r))
|
|
}
|
|
}()
|
|
|
|
logger.Info("App started", zap.Any("config", config))
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
shutdownGroup := closer.NewCloserGroup()
|
|
|
|
redisClient, err := initialize.Redis(ctx, config)
|
|
if err != nil {
|
|
logger.Error("error init redis client", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
kafka, err := initialize.KafkaConsumerInit(ctx, config)
|
|
if err != nil {
|
|
logger.Error("error init kafka consumer", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
producer := brokers.NewProducer(brokers.ProducerDeps{
|
|
KafkaClient: kafka,
|
|
Logger: logger,
|
|
})
|
|
|
|
amoRepo, err := dal.NewAmoDal(ctx, config.PostgresURL)
|
|
if err != nil {
|
|
logger.Error("error init amo repo in common repo", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
socialAithClient := pena_social_auth.NewClient(pena_social_auth.Deps{
|
|
PenaSocialAuthURL: config.PenaSocialAuthMicroserviceURL,
|
|
Logger: logger,
|
|
ReturnURL: config.OauthReturnURL,
|
|
})
|
|
|
|
rateLimiter := limiter.NewRateLimiter(ctx, 6, 1500*time.Millisecond)
|
|
|
|
amoClient := amoClient.NewAmoClient(amoClient.AmoDeps{
|
|
Logger: logger,
|
|
RedirectionURL: config.OauthReturnURL,
|
|
IntegrationID: config.AmoIntegrationID,
|
|
IntegrationSecret: config.AmoIntegrationSecret,
|
|
RateLimiter: rateLimiter,
|
|
})
|
|
|
|
redisRepo := repository.NewRepository(repository.Deps{
|
|
RedisClient: redisClient,
|
|
Logger: logger,
|
|
})
|
|
|
|
svc := service.NewService(service.Deps{
|
|
Repository: amoRepo,
|
|
Logger: logger,
|
|
SocialAuthClient: socialAithClient,
|
|
AmoClient: amoClient,
|
|
Producer: producer,
|
|
})
|
|
|
|
cntrlDeps := controllers.Deps{
|
|
Service: svc,
|
|
Logger: logger,
|
|
Verify: tools.NewVerify(config.AmoIntegrationSecret, config.AmoIntegrationID),
|
|
RedirectURL: config.QuizIntegrationsRedirectURL,
|
|
}
|
|
|
|
controller := controllers.NewController(cntrlDeps)
|
|
|
|
webhookController := controllers.NewWebhookController(cntrlDeps)
|
|
|
|
workerMethods := workers_methods.NewWorkersMethods(workers_methods.Deps{
|
|
Repo: amoRepo,
|
|
AmoClient: amoClient,
|
|
Logger: logger,
|
|
})
|
|
|
|
dataUpdater := data_updater.NewDataUpdaterWC(data_updater.Deps{
|
|
Logger: logger,
|
|
Producer: producer,
|
|
})
|
|
|
|
queUpdater := queueUpdater.NewQueueUpdater(queueUpdater.Deps{
|
|
Logger: logger,
|
|
KafkaClient: kafka,
|
|
Methods: workerMethods,
|
|
})
|
|
|
|
dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{
|
|
AmoRepo: amoRepo,
|
|
AmoClient: amoClient,
|
|
RedisRepo: redisRepo,
|
|
Logger: logger,
|
|
})
|
|
|
|
fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
|
|
AmoRepo: amoRepo,
|
|
AmoClient: amoClient,
|
|
RedisRepo: redisRepo,
|
|
Logger: logger,
|
|
})
|
|
|
|
go dataUpdater.Start(ctx)
|
|
go queUpdater.Start(ctx)
|
|
go dealsPoster.Start(ctx)
|
|
go fieldsPoster.Start(ctx)
|
|
|
|
server := http.NewServer(http.ServerConfig{
|
|
Controllers: []http.Controller{
|
|
controller,
|
|
webhookController,
|
|
},
|
|
})
|
|
|
|
go func() {
|
|
if err := server.Start(config.ClientHttpURL); err != nil {
|
|
logger.Error("Server startup error", zap.Error(err))
|
|
cancel()
|
|
}
|
|
}()
|
|
|
|
server.ListRoutes()
|
|
|
|
shutdownGroup.Add(closer.CloserFunc(server.Shutdown))
|
|
shutdownGroup.Add(closer.CloserFunc(amoRepo.Close))
|
|
shutdownGroup.Add(closer.CloserFunc(redisRepo.Close))
|
|
shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop))
|
|
shutdownGroup.Add(closer.CloserFunc(dataUpdater.Stop))
|
|
shutdownGroup.Add(closer.CloserFunc(queUpdater.Stop))
|
|
shutdownGroup.Add(closer.CloserFunc(dealsPoster.Stop))
|
|
shutdownGroup.Add(closer.CloserFunc(fieldsPoster.Stop))
|
|
|
|
<-ctx.Done()
|
|
|
|
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer timeoutCancel()
|
|
if err := shutdownGroup.Call(timeoutCtx); err != nil {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
logger.Error("Shutdown timed out", zap.Error(err))
|
|
} else {
|
|
logger.Error("Failed to shutdown services gracefully", zap.Error(err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
logger.Info("Application has stopped")
|
|
return nil
|
|
}
|