core/internal/initialize/kafka.go

34 lines
661 B
Go
Raw Normal View History

package initialize
import (
"context"
"github.com/twmb/franz-go/pkg/kgo"
"time"
)
// KafkaDeps carries the connection settings that KafkaInit needs to build a
// Kafka client.
type KafkaDeps struct {
	// KafkaBrokers is handed to kgo.SeedBrokers as a single seed address.
	KafkaBrokers string
	// KafkaTopic is used both as the default produce topic and as the topic
	// to consume from.
	KafkaTopic string
	// KafkaGroup is the consumer group the client joins.
	KafkaGroup string
}
// KafkaInit builds a franz-go client from deps and pings the cluster before
// handing the client back to the caller.
//
// The client is configured with deps.KafkaBrokers as its seed broker, joins
// the consumer group deps.KafkaGroup, and uses deps.KafkaTopic both as the
// default produce topic and as the topic to consume. The consume reset offset
// is derived from the wall-clock time at which KafkaInit is called (via
// Offset.AfterMilli).
//
// ctx is passed to the Ping call. On any error the returned client is nil.
// If the ping fails, the already-constructed client is closed before
// returning so that it is not leaked.
func KafkaInit(ctx context.Context, deps KafkaDeps) (*kgo.Client, error) {
	kafkaClient, err := kgo.NewClient(
		kgo.SeedBrokers(deps.KafkaBrokers),
		kgo.ConsumerGroup(deps.KafkaGroup),
		kgo.DefaultProduceTopic(deps.KafkaTopic),
		kgo.ConsumeTopics(deps.KafkaTopic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())),
	)
	if err != nil {
		return nil, err
	}

	if err := kafkaClient.Ping(ctx); err != nil {
		// The caller only receives nil on this path and cannot close the
		// client itself, so release it here.
		kafkaClient.Close()
		return nil, err
	}

	return kafkaClient, nil
}