added req metadata to kafka for fetch topic

This commit is contained in:
pasha1coil 2025-06-09 13:03:03 +03:00
parent 7e4b3ae9e1
commit 3b7c00cc97

@ -12,6 +12,7 @@ import (
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/pioz/faker" "github.com/pioz/faker"
"github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"log" "log"
@ -229,8 +230,6 @@ func ValidateKafka(brokers []string, topic string) error {
kafkaClient, err := kgo.NewClient( kafkaClient, err := kgo.NewClient(
kgo.SeedBrokers(brokers...), kgo.SeedBrokers(brokers...),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
kgo.DefaultProduceTopic(topic),
kgo.ConsumeTopics(topic),
) )
if err != nil { if err != nil {
return err return err
@ -243,6 +242,25 @@ func ValidateKafka(brokers []string, topic string) error {
return err return err
} }
req := kmsg.NewMetadataRequest()
req.Topics = []kmsg.MetadataRequestTopic{
{Topic: kmsg.StringPtr(topic)},
}
res, err := req.RequestWith(ctx, kafkaClient)
if err != nil {
return fmt.Errorf("metadata request failed: %w", err)
}
if len(res.Topics) == 0 {
return fmt.Errorf("no metadata returned for topic %q", topic)
}
// если == 0 то существует, https://kafka.apache.org/protocol.html#The_Messages_Metadata
if res.Topics[0].ErrorCode != 0 {
return fmt.Errorf("topic %q does not exist (error code %d)", topic, res.Topics[0].ErrorCode)
}
return nil return nil
} }