From 3b7c00cc97bc258306d9e22912bb24163d165c67 Mon Sep 17 00:00:00 2001 From: pasha1coil Date: Mon, 9 Jun 2025 13:03:03 +0300 Subject: [PATCH] added req metadata to kafka for fetch topic --- validate/common.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/validate/common.go b/validate/common.go index 88ae218..9735904 100644 --- a/validate/common.go +++ b/validate/common.go @@ -12,6 +12,7 @@ import ( "github.com/minio/minio-go/v7" "github.com/pioz/faker" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "log" @@ -229,8 +230,6 @@ func ValidateKafka(brokers []string, topic string) error { kafkaClient, err := kgo.NewClient( kgo.SeedBrokers(brokers...), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), - kgo.DefaultProduceTopic(topic), - kgo.ConsumeTopics(topic), ) if err != nil { return err @@ -243,6 +242,25 @@ func ValidateKafka(brokers []string, topic string) error { return err } + req := kmsg.NewMetadataRequest() + req.Topics = []kmsg.MetadataRequestTopic{ + {Topic: kmsg.StringPtr(topic)}, + } + + res, err := req.RequestWith(ctx, kafkaClient) + if err != nil { + return fmt.Errorf("metadata request failed: %w", err) + } + + if len(res.Topics) == 0 { + return fmt.Errorf("no metadata returned for topic %q", topic) + } + + // если == 0 то существует, https://kafka.apache.org/protocol.html#The_Messages_Metadata + if res.Topics[0].ErrorCode != 0 { + return fmt.Errorf("topic %q does not exist (error code %d)", topic, res.Topics[0].ErrorCode) + } + return nil }