164 lines
4.0 KiB
Go
164 lines
4.0 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"gitea.pena/PenaSide/hlog"
|
|
"gitea.pena/SQuiz/common/dal"
|
|
answerwc2 "gitea.pena/SQuiz/worker/internal/answerwc"
|
|
"gitea.pena/SQuiz/worker/internal/initialize"
|
|
privilegewc2 "gitea.pena/SQuiz/worker/internal/privilegewc"
|
|
senders2 "gitea.pena/SQuiz/worker/internal/senders"
|
|
"gitea.pena/SQuiz/worker/internal/workers/shortstat"
|
|
"gitea.pena/SQuiz/worker/internal/workers/timeout"
|
|
"gitea.pena/SQuiz/worker/pkg/closer"
|
|
"go.uber.org/zap"
|
|
"time"
|
|
)
|
|
|
|
var zapOptions = []zap.Option{
|
|
zap.AddCaller(),
|
|
zap.AddCallerSkip(2),
|
|
zap.AddStacktrace(zap.ErrorLevel),
|
|
}
|
|
|
|
type Build struct {
|
|
Commit string
|
|
Version string
|
|
}
|
|
|
|
func New(ctx context.Context, cfg initialize.Config, build Build) error {
|
|
var (
|
|
err, workerErr error
|
|
zapLogger *zap.Logger
|
|
errChan = make(chan error)
|
|
)
|
|
|
|
if cfg.LoggerProdMode {
|
|
zapLogger, err = zap.NewProduction(zapOptions...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
zapLogger, err = zap.NewDevelopment(zapOptions...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
zapLogger = zapLogger.With(
|
|
zap.String("SvcCommit", build.Commit),
|
|
zap.String("SvcVersion", build.Version),
|
|
zap.String("SvcBuildTime", time.Now().String()),
|
|
)
|
|
|
|
logger := hlog.New(zapLogger)
|
|
logger.Emit(InfoSvcStarted{})
|
|
|
|
shutdownGroup := closer.NewCloserGroup()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case err := <-errChan:
|
|
zapLogger.Error("Ошибка при работе воркера", zap.Error(err))
|
|
}
|
|
}
|
|
}()
|
|
|
|
redisClient, err := initialize.Redis(ctx, cfg)
|
|
if err != nil {
|
|
zapLogger.Error("failed init redis", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
minioClient, err := initialize.NewMinio(cfg)
|
|
if err != nil {
|
|
zapLogger.Error("failed init minio", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
clients := initialize.NewClients(cfg, zapLogger)
|
|
|
|
// tgSender, err := senders.NewTgSender(options.TgToken)
|
|
// if err != nil {
|
|
// fmt.Println(err)
|
|
// return nil, err
|
|
// }
|
|
mailSender := senders2.NewMailLeadSender(clients.MailClient)
|
|
leadSenders := []senders2.LeadSender{mailSender /* , tgSender */}
|
|
|
|
pgdal, err := dal.New(ctx, cfg.PostgresURL, minioClient)
|
|
if err != nil {
|
|
zapLogger.Error("failed init postgres", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
kafkaWorker, err := privilegewc2.NewKafkaConsumerWorker(privilegewc2.Config{
|
|
KafkaBroker: cfg.KafkaBrokers,
|
|
KafkaTopic: cfg.KafkaTopicTariff,
|
|
ServiceKey: cfg.ServiceName,
|
|
TickerInterval: time.Second * 10,
|
|
ErrChan: errChan,
|
|
}, redisClient, pgdal)
|
|
if err != nil {
|
|
zapLogger.Error("Failed start privilege worker", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
checkWorker := privilegewc2.NewCheckWorker(privilegewc2.Deps{
|
|
PrivilegeIDsDays: []string{"quizUnlimTime", "squizHideBadge"},
|
|
PrivilegeIDsCount: []string{"quizCnt", "quizManual"},
|
|
TickerInterval: time.Minute,
|
|
PrivilegeDAL: pgdal,
|
|
CustomerClient: clients.CustomerClient,
|
|
}, errChan)
|
|
|
|
go kafkaWorker.Start(ctx)
|
|
go checkWorker.Start(ctx)
|
|
|
|
toClientWorker := answerwc2.NewSendToClient(answerwc2.DepsSendToClient{
|
|
Redis: redisClient,
|
|
Dal: pgdal,
|
|
LeadSenders: leadSenders,
|
|
CustomerService: clients.CustomerClient,
|
|
}, errChan)
|
|
|
|
toRespWorker := answerwc2.NewRespWorker(answerwc2.DepsRespWorker{
|
|
Redis: redisClient,
|
|
Dal: pgdal,
|
|
MailClient: mailSender,
|
|
}, errChan)
|
|
|
|
go toClientWorker.Start(ctx)
|
|
go toRespWorker.Start(ctx)
|
|
|
|
tow := timeout.New(pgdal, time.Minute)
|
|
statW := shortstat.New(pgdal, 5*time.Minute)
|
|
tow.ExposeErr(ctx, &workerErr)
|
|
statW.ExposeErr(ctx, &workerErr)
|
|
go tow.Start(ctx)
|
|
go statW.Start(ctx)
|
|
|
|
logger.Emit(InfoSvcReady{})
|
|
shutdownGroup.Add(closer.CloserFunc(pgdal.Close))
|
|
|
|
<-ctx.Done()
|
|
|
|
timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer timeoutCancel()
|
|
if err := shutdownGroup.Call(timeoutCtx); err != nil {
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
zapLogger.Error("Shutdown timed out", zap.Error(err))
|
|
} else {
|
|
zapLogger.Error("Failed to shutdown services gracefully", zap.Error(err))
|
|
}
|
|
return err
|
|
}
|
|
|
|
zapLogger.Info("Application has stopped")
|
|
return nil
|
|
}
|