Skip to content

Commit

Permalink
Fix issue when kafka advertised address is an IP
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Jul 17, 2018
1 parent 068f435 commit e767c6d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
18 changes: 15 additions & 3 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka

import (
"bytes"
"crypto/tls"
"fmt"
"io"
Expand Down Expand Up @@ -370,7 +369,7 @@ func findMatchingAddress(
}

// get connection 'port'
_, port, err := net.SplitHostPort(addr)
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
port = "9092"
}
Expand All @@ -393,6 +392,19 @@ func findMatchingAddress(
}
}

// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
if i, found := indexOf(addr, brokers); found {
return i, true
}
}
}

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
Expand Down Expand Up @@ -466,7 +478,7 @@ func lookupHosts(ips []net.IP) []string {
func anyIPsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
if a.Equal(b) {
return true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func generateKafkaData(t *testing.T, topic string) {
client, err := sarama.NewClient([]string{getTestKafkaHost()}, config)
if err != nil {
t.Errorf("%s", err)
t.FailNow()
}

producer, err := sarama.NewSyncProducerFromClient(client)
Expand All @@ -146,10 +147,13 @@ func generateKafkaData(t *testing.T, topic string) {

_, _, err = producer.SendMessage(msg)
if err != nil {
t.Errorf("FAILED to send message: %s\n", err)
t.Errorf("failed to send message: %s\n", err)
}

client.RefreshMetadata(topic)
err = client.RefreshMetadata(topic)
if err != nil {
t.Errorf("failed to refresh metadata for topic '%s': %s\n", topic, err)
}
}

func getConfig(topic string) map[string]interface{} {
Expand Down

0 comments on commit e767c6d

Please sign in to comment.