package app import ( "codeword/internal/controller/promocode" "codeword/internal/controller/recovery" "codeword/internal/controller/rpc_controllers" "codeword/internal/initialize" "codeword/internal/repository" "codeword/internal/server/grpc" httpserver "codeword/internal/server/http" "codeword/internal/services" "codeword/internal/worker/purge_worker" "codeword/internal/worker/recovery_worker" "codeword/pkg/closer" "codeword/utils" "context" "errors" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" "time" ) func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger) error { defer func() { if r := recover(); r != nil { logger.Error("Recovered from a panic", zap.Any("error", r)) } }() logger.Info("Starting application", zap.String("AppName", cfg.AppName)) ctx, cancel := context.WithCancel(ctx) defer cancel() shutdownGroup := closer.NewCloserGroup() mdb, err := initialize.MongoDB(ctx, cfg) if err != nil { logger.Error("Failed to initialize MongoDB", zap.Error(err)) return err } if err = initialize.InitDatabaseIndexes(ctx, mdb, logger); err != nil { logger.Error("Failed to initialize db indexes", zap.Error(err)) return err } kafkaTariffClient, err := kgo.NewClient( kgo.SeedBrokers(cfg.KafkaBrokers), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.DefaultProduceTopic(cfg.KafkaTopic), ) if err != nil { return err } err = kafkaTariffClient.Ping(ctx) if err != nil { return err } discountRpcClient, err := initialize.DiscountGRPCClient(cfg.DiscountServiceAddress) if err != nil { logger.Error("failed to connect to discount service", zap.Error(err)) return err } brokers := initialize.NewBrokers(initialize.BrokersDeps{ Logger: logger, TariffClient: kafkaTariffClient, Topic: cfg.KafkaTopic, }) rdb, err := initialize.Redis(ctx, cfg) if err != nil { logger.Error("failed to connect to redis db", zap.Error(err)) return err } encrypt := initialize.Encrypt(cfg) promoCodeRepo := repository.NewPromoCodeRepository(mdb.Collection("promoCodes")) statsRepo := repository.NewStatsRepository(repository.Deps{Rdb: nil, Mdb: mdb.Collection("promoStats")}) codewordRepo := repository.NewCodewordRepository(repository.Deps{Rdb: rdb, Mdb: mdb.Collection("codeword")}) userRepo := repository.NewUserRepository(repository.Deps{Rdb: nil, Mdb: mdb.Collection("users")}) recoveryEmailSender := initialize.RecoveryEmailSender(cfg, logger) authClient := initialize.AuthClient(cfg, logger) recoveryService := services.NewRecoveryService(services.Deps{ Logger: logger, CodewordRepository: codewordRepo, UserRepository: userRepo, Encrypt: encrypt, AuthClient: authClient, }) promoService := services.NewPromoCodeService(services.PromoDeps{ Logger: logger, PromoCodeRepo: promoCodeRepo, StatsRepo: statsRepo, Kafka: brokers.TariffProducer, DiscountClient: discountRpcClient, }) jwtUtil := utils.NewJWT(&cfg) authMiddleware := utils.NewAuthenticator(jwtUtil) recoveryController := recovery.NewRecoveryController(logger, recoveryService, cfg.DefaultRedirectionURL) promoCodeController := promocode.NewPromoCodeController(promocode.Deps{Logger: logger, PromoCodeService: promoService, AuthMiddleware: authMiddleware}) controllerRpc := rpc_controllers.InitRpcControllers(promoService) grpcServer, err := grpc.NewGRPC(logger) if err != nil { logger.Error("error init rpc server", zap.Error(err)) return err } grpcServer.Register(controllerRpc) recoveryWC := recovery_worker.NewRecoveryWC(recovery_worker.Deps{ Logger: logger, Redis: rdb, EmailSender: recoveryEmailSender, Mongo: mdb.Collection("codeword"), }) purgeWC := purge_worker.NewPurgeWC(purge_worker.Deps{ Logger: logger, Mongo: mdb.Collection("codeword"), }) go recoveryWC.Start(ctx) go purgeWC.Start(ctx) server := httpserver.NewServer(httpserver.ServerConfig{ Logger: logger, Controllers: []httpserver.Controller{recoveryController, promoCodeController}, }) go func() { if err := server.Start(cfg.HTTPHost + ":" + cfg.HTTPPort); err != nil { logger.Error("Server startup error", zap.Error(err)) cancel() } }() go grpcServer.Run(grpc.DepsGrpcRun{ Host: cfg.GrpcHost, Port: cfg.GrpcPort, }) server.ListRoutes() shutdownGroup.Add(closer.CloserFunc(server.Shutdown)) shutdownGroup.Add(closer.CloserFunc(grpcServer.Stop)) shutdownGroup.Add(closer.CloserFunc(mdb.Client().Disconnect)) shutdownGroup.Add(closer.CloserFunc(recoveryWC.Stop)) shutdownGroup.Add(closer.CloserFunc(purgeWC.Stop)) <-ctx.Done() timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second) defer timeoutCancel() if err := shutdownGroup.Call(timeoutCtx); err != nil { if errors.Is(err, context.DeadlineExceeded) { logger.Error("Shutdown timed out", zap.Error(err)) } else { logger.Error("Failed to shutdown services gracefully", zap.Error(err)) } return err } logger.Info("Application has stopped") return nil }