package app import ( "context" "errors" "github.com/themakers/hlog" "go.uber.org/zap" "penahub.gitlab.yandexcloud.net/backend/quiz/common.git/dal" answerwc2 "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/answerwc" "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/initialize" privilegewc2 "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/privilegewc" senders2 "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/senders" "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/workers/shortstat" "penahub.gitlab.yandexcloud.net/backend/quiz/worker/internal/workers/timeout" "penahub.gitlab.yandexcloud.net/backend/quiz/worker/pkg/closer" "time" ) var zapOptions = []zap.Option{ zap.AddCaller(), zap.AddCallerSkip(2), zap.AddStacktrace(zap.ErrorLevel), } type Build struct { Commit string Version string } func New(ctx context.Context, cfg initialize.Config, build Build) error { var ( err, workerErr error zapLogger *zap.Logger errChan = make(chan error) ) if cfg.LoggerProdMode { zapLogger, err = zap.NewProduction(zapOptions...) if err != nil { return err } } else { zapLogger, err = zap.NewDevelopment(zapOptions...) if err != nil { return err } } zapLogger = zapLogger.With( zap.String("SvcCommit", build.Commit), zap.String("SvcVersion", build.Version), zap.String("SvcBuildTime", time.Now().String()), ) logger := hlog.New(zapLogger) logger.Emit(InfoSvcStarted{}) shutdownGroup := closer.NewCloserGroup() go func() { for { select { case <-ctx.Done(): return case err := <-errChan: zapLogger.Error("Ошибка при работе воркера", zap.Error(err)) } } }() redisClient, err := initialize.Redis(ctx, cfg) if err != nil { zapLogger.Error("failed init redis", zap.Error(err)) return err } minioClient, err := initialize.NewMinio(cfg) if err != nil { zapLogger.Error("failed init minio", zap.Error(err)) return err } clients := initialize.NewClients(cfg, zapLogger) // tgSender, err := senders.NewTgSender(options.TgToken) // if err != nil { // fmt.Println(err) // return nil, err // } mailSender := senders2.NewMailLeadSender(clients.MailClient) leadSenders := []senders2.LeadSender{mailSender /* , tgSender */} pgdal, err := dal.New(ctx, cfg.PostgresCredentials, minioClient) if err != nil { zapLogger.Error("failed init postgres", zap.Error(err)) return err } kafkaWorker, err := privilegewc2.NewKafkaConsumerWorker(privilegewc2.Config{ KafkaBroker: cfg.KafkaBroker, KafkaTopic: cfg.KafkaTopic, ServiceKey: cfg.ServiceName, TickerInterval: time.Second * 10, ErrChan: errChan, }, redisClient, pgdal) if err != nil { zapLogger.Error("Failed start privilege worker", zap.Error(err)) return err } checkWorker := privilegewc2.NewCheckWorker(privilegewc2.Deps{ PrivilegeIDsDays: []string{"quizUnlimTime", "squizHideBadge"}, PrivilegeIDsCount: []string{"quizCnt", "quizManual"}, TickerInterval: time.Minute, PrivilegeDAL: pgdal, CustomerClient: clients.CustomerClient, }, errChan) go kafkaWorker.Start(ctx) go checkWorker.Start(ctx) toClientWorker := answerwc2.NewSendToClient(answerwc2.DepsSendToClient{ Redis: redisClient, Dal: pgdal, LeadSenders: leadSenders, CustomerService: clients.CustomerClient, }, errChan) toRespWorker := answerwc2.NewRespWorker(answerwc2.DepsRespWorker{ Redis: redisClient, Dal: pgdal, MailClient: mailSender, }, errChan) go toClientWorker.Start(ctx) go toRespWorker.Start(ctx) tow := timeout.New(pgdal, time.Minute) statW := shortstat.New(pgdal, 5*time.Minute) tow.ExposeErr(ctx, &workerErr) statW.ExposeErr(ctx, &workerErr) go tow.Start(ctx) go statW.Start(ctx) logger.Emit(InfoSvcReady{}) shutdownGroup.Add(closer.CloserFunc(pgdal.Close)) <-ctx.Done() timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second) defer timeoutCancel() if err := shutdownGroup.Call(timeoutCtx); err != nil { if errors.Is(err, context.DeadlineExceeded) { zapLogger.Error("Shutdown timed out", zap.Error(err)) } else { zapLogger.Error("Failed to shutdown services gracefully", zap.Error(err)) } return err } zapLogger.Info("Application has stopped") return nil }