package app import ( "context" "errors" "fmt" "os/signal" "syscall" "time" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/initialize" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/swagger" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/server" "penahub.gitlab.yandexcloud.net/pena-services/customer/internal/utils" "penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/closer" "penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/kafka" "penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/mongo" ) const ( shutdownTimeout = 5 * time.Second ) func Run(config *models.Config, logger *zap.Logger) (appErr error) { defer func() { if recovered := recover(); recovered != nil { appErr = errors.New("recovered panic on application run") logger.Error("recovered panic on application run", zap.Any("recovered", recovered)) } }() closer := closer.New() ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() if err := kafka.Initialize(ctx, config.Service.Kafka.Brokers, []string{ config.Service.Kafka.Tariff.Topic, }); err != nil { return fmt.Errorf("failed initialize kafka: %w", err) } mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{ Configuration: &config.Database, Timeout: 10 * time.Second, }) if err != nil { return fmt.Errorf("failed connection to db: %w", err) } kafkaTariffClient, err := kgo.NewClient( kgo.SeedBrokers(config.Service.Kafka.Brokers...), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.DefaultProduceTopic(config.Service.Kafka.Tariff.Topic), kgo.ConsumeTopics(config.Service.Kafka.Tariff.Topic), ) if err != nil { return err } brokers := initialize.NewBrokers(initialize.BrokersDeps{ Logger: logger, TariffClient: kafkaTariffClient, }) clients := initialize.NewClients(initialize.ClientsDeps{ Logger: logger, AuthURL: &config.Service.AuthMicroservice.URL, HubadminURL: &config.Service.HubadminMicroservice.URL, CurrencyURL: &config.Service.CurrencyMicroservice.URL, DiscountServiceConfiguration: &config.Service.DiscountMicroservice, PaymentServiceConfiguration: &config.Service.PaymentMicroservice, }) repositories := initialize.NewRepositories(initialize.RepositoriesDeps{ Logger: logger, MongoDB: mongoDB, }) services := initialize.NewServices(initialize.ServicesDeps{ Logger: logger, Repositories: repositories, Clients: clients, ConfigurationGRPC: &config.GRPC, Brokers: brokers, }) controllers := initialize.NewControllers(initialize.ControllersDeps{ Logger: logger, Services: services, }) openapi, err := swagger.GetSwagger() if err != nil { return fmt.Errorf("failed to loading openapi spec: %w", err) } api := swagger.NewAPI2(logger, mongoDB, config, brokers.TariffConsumer, brokers.TariffProducer) serverHTTP, httpErr := server.NewHTTP(server.DepsHTTP{ Logger: logger, Swagger: openapi, AuthenticationFunc: utils.NewAuthenticator(utils.NewJWT(&config.Service.JWT)), }) if httpErr != nil { return httpErr.Wrap("failed to init http server") } serverGRPC, grpcErr := server.NewGRPC(server.DepsGRPC{Logger: logger}) if grpcErr != nil { return httpErr.Wrap("failed to init grpc server") } serverHTTP.Register(&api) serverGRPC.Register(controllers) go serverHTTP.Run(&config.HTTP) go serverGRPC.Run(&config.GRPC) closer.Add(mongoDB.Client().Disconnect) closer.Add(serverHTTP.Stop) closer.Add(serverGRPC.Stop) closer.Add(closer.Wrap(kafkaTariffClient.Close)) <-ctx.Done() logger.Info("shutting down app gracefully") shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() if err := closer.Close(shutdownCtx); err != nil { return fmt.Errorf("closer: %w", err) } return nil }