diff --git a/go.mod b/go.mod index 9446175..d09f8e7 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/minio/minio-go/v7 v7.0.81 github.com/pioz/faker v1.7.3 github.com/stretchr/testify v1.9.0 + github.com/twmb/franz-go v1.18.0 go.mongodb.org/mongo-driver v1.13.1 ) @@ -27,9 +28,11 @@ require ( github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rs/xid v1.6.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.50.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect diff --git a/go.sum b/go.sum index 47dc901..a8cfb8d 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/minio/minio-go/v7 v7.0.81 h1:SzhMN0TQ6T/xSBu6Nvw3M5M8voM+Ht8RH3hE8S7z github.com/minio/minio-go/v7 v7.0.81/go.mod h1:84gmIilaX4zcvAWWzJ5Z1WI5axN+hAbM5w25xf8xvC0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pioz/faker v1.7.3 h1:Tez8Emuq0UN+/d6mo3a9m/9ZZ/zdfJk0c5RtRatrceM= github.com/pioz/faker v1.7.3/go.mod h1:xSpay5w/oz1a6+ww0M3vfpe40pSIykeUPeWEc3TvVlc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -52,6 +54,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw= +github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I= +github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M= +github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= diff --git a/validate/common.go b/validate/common.go index f4abd33..8e509d0 100644 --- a/validate/common.go +++ b/validate/common.go @@ -10,6 +10,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/minio/minio-go/v7" "github.com/pioz/faker" + "github.com/twmb/franz-go/pkg/kgo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "log" @@ -207,3 +208,39 @@ func ValidateSmtp(apiKey string) error { } return nil } + +func ValidateKafka(brokers []string, topic string) error { + if len(brokers) == 0 { + return fmt.Errorf("kafka brokers is empty") + } + if topic == "" { + return fmt.Errorf("kafka topic is empty") + } + + for _, addr := range brokers { + if addr == "" { + return fmt.Errorf("empty kafka broker") + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + kafkaClient, err := kgo.NewClient( + kgo.SeedBrokers(brokers...), + kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), + kgo.DefaultProduceTopic(topic), + kgo.ConsumeTopics(topic), + ) + if err != nil { + return err + } + + defer kafkaClient.Close() + + err = kafkaClient.Ping(ctx) + if err != nil { + return err + } + + return nil +} diff --git a/validate/validate_test.go b/validate/validate_test.go index f9b2468..6737a75 100644 --- a/validate/validate_test.go +++ b/validate/validate_test.go @@ -23,3 +23,8 @@ func TestValidateSmtp(t *testing.T) { err := ValidateSmtp("P0YsjUB137upXrr1NiJefHmXVKW1hmBWlpev") assert.NoError(t, err) } + +func TestValidateKafka(t *testing.T) { + err := ValidateKafka([]string{"localhost:9092"}, "test-topic") + assert.NoError(t, err) +}