package app

import (
	"context"
	"errors"
	"fmt"
	"os/signal"
	"syscall"
	"time"

	"gitea.pena/PenaSide/common/mongo"
	"gitea.pena/PenaSide/customer/internal/initialize"
	"gitea.pena/PenaSide/customer/internal/interface/controller/http"
	"gitea.pena/PenaSide/customer/internal/models"
	"gitea.pena/PenaSide/customer/internal/server"
	"gitea.pena/PenaSide/customer/pkg/closer"
	"gitea.pena/PenaSide/customer/pkg/kafka"
	"gitea.pena/PenaSide/trashlog/app"
	"gitea.pena/PenaSide/trashlog/wrappers/zaptrashlog"
	"github.com/themakers/hlog"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	tb "gopkg.in/tucnak/telebot.v2"
)

const (
	// shutdownTimeout bounds how long the registered close functions may run
	// once a termination signal (or a server start failure) has been observed.
	shutdownTimeout = 5 * time.Second
)

// Build carries build metadata; Run forwards all three fields to the
// trashlog zap core.
type Build struct {
	Commit    string
	Version   string
	BuildTime int64 // NOTE(review): unit is not visible here (presumably a Unix timestamp) — confirm.
}

// Run wires up the application and blocks until it is asked to stop.
//
// It initializes logging, Kafka, MongoDB, the Telegram notification bot,
// clients, repositories, services and controllers, then starts the client
// HTTP, admin HTTP and gRPC servers. It returns after SIGINT/SIGTERM is
// received or one of the HTTP servers reports a start error, once the
// registered close functions have run (bounded by shutdownTimeout).
//
// A non-nil error is returned when any initialization step fails, when the
// closer reports an error, or when a panic is recovered during the run.
//
// NOTE(review): resources opened before an early error return (MongoDB
// connection, Kafka client) are only registered with the closer at the end
// of setup, so they are not closed on those paths.
func Run(config *models.Config, logger *zap.Logger, build Build) (appErr error) {
	defer func() {
		if recovered := recover(); recovered != nil {
			appErr = errors.New("recovered panic on application run")
			logger.Error("recovered panic on application run", zap.Any("recovered", recovered))
		}
	}()

	// Named differently from the imported package so the package is not shadowed.
	shutdownCloser := closer.New()

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, config.TrashLogHost, build.Version, build.Commit, build.BuildTime)
	if err != nil {
		// Return the cause instead of panicking: the deferred recover would
		// otherwise replace it with a generic error.
		return fmt.Errorf("failed to init trashlog core: %w", err)
	}

	// Tee every entry of the hlog logger into the trashlog core as well.
	loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
		return zapcore.NewTee(core, clickHouseLogger)
	}), zap.AddCallerSkip(2))

	loggerHlog := hlog.New(loggerForHlog).Module(models.ModuleLogger)
	// NOTE(review): the result of With is discarded; confirm whether With
	// mutates the logger or returns a derived one that should be used.
	loggerHlog.With(models.AllFields{})
	loggerHlog.Emit(app.InfoSvcStarted{})

	if err = kafka.Initialize(ctx, config.KafkaBrokers, []string{
		config.KafkaTopicTariff,
	}); err != nil {
		return fmt.Errorf("failed initialize kafka: %w", err)
	}

	mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
		Configuration: &config.ExternalCfg.Database,
		Timeout:       10 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("failed connection to db: %w", err)
	}

	kafkaTariffClient, err := kgo.NewClient(
		kgo.SeedBrokers(config.KafkaBrokers...),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
		kgo.DefaultProduceTopic(config.KafkaTopicTariff),
		kgo.ConsumeTopics(config.KafkaTopicTariff),
	)
	if err != nil {
		return fmt.Errorf("failed to create kafka tariff client: %w", err)
	}

	brokers := initialize.NewBrokers(initialize.BrokersDeps{
		Logger:       logger,
		TariffClient: kafkaTariffClient,
	})

	if err = kafkaTariffClient.Ping(ctx); err != nil {
		return fmt.Errorf("failed to ping kafka: %w", err)
	}

	notificationBot, err := tb.NewBot(tb.Settings{
		Token:     config.NotificationBotToken,
		Verbose:   false,
		ParseMode: tb.ModeHTML,
		Poller: &tb.LongPoller{
			Timeout: time.Second,
		},
	})
	if err != nil {
		return fmt.Errorf("failed to create notification bot: %w", err)
	}

	// Send a ping message to both channels during startup; a failure of
	// either send aborts the run.
	pingMessage := fmt.Sprintf("PING MSG from %s", models.ModuleLogger)

	if _, err = notificationBot.Send(tb.ChatID(config.NotificationRsPayChannel), pingMessage); err != nil {
		return fmt.Errorf("failed to send ping to rs pay notification channel: %w", err)
	}

	if _, err = notificationBot.Send(tb.ChatID(config.NotificationChannel), pingMessage); err != nil {
		return fmt.Errorf("failed to send ping to notification channel: %w", err)
	}

	clients := initialize.NewClients(initialize.ClientsDeps{
		Logger:                   logger,
		AuthURL:                  config.AuthMicroserviceURL,
		HubadminURL:              config.HubadminMicroserviceURL,
		CurrencyURL:              config.CurrencyMicroserviceURL,
		DiscountServiceURL:       config.DiscountMicroserviceGRPC,
		PaymentServiceURL:        config.PaymentMicroserviceGRPC,
		VerificationURL:          config.VerificationMicroservice,
		TemplategenURL:           config.TemplategenMicroserviceURL,
		MailClient:               config.ExternalCfg.MailClientCfg,
		CodewordServiceHost:      config.CodewordMicroserviceGRPC,
		Notifier:                 notificationBot,
		NotificationRsPayChannel: config.NotificationRsPayChannel,
	})

	repositories := initialize.NewRepositories(initialize.RepositoriesDeps{
		Logger:  logger,
		MongoDB: mongoDB,
	})

	services := initialize.NewServices(initialize.ServicesDeps{
		Logger:              logger,
		Repositories:        repositories,
		Clients:             clients,
		Brokers:             brokers,
		Notifier:            notificationBot,
		NotificationChannel: config.NotificationChannel,
		AdminURL:            config.AdminURL,
	})

	rpcControllers := initialize.NewRpcControllers(initialize.RpcControllersDeps{
		Logger:       logger,
		Services:     services,
		Repositories: repositories,
		HLogger:      loggerHlog,
	})

	encrypt := &config.ExternalCfg.EncryptCommon

	middleWare := http.NewMiddleWare(logger)

	httpControllers := initialize.NewHttpControllers(initialize.HttpControllersDeps{
		Logger:       logger,
		Encrypt:      encrypt,
		Producer:     brokers.TariffProducer,
		GRPCDomain:   config.GrpcDomain,
		Repositories: repositories,
		Clients:      clients,
		MiddleWare:   middleWare,
	})

	serverClientHTTP := server.NewServer(server.ServerConfig{
		Logger: logger,
		Hlog:   loggerHlog,
		Controllers: []server.Controller{
			httpControllers.CurrencyClientController,
			httpControllers.HistoryClientController,
			httpControllers.CartClientController,
			httpControllers.WalletClientController,
			httpControllers.AccountClientController,
		},
		JWTConfig: &config.ExternalCfg.JwtCfg,
	})

	serverAdminHTTP := server.NewServer(server.ServerConfig{
		Logger: logger,
		Hlog:   loggerHlog,
		Controllers: []server.Controller{
			httpControllers.AccountAdminController,
			httpControllers.CurrencyAdminController,
			httpControllers.HistoryAdminController,
		},
		JWTConfig: &config.ExternalCfg.JwtCfg,
	})

	serverClientHTTP.ListRoutes()
	serverAdminHTTP.ListRoutes()

	serverGRPC, grpcErr := server.NewGRPC(server.DepsGRPC{Logger: logger})
	if grpcErr != nil {
		return grpcErr.Wrap("failed to init grpc server")
	}

	serverGRPC.Register(rpcControllers)

	// A start error from either HTTP server cancels ctx, which triggers the
	// shutdown sequence below.
	go func() {
		if err := serverClientHTTP.Start(config.ClientHttpURL); err != nil {
			logger.Error("Server external startup error", zap.Error(err))
			cancel()
		}
	}()

	go func() {
		if err := serverAdminHTTP.Start(config.AdminHttpURL); err != nil {
			logger.Error("Server internal startup error", zap.Error(err))
			cancel()
		}
	}()

	go serverGRPC.Run(config.GrpcURL)

	shutdownCloser.Add(mongoDB.Client().Disconnect)
	shutdownCloser.Add(serverClientHTTP.Shutdown)
	shutdownCloser.Add(serverAdminHTTP.Shutdown)
	shutdownCloser.Add(serverGRPC.Stop)
	shutdownCloser.Add(shutdownCloser.Wrap(kafkaTariffClient.Close))

	<-ctx.Done()

	logger.Info("shutting down app gracefully")

	// Use a dedicated cancel variable: the server goroutines above capture
	// `cancel` by reference, so reassigning it here would race with them and
	// could let a late Start error cancel the shutdown context.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer shutdownCancel()

	if err := shutdownCloser.Close(shutdownCtx); err != nil {
		return fmt.Errorf("closer: %w", err)
	}

	return nil
}