From e767c6d28c96094850d81ce554db7858b5c91c89 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 17 Jul 2018 17:35:49 +0200 Subject: [PATCH] Fix issue when kafka advertised address is an IP --- metricbeat/module/kafka/broker.go | 18 +++++++++++++++--- .../partition/partition_integration_test.go | 8 ++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 736e858ae285..dd5a15b47a88 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -18,7 +18,6 @@ package kafka import ( - "bytes" "crypto/tls" "fmt" "io" @@ -370,7 +369,7 @@ func findMatchingAddress( } // get connection 'port' - _, port, err := net.SplitHostPort(addr) + host, port, err := net.SplitHostPort(addr) if err != nil || port == "" { port = "9092" } @@ -393,6 +392,19 @@ func findMatchingAddress( } } + // try matching ip of configured host with broker list, this would + // match if hosts of advertised addresses are IPs, but configured host + // is a hostname + ips, err := net.LookupIP(host) + if err == nil { + for _, ip := range ips { + addr := net.JoinHostPort(ip.String(), port) + if i, found := indexOf(addr, brokers); found { + return i, true + } + } + } + // try to find broker id by comparing the machines local hostname to // broker hostnames in metadata if host, err := os.Hostname(); err == nil { @@ -466,7 +478,7 @@ func lookupHosts(ips []net.IP) []string { func anyIPsMatch(as, bs []net.IP) bool { for _, a := range as { for _, b := range bs { - if bytes.Equal(a, b) { + if a.Equal(b) { return true } } diff --git a/metricbeat/module/kafka/partition/partition_integration_test.go b/metricbeat/module/kafka/partition/partition_integration_test.go index 973b81e6cb11..f96e35a53963 100644 --- a/metricbeat/module/kafka/partition/partition_integration_test.go +++ b/metricbeat/module/kafka/partition/partition_integration_test.go @@ -131,6 +131,7 @@ func generateKafkaData(t *testing.T, topic string) { client, err := sarama.NewClient([]string{getTestKafkaHost()}, config) if err != nil { t.Errorf("%s", err) + t.FailNow() } producer, err := sarama.NewSyncProducerFromClient(client) @@ -146,10 +147,13 @@ func generateKafkaData(t *testing.T, topic string) { _, _, err = producer.SendMessage(msg) if err != nil { - t.Errorf("FAILED to send message: %s\n", err) + t.Errorf("failed to send message: %s\n", err) } - client.RefreshMetadata(topic) + err = client.RefreshMetadata(topic) + if err != nil { + t.Errorf("failed to refresh metadata for topic '%s': %s\n", topic, err) + } } func getConfig(topic string) map[string]interface{} {