amocrm/internal/app/app.go
skeris c37d8fceba
Some checks failed
Deploy / CreateImage (push) Failing after 2m0s
Deploy / DeployService (push) Has been skipped
ci gitea
2025-02-08 00:06:49 +03:00

184 lines
4.7 KiB
Go

package app
import (
"amocrm/internal/brokers"
"amocrm/internal/controllers"
"amocrm/internal/initialize"
"amocrm/internal/repository"
"amocrm/internal/server/http"
"amocrm/internal/service"
"amocrm/internal/tools"
"amocrm/internal/workers/data_updater"
"amocrm/internal/workers/limiter"
"amocrm/internal/workers/post_deals_worker"
"amocrm/internal/workers/post_fields_worker"
"amocrm/internal/workers/queueUpdater"
"amocrm/internal/workers_methods"
"amocrm/pkg/amoClient"
"amocrm/pkg/closer"
pena_social_auth "amocrm/pkg/pena-social-auth"
"context"
"errors"
"gitea.pena/SQuiz/common/dal"
"time"
"fmt"
"go.uber.org/zap"
)
// Run wires up every dependency of the amocrm service, starts the background
// workers and the HTTP server, and then blocks until ctx is cancelled or the
// HTTP server fails to start. After that it shuts all registered components
// down, allowing at most 10 seconds for the shutdown to complete.
//
// It returns a non-nil error when:
//   - one of the dependencies (Redis, Kafka, the amo repository) cannot be
//     initialised;
//   - the HTTP server returns an error from Start before shutdown begins;
//   - the shutdown group reports an error (including a timeout);
//   - a panic is recovered inside Run.
func Run(ctx context.Context, config initialize.Config, logger *zap.Logger) (err error) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error("Recovered in app from a panic", zap.Any("error", r))
			// Report the panic to the caller; without this Run would return
			// nil and the crash would look like a clean exit.
			err = fmt.Errorf("app panicked: %v", r)
		}
	}()

	// NOTE(review): this logs the whole config, which contains
	// IntegrationSecret and PostgresCredentials — consider redacting.
	logger.Info("App started", zap.Any("config", config))

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	shutdownGroup := closer.NewCloserGroup()

	// --- infrastructure clients -------------------------------------------

	redisClient, err := initialize.Redis(ctx, config)
	if err != nil {
		logger.Error("error init redis client", zap.Error(err))
		return err
	}

	// NOTE(review): the kafka client is never registered with shutdownGroup;
	// confirm whether it has to be closed explicitly on shutdown.
	kafka, err := initialize.KafkaConsumerInit(ctx, config)
	if err != nil {
		logger.Error("error init kafka consumer", zap.Error(err))
		return err
	}

	producer := brokers.NewProducer(brokers.ProducerDeps{
		KafkaClient: kafka,
		Logger:      logger,
	})

	amoRepo, err := dal.NewAmoDal(ctx, config.PostgresCredentials)
	if err != nil {
		logger.Error("error init amo repo in common repo", zap.Error(err))
		return err
	}

	socialAuthClient := pena_social_auth.NewClient(pena_social_auth.Deps{
		PenaSocialAuthURL: config.PenaSocialAuthURL,
		Logger:            logger,
		ReturnURL:         config.ReturnURL,
	})

	// NOTE(review): presumably 6 requests per 1.5s window — confirm against
	// limiter.NewRateLimiter.
	rateLimiter := limiter.NewRateLimiter(ctx, 6, 1500*time.Millisecond)

	// Named amoAPI (not amoClient) so the local does not shadow the package.
	amoAPI := amoClient.NewAmoClient(amoClient.AmoDeps{
		Logger:            logger,
		RedirectionURL:    config.ReturnURL,
		IntegrationID:     config.IntegrationID,
		IntegrationSecret: config.IntegrationSecret,
		RateLimiter:       rateLimiter,
	})

	redisRepo := repository.NewRepository(repository.Deps{
		RedisClient: redisClient,
		Logger:      logger,
	})

	// --- service and HTTP controllers -------------------------------------

	svc := service.NewService(service.Deps{
		Repository:       amoRepo,
		Logger:           logger,
		SocialAuthClient: socialAuthClient,
		AmoClient:        amoAPI,
		Producer:         producer,
	})

	cntrlDeps := controllers.Deps{
		Service:     svc,
		Logger:      logger,
		Verify:      tools.NewVerify(config.IntegrationSecret, config.IntegrationID),
		RedirectURL: config.RedirectURL,
	}
	controller := controllers.NewController(cntrlDeps)
	webhookController := controllers.NewWebhookController(cntrlDeps)

	// --- background workers -----------------------------------------------

	workerMethods := workers_methods.NewWorkersMethods(workers_methods.Deps{
		Repo:      amoRepo,
		AmoClient: amoAPI,
		Logger:    logger,
	})

	dataUpdater := data_updater.NewDataUpdaterWC(data_updater.Deps{
		Logger:   logger,
		Producer: producer,
	})

	queUpdater := queueUpdater.NewQueueUpdater(queueUpdater.Deps{
		Logger:      logger,
		KafkaClient: kafka,
		Methods:     workerMethods,
	})

	dealsPoster := post_deals_worker.NewPostDealsWC(post_deals_worker.Deps{
		AmoRepo:   amoRepo,
		AmoClient: amoAPI,
		RedisRepo: redisRepo,
		Logger:    logger,
	})

	fieldsPoster := post_fields_worker.NewPostFieldsWC(post_fields_worker.Deps{
		AmoRepo:   amoRepo,
		AmoClient: amoAPI,
		RedisRepo: redisRepo,
		Logger:    logger,
	})

	// All workers share the cancellable ctx created above.
	go dataUpdater.Start(ctx)
	go queUpdater.Start(ctx)
	go dealsPoster.Start(ctx)
	go fieldsPoster.Start(ctx)

	// --- HTTP server --------------------------------------------------------

	server := http.NewServer(http.ServerConfig{
		Controllers: []http.Controller{
			controller,
			webhookController,
		},
	})

	// Buffered so the goroutine can always hand over its error and exit, even
	// if Run has already moved on to shutdown and is no longer receiving.
	serverErr := make(chan error, 1)
	go func() {
		if startErr := server.Start(config.HTTPHost + ":" + config.HTTPPort); startErr != nil {
			logger.Error("Server startup error", zap.Error(startErr))
			// Publish the error before cancelling so it is visible to Run as
			// soon as ctx.Done() fires.
			serverErr <- startErr
			cancel()
		}
	}()

	server.ListRoutes()

	shutdownGroup.Add(closer.CloserFunc(server.Shutdown))
	shutdownGroup.Add(closer.CloserFunc(amoRepo.Close))
	shutdownGroup.Add(closer.CloserFunc(redisRepo.Close))
	shutdownGroup.Add(closer.CloserFunc(rateLimiter.Stop))
	shutdownGroup.Add(closer.CloserFunc(dataUpdater.Stop))
	shutdownGroup.Add(closer.CloserFunc(queUpdater.Stop))
	shutdownGroup.Add(closer.CloserFunc(dealsPoster.Stop))
	shutdownGroup.Add(closer.CloserFunc(fieldsPoster.Stop))

	<-ctx.Done()

	// Pick up a server failure that happened before shutdown started. This is
	// read before shutdownGroup.Call so that an error Start may return as a
	// result of server.Shutdown is not mistaken for a startup failure.
	var startErr error
	select {
	case startErr = <-serverErr:
	default:
	}

	// Shutdown uses a fresh context: ctx is already cancelled at this point.
	timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer timeoutCancel()

	if shutdownErr := shutdownGroup.Call(timeoutCtx); shutdownErr != nil {
		if errors.Is(shutdownErr, context.DeadlineExceeded) {
			logger.Error("Shutdown timed out", zap.Error(shutdownErr))
		} else {
			logger.Error("Failed to shutdown services gracefully", zap.Error(shutdownErr))
		}
		return shutdownErr
	}

	logger.Info("Application has stopped")

	if startErr != nil {
		return fmt.Errorf("http server: %w", startErr)
	}
	return nil
}