codeword/internal/app/app.go

231 lines
6.9 KiB
Go

package app
import (
"codeword/internal/controller/admin/admin_promocode"
"codeword/internal/controller/admin/admin_recovery"
"codeword/internal/controller/client/client_promocode"
"codeword/internal/controller/client/client_recovery"
"codeword/internal/controller/rpc_controllers"
"codeword/internal/initialize"
"codeword/internal/models"
"codeword/internal/repository"
"codeword/internal/server/grpc"
httpserver "codeword/internal/server/http"
"codeword/internal/services"
"codeword/internal/utils/middleware"
"codeword/internal/worker/purge_worker"
"codeword/internal/worker/recovery_worker"
"codeword/pkg/closer"
"context"
"errors"
"github.com/themakers/hlog"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/app"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog"
"time"
)
type Build struct {
Commit string
Version string
BuildTime int64
}
func Run(ctx context.Context, cfg initialize.Config, logger *zap.Logger, build Build) error {
defer func() {
if r := recover(); r != nil {
logger.Error("Recovered from a panic", zap.Any("error", r))
}
}()
logger.Info("Starting application", zap.String("AppName", cfg.AppName))
ctx, cancel := context.WithCancel(ctx)
defer cancel()
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, cfg.TrashLogHost, build.Version, build.Commit, build.BuildTime)
if err != nil {
panic(err)
}
loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, clickHouseLogger)
}))
loggerHlog := hlog.New(loggerForHlog).Module(cfg.ModuleLogger)
loggerHlog.With(models.AllFields{})
loggerHlog.Emit(app.InfoSvcStarted{})
shutdownGroup := closer.NewCloserGroup()
mdb, err := initialize.MongoDB(ctx, cfg)
if err != nil {
logger.Error("Failed to initialize MongoDB", zap.Error(err))
return err
}
if err = initialize.InitDatabaseIndexes(ctx, mdb, logger); err != nil {
logger.Error("Failed to initialize db indexes", zap.Error(err))
return err
}
kafkaTariffClient, err := kgo.NewClient(
kgo.SeedBrokers(cfg.KafkaBrokers),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.DefaultProduceTopic(cfg.KafkaTopic),
)
if err != nil {
return err
}
err = kafkaTariffClient.Ping(ctx)
if err != nil {
return err
}
discountRpcClient, err := initialize.DiscountGRPCClient(cfg.DiscountServiceAddress)
if err != nil {
logger.Error("failed to connect to discount service", zap.Error(err))
return err
}
brokers := initialize.NewBrokers(initialize.BrokersDeps{
Logger: logger,
TariffClient: kafkaTariffClient,
Topic: cfg.KafkaTopic,
})
rdb, err := initialize.Redis(ctx, cfg)
if err != nil {
logger.Error("failed to connect to redis db", zap.Error(err))
return err
}
encrypt := initialize.Encrypt(cfg)
promoCodeRepo := repository.NewPromoCodeRepository(mdb.Collection("promoCodes"))
statsRepo := repository.NewStatsRepository(repository.Deps{Rdb: nil, Mdb: mdb.Collection("promoStats")})
codewordRepo := repository.NewCodewordRepository(repository.Deps{Rdb: rdb, Mdb: mdb.Collection("codeword")})
userRepo := repository.NewUserRepository(repository.Deps{Rdb: nil, Mdb: mdb.Collection("users")})
recoveryEmailSender := initialize.RecoveryEmailSender(cfg, logger)
authClient := initialize.AuthClient(cfg, logger)
recoveryService := services.NewRecoveryService(services.Deps{
Logger: logger,
CodewordRepository: codewordRepo,
UserRepository: userRepo,
Encrypt: encrypt,
AuthClient: authClient,
})
promoService := services.NewPromoCodeService(services.PromoDeps{
Logger: logger,
PromoCodeRepo: promoCodeRepo,
StatsRepo: statsRepo,
Kafka: brokers.TariffProducer,
DiscountClient: discountRpcClient,
})
jwtUtil := middleware.NewJWT(&cfg)
clientRecoveryController := client_recovery.NewRecoveryController(client_recovery.Deps{
Logger: logger,
Service: recoveryService,
DefaultURL: cfg.DefaultRedirectionURL,
RecoveryURL: cfg.RecoveryUrl,
})
clientPromoCodeController := client_promocode.NewPromoCodeController(client_promocode.Deps{Logger: logger, PromoCodeService: promoService})
adminRecoveryController := admin_recovery.NewRecoveryController(admin_recovery.Deps{
Logger: logger,
Service: recoveryService,
DefaultURL: cfg.DefaultRedirectionURL,
RecoveryURL: cfg.RecoveryUrl,
})
adminPromoCodeController := admin_promocode.NewPromoCodeController(admin_promocode.Deps{Logger: logger, PromoCodeService: promoService})
controllerRpc := rpc_controllers.InitRpcControllers(promoService)
grpcServer, err := grpc.NewGRPC(logger)
if err != nil {
logger.Error("error init rpc server", zap.Error(err))
return err
}
grpcServer.Register(controllerRpc)
recoveryWC := recovery_worker.NewRecoveryWC(recovery_worker.Deps{
Logger: logger,
Redis: rdb,
EmailSender: recoveryEmailSender,
Mongo: mdb.Collection("codeword"),
})
purgeWC := purge_worker.NewPurgeWC(purge_worker.Deps{
Logger: logger,
Mongo: mdb.Collection("codeword"),
})
go recoveryWC.Start(ctx)
go purgeWC.Start(ctx)
clientServer := httpserver.NewServer(httpserver.ServerConfig{
Logger: logger,
Controllers: []httpserver.Controller{clientRecoveryController, clientPromoCodeController},
Hlogger: loggerHlog,
JWT: jwtUtil,
})
adminServer := httpserver.NewServer(httpserver.ServerConfig{
Logger: logger,
Controllers: []httpserver.Controller{adminRecoveryController, adminPromoCodeController},
Hlogger: loggerHlog,
JWT: jwtUtil,
})
go func() {
if err := clientServer.Start(cfg.HTTPClientHost + ":" + cfg.HTTPClientPort); err != nil {
logger.Error("Client server startup error", zap.Error(err))
cancel()
}
}()
go func() {
if err := adminServer.Start(cfg.HTTPAdminHost + ":" + cfg.HTTPAdminPort); err != nil {
logger.Error("Admin server startup error", zap.Error(err))
cancel()
}
}()
go grpcServer.Run(grpc.DepsGrpcRun{
Host: cfg.GrpcHost,
Port: cfg.GrpcPort,
})
clientServer.ListRoutes()
adminServer.ListRoutes()
shutdownGroup.Add(closer.CloserFunc(clientServer.Shutdown))
shutdownGroup.Add(closer.CloserFunc(adminServer.Shutdown))
shutdownGroup.Add(closer.CloserFunc(grpcServer.Stop))
shutdownGroup.Add(closer.CloserFunc(mdb.Client().Disconnect))
shutdownGroup.Add(closer.CloserFunc(recoveryWC.Stop))
shutdownGroup.Add(closer.CloserFunc(purgeWC.Stop))
<-ctx.Done()
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer timeoutCancel()
if err := shutdownGroup.Call(timeoutCtx); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
logger.Error("Shutdown timed out", zap.Error(err))
} else {
logger.Error("Failed to shutdown services gracefully", zap.Error(err))
}
return err
}
logger.Info("Application has stopped")
return nil
}