customer/internal/app/app.go

190 lines
5.6 KiB
Go

package app
import (
"context"
"errors"
"fmt"
"github.com/themakers/hlog"
"go.uber.org/zap/zapcore"
"os/signal"
"penahub.gitlab.yandexcloud.net/backend/penahub_common/mongo"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/app"
"penahub.gitlab.yandexcloud.net/external/trashlog.git/wrappers/zaptrashlog"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/controller/http"
"syscall"
"time"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
qutils "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/utils"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/initialize"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/models"
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/server"
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/closer"
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/kafka"
)
const (
shutdownTimeout = 5 * time.Second
)
func Run(config *models.Config, logger *zap.Logger) (appErr error) {
defer func() {
if recovered := recover(); recovered != nil {
appErr = errors.New("recovered panic on application run")
logger.Error("recovered panic on application run", zap.Any("recovered", recovered))
}
}()
closer := closer.New()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, config.Service.TrashLogHost, "version", "Commit", time.Now().Unix())
if err != nil {
panic(err)
}
//telegrammLogger, err := zaptg.NewCore(ctx,
// zap.InfoLevel,
// "1408111289:AAHfWZRiBQRncb2gl2LtU8OeASjfJi4e8YE",
// "version.Release",
// "version.Commit",
// 0,
// -1001256687920,
//)
//if err != nil {
// panic(err)
//}
loggerForHlog := logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, clickHouseLogger)
}))
loggerHlog := hlog.New(loggerForHlog).Module("customer")
loggerHlog.Emit(app.InfoSvcStarted{})
if err := kafka.Initialize(ctx, config.Service.Kafka.Brokers, []string{
config.Service.Kafka.Tariff.Topic,
}); err != nil {
return fmt.Errorf("failed initialize kafka: %w", err)
}
mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
Configuration: &config.Database,
Timeout: 10 * time.Second,
})
if err != nil {
return fmt.Errorf("failed connection to db: %w", err)
}
kafkaTariffClient, err := kgo.NewClient(
kgo.SeedBrokers(config.Service.Kafka.Brokers...),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.DefaultProduceTopic(config.Service.Kafka.Tariff.Topic),
kgo.ConsumeTopics(config.Service.Kafka.Tariff.Topic),
)
if err != nil {
return err
}
brokers := initialize.NewBrokers(initialize.BrokersDeps{
Logger: logger,
TariffClient: kafkaTariffClient,
})
err = kafkaTariffClient.Ping(ctx)
if err != nil {
return err
}
clients := initialize.NewClients(initialize.ClientsDeps{
Logger: logger,
AuthURL: &config.Service.AuthMicroservice.URL,
HubadminURL: &config.Service.HubadminMicroservice.URL,
CurrencyURL: &config.Service.CurrencyMicroservice.URL,
DiscountServiceConfiguration: &config.Service.DiscountMicroservice,
PaymentServiceConfiguration: &config.Service.PaymentMicroservice,
VerificationURL: &config.Service.VerificationMicroservice.URL,
TemplategenURL: &config.Service.TemplategenMicroserviceURL.URL,
MailClient: &config.Service.Mail,
CodewordServiceHost: &config.Service.CodewordMicroservice,
})
repositories := initialize.NewRepositories(initialize.RepositoriesDeps{
Logger: logger,
MongoDB: mongoDB,
})
services := initialize.NewServices(initialize.ServicesDeps{
Logger: logger,
Repositories: repositories,
Clients: clients,
ConfigurationGRPC: &config.GRPC,
Brokers: brokers,
})
rpcControllers := initialize.NewRpcControllers(initialize.RpcControllersDeps{
Logger: logger,
Services: services,
})
encrypt := qutils.NewEncrypt(config.Service.PubKey, config.Service.PrivKey)
middleWare := http.NewMiddleWare(logger)
httpControllers := initialize.NewHttpControllers(initialize.HttpControllersDeps{
Logger: logger,
Encrypt: encrypt,
Producer: brokers.TariffProducer,
GRPC: &config.GRPC,
Repositories: repositories,
Clients: clients,
MiddleWare: middleWare,
})
serverHTTP := server.NewServer(server.ServerConfig{
Logger: logger,
Hlog: loggerHlog,
Controllers: []server.Controller{httpControllers.CurrencyController, httpControllers.HistoryController, httpControllers.CartController, httpControllers.WalletController, httpControllers.AccountController},
JWTConfig: &config.Service.JWT,
})
serverHTTP.ListRoutes()
serverGRPC, grpcErr := server.NewGRPC(server.DepsGRPC{Logger: logger})
if grpcErr != nil {
return grpcErr.Wrap("failed to init grpc server")
}
serverGRPC.Register(rpcControllers)
go func() {
if err := serverHTTP.Start(config.HTTP.Host + ":" + config.HTTP.Port); err != nil {
logger.Error("Server startup error", zap.Error(err))
cancel()
}
}()
go serverGRPC.Run(&config.GRPC)
closer.Add(mongoDB.Client().Disconnect)
closer.Add(serverHTTP.Shutdown)
closer.Add(serverGRPC.Stop)
closer.Add(closer.Wrap(kafkaTariffClient.Close))
<-ctx.Done()
logger.Info("shutting down app gracefully")
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
if err := closer.Close(shutdownCtx); err != nil {
return fmt.Errorf("closer: %w", err)
}
return nil
}