added validate kafka
This commit is contained in:
parent
b6fd75fe81
commit
ecdff721f8
3
go.mod
3
go.mod
@ -9,6 +9,7 @@ require (
|
||||
github.com/minio/minio-go/v7 v7.0.81
|
||||
github.com/pioz/faker v1.7.3
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/twmb/franz-go v1.18.0
|
||||
go.mongodb.org/mongo-driver v1.13.1
|
||||
)
|
||||
|
||||
@ -27,9 +28,11 @@ require (
|
||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
||||
github.com/minio/md5-simd v1.1.2 // indirect
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.21 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/rs/xid v1.6.0 // indirect
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.50.0 // indirect
|
||||
github.com/valyala/tcplisten v1.0.0 // indirect
|
||||
|
6
go.sum
6
go.sum
@ -40,6 +40,8 @@ github.com/minio/minio-go/v7 v7.0.81 h1:SzhMN0TQ6T/xSBu6Nvw3M5M8voM+Ht8RH3hE8S7z
|
||||
github.com/minio/minio-go/v7 v7.0.81/go.mod h1:84gmIilaX4zcvAWWzJ5Z1WI5axN+hAbM5w25xf8xvC0=
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pioz/faker v1.7.3 h1:Tez8Emuq0UN+/d6mo3a9m/9ZZ/zdfJk0c5RtRatrceM=
|
||||
github.com/pioz/faker v1.7.3/go.mod h1:xSpay5w/oz1a6+ww0M3vfpe40pSIykeUPeWEc3TvVlc=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
@ -52,6 +54,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
|
||||
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
|
||||
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
|
||||
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M=
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/pioz/faker"
|
||||
"github.com/twmb/franz-go/pkg/kgo"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"log"
|
||||
@ -207,3 +208,39 @@ func ValidateSmtp(apiKey string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ValidateKafka(brokers []string, topic string) error {
|
||||
if len(brokers) == 0 {
|
||||
return fmt.Errorf("kafka brokers is empty")
|
||||
}
|
||||
if topic == "" {
|
||||
return fmt.Errorf("kafka topic is empty")
|
||||
}
|
||||
|
||||
for _, addr := range brokers {
|
||||
if addr == "" {
|
||||
return fmt.Errorf("empty kafka broker")
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
kafkaClient, err := kgo.NewClient(
|
||||
kgo.SeedBrokers(brokers...),
|
||||
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
|
||||
kgo.DefaultProduceTopic(topic),
|
||||
kgo.ConsumeTopics(topic),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer kafkaClient.Close()
|
||||
|
||||
err = kafkaClient.Ping(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -23,3 +23,8 @@ func TestValidateSmtp(t *testing.T) {
|
||||
err := ValidateSmtp("P0YsjUB137upXrr1NiJefHmXVKW1hmBWlpev")
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestValidateKafka(t *testing.T) {
|
||||
err := ValidateKafka([]string{"localhost:9092"}, "test-topic")
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user