package app import ( "context" "errors" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/brokers" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/controllers" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/initialize" http "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/server" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/service" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/data_updater" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/limiter" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/post_deals_worker" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers/queueUpdater" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/internal/workers_methods" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/bitrixClient" "penahub.gitlab.yandexcloud.net/backend/quiz/bitrix/pkg/closer" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/utils" "time" "go.uber.org/zap" ) func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) error { defer func() { if r := recover(); r != nil { logger.Error("Recovered from a panic", zap.Any("error", r)) } }() logger.Info("App started", zap.Any("config", config)) ctx, cancel := context.WithCancel(ctx) defer cancel() shutdownGroup := closer.NewCloserGroup() kafka, err := initialize.KafkaConsumerInit(ctx, config) if err != nil { logger.Error("error init kafka consumer", zap.Error(err)) return err } producer := brokers.NewProducer(brokers.ProducerDeps{ KafkaClient: kafka, Logger: logger, }) encrypt := utils.NewEncrypt(config.PublicKey, config.PrivateKey) bitrixRepo, err := dal.NewBitrixDal(ctx, config.PostgresCredentials) if err != nil { logger.Error("error init bitrix repo in common repo", zap.Error(err)) return err } // https://apidocs.bitrix24.ru/limits.html rateLimiter := limiter.NewRateLimiter(ctx, 50, 2*time.Second) bitrixClientApi := bitrixClient.NewBitrixClient(bitrixClient.BitrixDeps{ Logger: logger, RedirectionURL: config.ReturnURL, IntegrationID: config.IntegrationID, IntegrationSecret: config.IntegrationSecret, RateLimiter: rateLimiter, }) svc := service.NewService(service.Deps{ Repository: bitrixRepo, Logger: logger, BitrixClient: bitrixClientApi, Producer: producer, Config: config, Encrypt: encrypt, }) cntrlDeps := controllers.Deps{ Service: svc, Logger: logger, RedirectURL: config.RedirectURL, Encrypt: encrypt, } controller := controllers.NewController(cntrlDeps) webhookController := controllers.NewWebhookController(cntrlDeps) workerMethods := workers_methods.NewWorkersMethods(workers_methods.Deps{ Repo: bitrixRepo, BitrixClient: bitrixClientApi, Logger: logger, }) dataUpdater := data_updater.NewDataUpdaterWC(data_updater.Deps{ Logger: logger, Producer: producer, }) queUpdater := queueUpdater.NewQueueUpdater(queueUpdater.Deps{ Logger: logger, KafkaClient: kafka, Methods: workerMethods, }) dealsPoster := post_deals_worker.NewDealsWC(post_deals_worker.Deps{ BitrixRepo: bitrixRepo, BitrixClient: bitrixClientApi, Logger: logger, }) // //fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{ // AmoRepo: amoRepo, // AmoClient: amoClient, // RedisRepo: redisRepo, // Logger: logger, //}) // go dataUpdater.Start(ctx) go queUpdater.Start(ctx) go dealsPoster.Start(ctx) //go fieldsPoster.Start(ctx) server := http.NewServer(http.ServerConfig{ Controllers: []http.Controller{ controller, webhookController, }, }) go func() { if err := server.Start(config.HTTPHost + ":" + config.HTTPPort); err != nil { logger.Error("Server startup error", zap.Error(err)) cancel() } }() server.ListRoutes() shutdownGroup.Add(closer.CloserFunc(server.Shutdown)) shutdownGroup.Add(closer.CloserFunc(bitrixRepo.Close)) //shutdownGroup.Add(closer.CloserFunc(redisRepo.Close)) shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop)) shutdownGroup.Add(closer.CloserFunc(dataUpdater.Stop)) shutdownGroup.Add(closer.CloserFunc(queUpdater.Stop)) shutdownGroup.Add(closer.CloserFunc(dealsPoster.Stop)) //shutdownGroup.Add(closer.CloserFunc(fieldsPoster.Stop)) <-ctx.Done() timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second) defer timeoutCancel() if err := shutdownGroup.Call(timeoutCtx); err != nil { if errors.Is(err, context.DeadlineExceeded) { logger.Error("Shutdown timed out", zap.Error(err)) } else { logger.Error("Failed to shutdown services gracefully", zap.Error(err)) } return err } logger.Info("Application has stopped") return nil }