// Package app wires the service's dependencies together and runs the
// application until its context is cancelled.
package app

import (
	"context"
	"errors"
	"fmt"
	"time"

	// NOTE(review): these three packages are still imported from the amocrm
	// module while everything else comes from the bitrix module — confirm
	// whether they should be migrated too.
	"amocrm/internal/controllers"
	"amocrm/internal/service"
	"amocrm/internal/tools"

	"go.uber.org/zap"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/brokers"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/initialize"
	http "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/server"
	"penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/closer"
	"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
)

// Run builds the application's dependencies (Kafka client, producer, Bitrix
// repository, service, controllers and HTTP server), starts the HTTP server in
// a goroutine and blocks until ctx is cancelled or server.Start returns an
// error. It then runs the registered shutdown hooks with a 10-second timeout.
//
// The returned error is non-nil when a dependency fails to initialise, when
// the shutdown hooks fail or time out, or when a panic was recovered inside
// Run.
func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) (err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error("Recovered from a panic", zap.Any("error", r))
			// Report the panic to the caller; without this Run would return
			// nil and the failure would look like a clean exit.
			err = fmt.Errorf("recovered from a panic: %v", r)
		}
	}()

	// NOTE(review): this logs the whole config, which includes
	// IntegrationSecret and PostgresCredentials — consider redacting.
	logger.Info("App started", zap.Any("config", config))

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	shutdownGroup := closer.NewCloserGroup()

	//redisClient, err := initialize.Redis(ctx, config)
	//if err != nil {
	//	logger.Error("error init redis client", zap.Error(err))
	//	return err
	//}

	kafka, err := initialize.KafkaConsumerInit(ctx, config)
	if err != nil {
		logger.Error("error init kafka consumer", zap.Error(err))
		return err
	}

	producer := brokers.NewProducer(brokers.ProducerDeps{
		KafkaClient: kafka,
		Logger:      logger,
	})

	bitrixRepo, err := dal.NewBitrixDal(ctx, config.PostgresCredentials)
	if err != nil {
		logger.Error("error init bitrix repo in common repo", zap.Error(err))
		return err
	}

	//socialAithClient := pena_social_auth.NewClient(pena_social_auth.Deps{
	//	PenaSocialAuthURL: config.PenaSocialAuthURL,
	//	Logger:            logger,
	//	ReturnURL:         config.ReturnURL,
	//})
	//
	//rateLimiter := limiter.NewRateLimiter(ctx, 6, 1500*time.Millisecond)
	//
	//amoClient := amoClient.NewAmoClient(amoClient.AmoDeps{
	//	Logger:            logger,
	//	RedirectionURL:    config.ReturnURL,
	//	IntegrationID:     config.IntegrationID,
	//	IntegrationSecret: config.IntegrationSecret,
	//	RateLimiter:       rateLimiter,
	//})

	//redisRepo := repository.NewRepository(repository.Deps{
	//	RedisClient: redisClient,
	//	Logger:      logger,
	//})

	// Only dependencies that are actually constructed above are wired in.
	// NOTE(review): SocialAuthClient and AmoClient are left unset because
	// their constructors are disabled; confirm the service tolerates that and
	// that Repository accepts the Bitrix DAL.
	svc := service.NewService(service.Deps{
		Repository: bitrixRepo,
		Logger:     logger,
		Producer:   producer,
	})

	cntrlDeps := controllers.Deps{
		Service:     svc,
		Logger:      logger,
		Verify:      tools.NewVerify(config.IntegrationSecret, config.IntegrationID),
		RedirectURL: config.RedirectURL,
	}

	controller := controllers.NewController(cntrlDeps)
	webhookController := controllers.NewWebhookController(cntrlDeps)

	//workerMethods := workers_methods.NewWorkersMethods(workers_methods.Deps{
	//	Repo:      amoRepo,
	//	AmoClient: amoClient,
	//	Logger:    logger,
	//})

	//dataUpdater := data_updater.NewDataUpdaterWC(data_updater.Deps{
	//	Logger:   logger,
	//	Producer: producer,
	//})
	//
	//queUpdater := queueUpdater.NewQueueUpdater(queueUpdater.Deps{
	//	Logger:      logger,
	//	KafkaClient: kafka,
	//	Methods:     workerMethods,
	//})
	//
	//dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{
	//	AmoRepo:   amoRepo,
	//	AmoClient: amoClient,
	//	RedisRepo: redisRepo,
	//	Logger:    logger,
	//})
	//
	//fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
	//	AmoRepo:   amoRepo,
	//	AmoClient: amoClient,
	//	RedisRepo: redisRepo,
	//	Logger:    logger,
	//})
	//
	//go dataUpdater.Start(ctx)
	//go queUpdater.Start(ctx)
	//go dealsPoster.Start(ctx)
	//go fieldsPoster.Start(ctx)

	server := http.NewServer(http.ServerConfig{
		Controllers: []http.Controller{
			controller,
			webhookController,
		},
	})

	go func() {
		if err := server.Start(config.HTTPHost + ":" + config.HTTPPort); err != nil {
			logger.Error("Server startup error", zap.Error(err))
			// Unblock Run so the shutdown hooks still execute.
			cancel()
		}
	}()

	server.ListRoutes()

	shutdownGroup.Add(closer.CloserFunc(server.Shutdown))
	// NOTE(review): assumes bitrixRepo.Close matches closer.CloserFunc —
	// confirm against the dal package.
	shutdownGroup.Add(closer.CloserFunc(bitrixRepo.Close))
	// TODO: register the Close/Stop hooks of the Redis repository, the rate
	// limiter and the background workers once they are re-enabled above.

	<-ctx.Done()

	// The parent context is already cancelled here, so shutdown gets a fresh
	// context with its own deadline.
	timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer timeoutCancel()

	if closeErr := shutdownGroup.Call(timeoutCtx); closeErr != nil {
		if errors.Is(closeErr, context.DeadlineExceeded) {
			logger.Error("Shutdown timed out", zap.Error(closeErr))
		} else {
			logger.Error("Failed to shutdown services gracefully", zap.Error(closeErr))
		}
		return closeErr
	}

	logger.Info("Application has stopped")
	return nil
}