notifier/internal/initialize/kafka.go

28 lines
541 B
Go

package initialize
import (
"context"
"github.com/twmb/franz-go/pkg/kgo"
"time"
)
// KafkaConsumerInit creates a franz-go consumer-group client from config and
// checks that a broker is reachable before handing the client back.
//
// The client is seeded with config.KafkaBrokers, joins the consumer group
// config.KafkaGroup and consumes config.KafkaTopic. ctx bounds only the
// connectivity check (Ping); it is not stored in the client.
//
// On success the caller owns the returned client and must Close it when done.
// On any failure the returned client is nil and no client is left running.
func KafkaConsumerInit(ctx context.Context, config Config) (*kgo.Client, error) {
	client, err := kgo.NewClient(
		// NOTE(review): KafkaBrokers is passed as one seed argument; if it can
		// hold several comma-separated addresses, confirm that is handled.
		kgo.SeedBrokers(config.KafkaBrokers),
		kgo.ConsumerGroup(config.KafkaGroup),
		kgo.ConsumeTopics(config.KafkaTopic),
		// The reset offset is pinned to the wall-clock time of this call, so
		// when the reset offset applies, consumption begins with records
		// timestamped after startup rather than with older backlog.
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
	)
	if err != nil {
		return nil, err
	}

	if err := client.Ping(ctx); err != nil {
		// The client was already constructed; close it so its connections
		// and background goroutines are not leaked, since the caller never
		// receives a handle to clean it up.
		client.Close()
		return nil, err
	}

	return client, nil
}