generated from PenaSide/GolangTemplate
feat: env kafka
This commit is contained in:
parent
c71e158b9c
commit
5409141801
28
.env.test
28
.env.test
@ -1,25 +1,25 @@
|
||||
# HTTP settings
|
||||
HTTP_HOST=0.0.0.0
|
||||
HTTP_PORT=8080
|
||||
HTTP_PORT=8082
|
||||
|
||||
# MONGO settings
|
||||
MONGO_HOST=localhost
|
||||
GRPC_HOST=0.0.0.0
|
||||
GRPC_PORT=9082
|
||||
GRPC_DOMEN=customer-app:9082
|
||||
|
||||
MONGO_HOST=mongo
|
||||
MONGO_PORT=27017
|
||||
MONGO_USER=test
|
||||
MONGO_PASSWORD=test
|
||||
MONGO_AUTH=admin
|
||||
MONGO_DB_NAME=admin
|
||||
MONGO_AUTH=admin
|
||||
|
||||
# Auth Microservice settings
|
||||
AUTH_MICROSERVICE_USER_URL=http://localhost:8000/user
|
||||
AUTH_MICROSERVICE_USER_URL=http://pena-auth-service:8000/user
|
||||
HUBADMIN_MICROSERVICE_TARIFF_URL=http://hub-admin-service:8010/tariff
|
||||
CURRENCY_MICROSERVICE_TRANSLATE_URL=http://cbrf-service:8020/translate
|
||||
DISCOUNT_MICROSERVICE_GRPC_HOST=discount-service:9040
|
||||
PAYMENT_MICROSERVICE_GRPC_HOST=treasurer-service:9085
|
||||
|
||||
# Hub Admin Microservice settings
|
||||
HUBADMIN_MICROSERVICE_TARIFF_URL=http://localhost:8001/tariff
|
||||
|
||||
# Currency Microservice settings
|
||||
CURRENCY_MICROSERVICE_TRANSLATE_URL=http://localhost:8002/change
|
||||
|
||||
# Admin
|
||||
KAFKA_BROKERS=localhost:8080,localhost:1111
|
||||
KAFKA_TOPIC_TARIFF=tariff
|
||||
|
||||
# JWT settings
|
||||
JWT_ISSUER="pena-auth-service"
|
||||
|
@ -34,6 +34,9 @@ PAYMENT_MICROSERVICE_GRPC_HOST - хост микросервиса payment (GRPC
|
||||
JWT_PUBLIC_KEY - публичный ключ для верификации jwt токена
|
||||
JWT_ISSUER - издатель токена
|
||||
JWT_AUDIENCE - аудитория, которая может верифицировать токен
|
||||
|
||||
KAFKA_BROKERS - массив брокеров (localhost:8888,localhost:1111)
|
||||
KAFKA_TOPIC_TARIFF - название топика для сообщений тарифа
|
||||
```
|
||||
|
||||
## Полезные ссылки:
|
||||
|
@ -1,9 +0,0 @@
|
||||
{
|
||||
"kafka": {
|
||||
"brokers": ["redpanda:9092"],
|
||||
"consumerGroupId": "redpanda-study-group",
|
||||
"tariff": {
|
||||
"topic": "tariff-topic"
|
||||
}
|
||||
}
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
{
|
||||
"kafka": {
|
||||
"brokers": ["redpanda:9092"],
|
||||
"consumerGroupId": "redpanda-study-group",
|
||||
"tariff": {
|
||||
"topic": "tariff-topic"
|
||||
}
|
||||
}
|
||||
}
|
@ -1,26 +1,3 @@
|
||||
# HTTP settings
|
||||
HTTP_HOST=0.0.0.0
|
||||
HTTP_PORT=8080
|
||||
|
||||
# MONGO settings
|
||||
MONGO_HOST=localhost
|
||||
MONGO_PORT=27017
|
||||
MONGO_USER=test
|
||||
MONGO_PASSWORD=test
|
||||
MONGO_AUTH=admin
|
||||
MONGO_DB_NAME=admin
|
||||
|
||||
# Auth Microservice settings
|
||||
AUTH_MICROSERVICE_USER_URL=http://localhost:8000/user
|
||||
|
||||
# Hub Admin Microservice settings
|
||||
HUBADMIN_MICROSERVICE_TARIFF_URL=http://localhost:8001/tariff
|
||||
|
||||
# Currency Microservice settings
|
||||
CURRENCY_MICROSERVICE_TRANSLATE_URL=http://localhost:8002/change
|
||||
|
||||
# Admin
|
||||
|
||||
# JWT settings
|
||||
JWT_ISSUER="pena-auth-service"
|
||||
JWT_AUDIENCE="pena"
|
||||
|
@ -25,6 +25,9 @@ services:
|
||||
- MONGO_DB_NAME=admin
|
||||
- MONGO_AUTH=admin
|
||||
|
||||
- KAFKA_BROKERS=localhost:8080,localhost:1111
|
||||
- KAFKA_TOPIC_TARIFF=tariff
|
||||
|
||||
- JWT_ISSUER=pena-auth-service
|
||||
- JWT_AUDIENCE=pena
|
||||
|
||||
|
@ -21,6 +21,9 @@ services:
|
||||
- MONGO_DB_NAME=customer
|
||||
- MONGO_AUTH=customer
|
||||
|
||||
- KAFKA_BROKERS=localhost:8080,localhost:1111
|
||||
- KAFKA_TOPIC_TARIFF=tariff
|
||||
|
||||
- AUTH_MICROSERVICE_USER_URL=https://admin.pena.digital/user
|
||||
- HUBADMIN_MICROSERVICE_TARIFF_URL=https://admin.pena.digital/strator/tariff
|
||||
- CURRENCY_MICROSERVICE_TRANSLATE_URL=http://10.6.0.11:3131/change
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.uber.org/zap"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/initialize"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/interface/swagger"
|
||||
@ -15,6 +16,7 @@ import (
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/server"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/internal/utils"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/closer"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/kafka"
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/mongo"
|
||||
)
|
||||
|
||||
@ -30,10 +32,16 @@ func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
||||
}
|
||||
}()
|
||||
|
||||
closer := closer.New()
|
||||
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
closer := closer.New()
|
||||
if err := kafka.Initialize(ctx, config.Service.Kafka.Brokers, []string{
|
||||
config.Service.Kafka.Tariff.Topic,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mongoDB, err := mongo.Connect(ctx, &mongo.ConnectDeps{
|
||||
Configuration: &config.Database,
|
||||
@ -43,6 +51,21 @@ func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
||||
return fmt.Errorf("failed connection to db: %w", err)
|
||||
}
|
||||
|
||||
kafkaTariffClient, err := kgo.NewClient(
|
||||
kgo.SeedBrokers(config.Service.Kafka.Brokers...),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
kgo.DefaultProduceTopic(config.Service.Kafka.Tariff.Topic),
|
||||
kgo.ConsumeTopics(config.Service.Kafka.Tariff.Topic),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
brokers := initialize.NewBrokers(initialize.BrokersDeps{
|
||||
Logger: logger,
|
||||
TariffClient: kafkaTariffClient,
|
||||
})
|
||||
|
||||
clients := initialize.NewClients(initialize.ClientsDeps{
|
||||
Logger: logger,
|
||||
AuthURL: &config.Service.AuthMicroservice.URL,
|
||||
@ -62,6 +85,7 @@ func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
||||
Repositories: repositories,
|
||||
Clients: clients,
|
||||
ConfigurationGRPC: &config.GRPC,
|
||||
Brokers: brokers,
|
||||
})
|
||||
|
||||
controllers := initialize.NewControllers(initialize.ControllersDeps{
|
||||
@ -99,6 +123,7 @@ func Run(config *models.Config, logger *zap.Logger) (appErr error) {
|
||||
closer.Add(mongoDB.Client().Disconnect)
|
||||
closer.Add(serverHTTP.Stop)
|
||||
closer.Add(serverGRPC.Stop)
|
||||
closer.Add(closer.Wrap(kafkaTariffClient.Close))
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
|
@ -9,6 +9,8 @@ import (
|
||||
"penahub.gitlab.yandexcloud.net/pena-services/customer/pkg/env"
|
||||
)
|
||||
|
||||
// TODO: обработать возможность читать конфиги ещё по json
|
||||
|
||||
func Configuration(path string) (*models.Config, error) {
|
||||
config, err := env.Parse[models.Config](path)
|
||||
if err != nil {
|
||||
|
@ -36,13 +36,12 @@ type ServiceConfiguration struct {
|
||||
}
|
||||
|
||||
type Kafka struct {
|
||||
Tariff TariffKafka `json:"tariff"`
|
||||
Brokers []string `json:"brokers"`
|
||||
ConsumerGroupID string `json:"consumerGroupId"`
|
||||
Tariff TariffKafka `json:"tariff"`
|
||||
Brokers []string `json:"brokers" env:"KAFKA_BROKERS,required"`
|
||||
}
|
||||
|
||||
type TariffKafka struct {
|
||||
Topic string `json:"topic"`
|
||||
Topic string `json:"topic" env:"KAFKA_TOPIC_TARIFF,required"`
|
||||
}
|
||||
|
||||
type JWTConfiguration struct {
|
||||
|
@ -193,6 +193,8 @@ func (receiver *Service) Pay(ctx context.Context, accessToken string, userID str
|
||||
receiver.logger.Error("failed to insert history on <Pay> of <CartService>", zap.Error(historyErr))
|
||||
}
|
||||
|
||||
// TODO: обработать ошибки при отправке сообщений
|
||||
|
||||
if sendErrors := receiver.tariffBrokerService.SendMany(ctx, account.UserID, tariffs); len(sendErrors) > 0 {
|
||||
for _, err := range sendErrors {
|
||||
receiver.logger.Error("failed to send tariffs to broker on <Pay> of <CartService>", zap.Error(err))
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
)
|
||||
|
||||
type Callback func(ctx context.Context) error
|
||||
type EasyCallback func()
|
||||
|
||||
type Closer struct {
|
||||
mutex sync.Mutex
|
||||
@ -48,3 +49,15 @@ func (receiver *Closer) Close(ctx context.Context) error {
|
||||
return fmt.Errorf("shutdown cancelled: %v", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
func (receiver *Closer) Wrap(callback EasyCallback) Callback {
|
||||
return Wrap(callback)
|
||||
}
|
||||
|
||||
func Wrap(callback EasyCallback) Callback {
|
||||
return func(_ context.Context) error {
|
||||
callback()
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user