worker/app/app.go

227 lines
6.3 KiB
Go
Raw Normal View History

2024-02-19 18:20:09 +00:00
package app
import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"github.com/gofiber/fiber/v2"
"github.com/skeris/appInit"
"github.com/themakers/hlog"
"go.uber.org/zap"
"google.golang.org/grpc"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal"
"penahub.gitlab.yandexcloud.net/backend/quiz/common.git/model"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/answerwc"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/clients/customer"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/clients/mailclient"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/privilegewc"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/workers/shortstat"
"penahub.gitlab.yandexcloud.net/backend/quiz/worker.git/workers/timeout"
2024-04-11 16:45:06 +00:00
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
2024-03-13 16:36:23 +00:00
2024-04-11 16:45:06 +00:00
"fmt"
2024-02-19 18:20:09 +00:00
"time"
)
// App is the application handle produced by New. It holds the zap logger and
// an error channel, both of which are exposed through accessor methods.
type App struct {
	// logger is the zap logger returned by GetLogger.
	logger *zap.Logger
	// err is the channel returned by GetErr.
	err chan error
}
// GetLogger returns the zap logger stored in the App.
func (app App) GetLogger() *zap.Logger {
	return app.logger
}
// GetErr returns the error channel stored in the App.
func (app App) GetErr() chan error {
	return app.err
}
var (
	// errInvalidOptions is returned by New when the supplied options value
	// is not of type Options.
	errInvalidOptions = errors.New("invalid options")

	// zapOptions are applied to every zap logger built by New: caller
	// annotation, a caller skip of two frames, and stack traces for entries
	// at error level and above.
	// NOTE(review): the skip of 2 presumably accounts for the hlog wrapper
	// frames — confirm.
	zapOptions = []zap.Option{
		zap.AddCaller(),
		zap.AddCallerSkip(2),
		zap.AddStacktrace(zap.ErrorLevel),
	}

	// Compile-time check that *App implements appInit.CommonApp.
	_ appInit.CommonApp = (*App)(nil)
)
// Options is the worker configuration. Each field carries an `env` tag naming
// the environment variable it is read from and, where present, a `default`
// tag with the fallback value.
type Options struct {
	// Service identity and Kafka source.
	ServiceName string `env:"SERVICE_NAME" default:"squiz"`
	KafkaBroker string `env:"KAFKA_BROKER"`
	KafkaTopic  string `env:"KAFKA_TOPIC"`

	// Default privilege data handed to the privilege check worker.
	// Note that PrivilegeID is read from the QUIZ_ID variable.
	PrivilegeID string `env:"QUIZ_ID"`
	Amount      uint64 `env:"AMOUNT"`
	UnlimID     string `env:"UNLIM_ID"`

	// LoggerProdMode selects the production zap configuration; IsProd is
	// used by New to enable TLS for the MinIO client.
	LoggerProdMode bool `env:"IS_PROD_LOG" default:"false"`
	IsProd         bool `env:"IS_PROD" default:"false"`

	// MinIO endpoint, access key and secret key.
	// NOTE(review): the defaults embed development credentials; make sure
	// they are overridden outside local environments.
	MinioEP string `env:"MINIO_EP" default:"localhost:3002"`
	MinioAK string `env:"MINIO_AK" default:"minio"`
	MinioSK string `env:"MINIO_SK" default:"miniostorage"`

	// PostgresCredentials is the connection string passed to dal.New.
	// NOTE(review): the default contains a plain-text password.
	PostgresCredentials string `env:"PG_CRED" default:"host=localhost port=5432 user=squiz password=Redalert2 dbname=squiz sslmode=disable"`

	// Redis connection settings.
	RedisHost     string `env:"REDIS_HOST"`
	RedisPassword string `env:"REDIS_PASSWORD"`
	RedisDB       uint64 `env:"REDIS_DB"`

	// SMTP / mail client settings.
	SmtpHost     string `env:"SMTP_HOST"`
	SmtpPort     string `env:"SMTP_PORT"`
	SmtpSender   string `env:"SMTP_SENDER"`
	SmtpIdentity string `env:"SMTP_IDENTITY"`
	SmtpUsername string `env:"SMTP_USERNAME"`
	SmtpPassword string `env:"SMTP_PASSWORD"`
	SmtpApiKey   string `env:"SMTP_API_KEY"`

	// CustomerServiceAddress is the gRPC dial target of the customer service.
	CustomerServiceAddress string `env:"CUSTOMER_SERVICE_ADDRESS"`
}
func New(ctx context.Context, opts interface{}, ver appInit.Version) (appInit.CommonApp, error) {
var (
err, workerErr error
zapLogger *zap.Logger
errChan = make(chan error)
options Options
ok bool
)
if options, ok = opts.(Options); !ok {
return App{}, errInvalidOptions
}
if options.LoggerProdMode {
zapLogger, err = zap.NewProduction(zapOptions...)
if err != nil {
return nil, err
}
} else {
zapLogger, err = zap.NewDevelopment(zapOptions...)
if err != nil {
return nil, err
}
}
zapLogger = zapLogger.With(
zap.String("SvcCommit", ver.Commit),
zap.String("SvcVersion", ver.Release),
zap.String("SvcBuildTime", ver.BuildTime),
)
logger := hlog.New(zapLogger)
logger.Emit(InfoSvcStarted{})
zapLogger.Info("config", zap.Any("options", options))
go func() {
for {
select {
case <-ctx.Done():
return
case err := <-errChan:
zapLogger.Error("Ошибка при работе воркера", zap.Error(err))
}
}
}()
//init redis
redisClient := redis.NewClient(&redis.Options{
Addr: options.RedisHost,
Password: options.RedisPassword,
DB: int(options.RedisDB),
})
smtpData := mailclient.ClientDeps{
Host: options.SmtpHost,
Port: options.SmtpPort,
Sender: options.SmtpSender,
ApiKey: options.SmtpApiKey,
Auth: &mailclient.PlainAuth{
Identity: options.SmtpIdentity,
Username: options.SmtpUsername,
Password: options.SmtpPassword,
},
FiberClient: &fiber.Client{},
Logger: logger,
}
mailClient := mailclient.NewClient(smtpData)
customerServiceConn, err := grpc.Dial(options.CustomerServiceAddress, grpc.WithInsecure())
if err != nil {
return nil, err
}
customerServiceClient := customer.NewCustomerServiceClient(customerServiceConn)
2024-04-11 16:45:06 +00:00
minioClient, err := minio.New(options.MinioEP, &minio.Options{
Creds: credentials.NewStaticV4(options.MinioAK, options.MinioSK, ""),
Secure: options.IsProd,
})
if err != nil {
fmt.Println("MINIOERR", options.MinioEP, err)
return nil, err
}
pgdal, err := dal.New(ctx, options.PostgresCredentials, nil, minioClient)
2024-02-19 18:20:09 +00:00
if err != nil {
return nil, err
}
kafkaWorker, err := privilegewc.NewKafkaConsumerWorker(privilegewc.Config{
KafkaBroker: options.KafkaBroker,
KafkaTopic: options.KafkaTopic,
ServiceKey: options.ServiceName,
TickerInterval: time.Second * 10,
Logger: logger,
ErrChan: errChan,
}, redisClient, pgdal)
if err != nil {
logger.Module("Failed start privilege worker")
return nil, err
}
checkWorker := privilegewc.NewCheckWorker(privilegewc.CheckWorkerConfig{
DefaultData: model.DefaultData{
PrivilegeID: options.PrivilegeID,
Amount: options.Amount,
UnlimID: options.UnlimID,
},
TickerInterval: time.Minute,
Logger: logger,
ErrChan: errChan,
}, pgdal)
go kafkaWorker.Start(ctx)
go checkWorker.Start(ctx)
toClientWorker := answerwc.NewSendToClient(answerwc.DepsSendToClient{
Redis: redisClient,
Dal: pgdal,
MailClient: mailClient,
CustomerService: customerServiceClient,
}, logger, errChan)
toRespWorker := answerwc.NewRespWorker(answerwc.DepsRespWorker{
Redis: redisClient,
Dal: pgdal,
MailClient: mailClient,
}, logger, errChan)
go toClientWorker.Start(ctx)
go toRespWorker.Start(ctx)
tow := timeout.New(pgdal, time.Minute)
statW := shortstat.New(pgdal, 5*time.Minute)
tow.ExposeErr(ctx, &workerErr)
statW.ExposeErr(ctx, &workerErr)
go tow.Start(ctx)
go func() {
// defer pgdal.CloseWorker()
statW.Start(ctx)
}()
logger.Emit(InfoSvcReady{})
// todo implement helper func for service app type. such as server preparing, logger preparing, healthchecks and etc.
return &App{
logger: zapLogger,
err: make(chan error),
}, err
}