260 lines
7.8 KiB
Go
260 lines
7.8 KiB
Go
package app
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/go-redis/redis/v8"
|
||
"github.com/gofiber/fiber/v2"
|
||
"github.com/skeris/appInit"
|
||
"github.com/themakers/hlog"
|
||
"go.uber.org/zap"
|
||
"go.uber.org/zap/zapcore"
|
||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/log_mw"
|
||
"penahub.gitlab.yandexcloud.net/backend/penahub_common/privilege"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/healthchecks"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/middleware"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/brokers"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/clients/auth"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/clients/telegram"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/initialize"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/models"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/server"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/service"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/tools"
|
||
"penahub.gitlab.yandexcloud.net/backend/quiz/core/workers"
|
||
"penahub.gitlab.yandexcloud.net/external/trashlog/wrappers/zaptrashlog"
|
||
"time"
|
||
)
|
||
|
||
type App struct {
|
||
logger *zap.Logger
|
||
err chan error
|
||
}
|
||
|
||
func (a App) GetLogger() *zap.Logger {
|
||
return a.logger
|
||
}
|
||
|
||
func (a App) GetErr() chan error {
|
||
return a.err
|
||
}
|
||
|
||
// errInvalidOptions is returned by New when the opts argument is not an
// Options value.
var errInvalidOptions = errors.New("invalid options")
|
||
|
||
var zapOptions = []zap.Option{
|
||
zap.AddCaller(),
|
||
zap.AddCallerSkip(2),
|
||
zap.AddStacktrace(zap.ErrorLevel),
|
||
}
|
||
|
||
// Compile-time assertion that *App implements appInit.CommonApp.
var _ appInit.CommonApp = (*App)(nil)
|
||
|
||
// Options is the service configuration consumed by New. Each field carries
// an `env` tag naming the environment variable it is bound to and, where
// present, a `default` tag with the fallback value.
type Options struct {
	// LoggerProdMode selects the zap production preset instead of the
	// development one.
	LoggerProdMode bool `env:"IS_PROD_LOG" default:"false"`

	// IsProd makes the HTTP server listen with TLS using CrtFile and KeyFile.
	IsProd bool `env:"IS_PROD" default:"false"`

	// NumberPort is the port the HTTP server listens on.
	NumberPort string `env:"PORT" default:"1488"`

	// CrtFile is the TLS certificate file used when IsProd is set.
	CrtFile string `env:"CRT" default:"server.crt"`

	// KeyFile is the TLS private key file used when IsProd is set.
	KeyFile string `env:"KEY" default:"server.key"`

	// PostgresCredentials is the connection string passed to the Postgres DAL.
	PostgresCredentials string `env:"PG_CRED" default:"host=localhost port=35432 user=squiz password=Redalert2 dbname=squiz sslmode=disable"`

	// HubAdminUrl is the URL given to the privilege client.
	HubAdminUrl string `env:"HUB_ADMIN_URL" default:"http://localhost:8001/"`

	// ServiceName is passed to the privilege client and to the service layer.
	ServiceName string `env:"SERVICE_NAME" default:"squiz"`

	// AuthServiceURL is the URL given to the auth client.
	AuthServiceURL string `env:"AUTH_URL" default:"http://localhost:8000/"`

	// GrpcHost is the host the gRPC server is run with.
	GrpcHost string `env:"GRPC_HOST" default:"localhost"`

	// GrpcPort is the port the gRPC server is run with.
	GrpcPort string `env:"GRPC_PORT" default:"9000"`

	// KafkaBrokers is passed to the Kafka client initialisation.
	KafkaBrokers string `env:"KAFKA_BROKERS" default:"localhost:9092"`

	// KafkaTopic is passed to the Kafka client initialisation.
	KafkaTopic string `env:"KAFKA_TOPIC" default:"test-topic"`

	// KafkaGroup is passed to the Kafka client initialisation.
	KafkaGroup string `env:"KAFKA_GROUP" default:"mailnotifier"`

	// TrashLogHost is the address given to the trashlog zap core.
	TrashLogHost string `env:"TRASH_LOG_HOST" default:"localhost:7113"`

	// ModuleLogger is the module name set on the hlog logger.
	ModuleLogger string `env:"MODULE_LOGGER" default:"core-local"`

	// ClickHouseCred is the connection string passed to the ClickHouse DAL.
	ClickHouseCred string `env:"CLICK_HOUSE_CRED" default:"tcp://10.8.0.15:9000/default?sslmode=disable"`

	// RedisHost is the address of the Redis server.
	RedisHost string `env:"REDIS_HOST" default:"localhost:6379"`

	// RedisPassword is the password used for the Redis connection.
	RedisPassword string `env:"REDIS_PASSWORD" default:"admin"`

	// RedisDB is the Redis database number; New converts it to int.
	RedisDB uint64 `env:"REDIS_DB" default:"2"`

	// S3Prefix is forwarded to the service layer; it has no default.
	S3Prefix string `env:"S3_PREFIX"`
}
|
||
|
||
func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.CommonApp, error) {
|
||
var (
|
||
err, workerErr error
|
||
zapLogger *zap.Logger
|
||
errChan = make(chan error)
|
||
options Options
|
||
ok bool
|
||
)
|
||
|
||
if options, ok = opts.(Options); !ok {
|
||
return App{}, errInvalidOptions
|
||
}
|
||
|
||
if options.LoggerProdMode {
|
||
zapLogger, err = zap.NewProduction(zapOptions...)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
} else {
|
||
zapLogger, err = zap.NewDevelopment(zapOptions...)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
zapLogger = zapLogger.With(
|
||
zap.String("SvcCommit", ver.Commit),
|
||
zap.String("SvcVersion", ver.Release),
|
||
zap.String("SvcBuildTime", ver.BuildTime),
|
||
)
|
||
|
||
clickHouseLogger, err := zaptrashlog.NewCore(ctx, zap.InfoLevel, options.TrashLogHost, ver.Release, ver.Commit, time.Now().Unix())
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
|
||
loggerForHlog := zapLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
||
return zapcore.NewTee(core, clickHouseLogger)
|
||
}))
|
||
|
||
loggerHlog := hlog.New(loggerForHlog).Module(options.ModuleLogger)
|
||
loggerHlog.With(models.AllFields{})
|
||
loggerHlog.Emit(InfoSvcStarted{})
|
||
|
||
authClient := auth.NewAuthClient(options.AuthServiceURL)
|
||
|
||
pgdal, err := dal.New(ctx, options.PostgresCredentials, nil)
|
||
if err != nil {
|
||
fmt.Println("NEW", err)
|
||
return nil, err
|
||
}
|
||
|
||
chDal, err := dal.NewClickHouseDAL(ctx, options.ClickHouseCred)
|
||
if err != nil {
|
||
fmt.Println("failed init clickhouse", err)
|
||
return nil, err
|
||
}
|
||
|
||
kafkaClient, err := initialize.KafkaInit(ctx, initialize.KafkaDeps{
|
||
KafkaGroup: options.KafkaGroup,
|
||
KafkaBrokers: options.KafkaBrokers,
|
||
KafkaTopic: options.KafkaTopic,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
producer := brokers.NewProducer(brokers.ProducerDeps{
|
||
KafkaClient: kafkaClient,
|
||
Logger: zapLogger,
|
||
})
|
||
|
||
redisClient := redis.NewClient(&redis.Options{
|
||
Addr: options.RedisHost,
|
||
Password: options.RedisPassword,
|
||
DB: int(options.RedisDB),
|
||
})
|
||
err = redisClient.Ping(ctx).Err()
|
||
if err != nil {
|
||
panic(fmt.Sprintf("error ping to redis db %v", err))
|
||
}
|
||
|
||
clientData := privilege.Client{
|
||
URL: options.HubAdminUrl,
|
||
ServiceName: options.ServiceName,
|
||
Privileges: model.Privileges,
|
||
}
|
||
fiberClient := &fiber.Client{}
|
||
privilegeController := privilege.NewPrivilege(clientData, fiberClient)
|
||
go tools.PublishPrivilege(privilegeController, 10, 5*time.Minute)
|
||
|
||
tgClient, err := telegram.NewTelegramClient(ctx, pgdal)
|
||
if err != nil {
|
||
panic(fmt.Sprintf("failed init tg clietns: %v", err))
|
||
}
|
||
|
||
tgWC := workers.NewTgListenerWC(workers.Deps{
|
||
BotID: int64(6712573453), // todo убрать
|
||
Redis: redisClient,
|
||
Dal: pgdal,
|
||
TgClient: tgClient,
|
||
})
|
||
|
||
go tgWC.Start(ctx)
|
||
|
||
// todo подумать над реализацией всего а то пока мне кажется что немного каша получается такой предикт что через некоторое время
|
||
// сложно будет разобраться что есть где
|
||
grpcControllers := initialize.InitRpcControllers(pgdal)
|
||
grpc, err := server.NewGRPC(zapLogger)
|
||
if err != nil {
|
||
fmt.Println("error:", err)
|
||
panic("err init grpc server")
|
||
}
|
||
grpc.Register(grpcControllers)
|
||
go grpc.Run(server.DepsGrpcRun{
|
||
Host: options.GrpcHost,
|
||
Port: options.GrpcPort,
|
||
})
|
||
|
||
app := fiber.New()
|
||
app.Use(middleware.JWTAuth())
|
||
app.Use(log_mw.ContextLogger(loggerHlog))
|
||
app.Get("/liveness", healthchecks.Liveness)
|
||
app.Get("/readiness", healthchecks.Readiness(&workerErr)) //todo parametrized readiness. should discuss ready reason
|
||
|
||
svc := service.New(service.Deps{
|
||
Dal: pgdal,
|
||
AuthClient: authClient,
|
||
Producer: producer,
|
||
ServiceName: options.ServiceName,
|
||
ChDAL: chDal,
|
||
TelegramClient: tgClient,
|
||
RedisClient: redisClient,
|
||
S3Prefix: options.S3Prefix,
|
||
})
|
||
|
||
svc.Register(app)
|
||
|
||
loggerHlog.Emit(InfoSvcReady{})
|
||
|
||
go func() {
|
||
defer func() {
|
||
if pgdal != nil {
|
||
pgdal.Close()
|
||
}
|
||
if chDal != nil {
|
||
if derr := chDal.Close(ctx); derr != nil {
|
||
fmt.Printf("error closing clickhouse: %v", derr)
|
||
}
|
||
}
|
||
err := grpc.Stop(ctx)
|
||
err = app.Shutdown()
|
||
loggerHlog.Emit(InfoSvcShutdown{Signal: err.Error()})
|
||
}()
|
||
|
||
if options.IsProd {
|
||
if err := app.ListenTLS(fmt.Sprintf(":%s", options.NumberPort), options.CrtFile, options.KeyFile); err != nil {
|
||
loggerHlog.Emit(ErrorCanNotServe{
|
||
Err: err,
|
||
})
|
||
errChan <- err
|
||
}
|
||
} else {
|
||
if err := app.Listen(fmt.Sprintf(":%s", options.NumberPort)); err != nil {
|
||
loggerHlog.Emit(ErrorCanNotServe{
|
||
Err: err,
|
||
})
|
||
errChan <- err
|
||
}
|
||
}
|
||
|
||
errChan <- nil
|
||
}()
|
||
// todo implement helper func for service app type. such as server preparing, logger preparing, healthchecks and etc.
|
||
return &App{
|
||
logger: zapLogger,
|
||
err: errChan,
|
||
}, err
|
||
}
|