amocrm/internal/initialize/kafka.go

28 lines
602 B
Go
Raw Normal View History

package initialize
import (
"context"
"github.com/twmb/franz-go/pkg/kgo"
"time"
)
func KafkaConsumerInit(ctx context.Context, config Config) (*kgo.Client, error) {
kafkaClient, err := kgo.NewClient(
kgo.SeedBrokers(config.KafkaBrokers),
2024-04-26 16:37:56 +00:00
kgo.ConsumerGroup(config.KafkaGroup),
2025-02-25 10:04:58 +00:00
kgo.DefaultProduceTopic(config.KafkaTopicQueueAmo),
kgo.ConsumeTopics(config.KafkaTopicQueueAmo),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
)
if err != nil {
return nil, err
}
err = kafkaClient.Ping(ctx)
if err != nil {
return nil, err
}
return kafkaClient, nil
}