Skip to content

Commit

Permalink
Metricbeat: Kafka integration tests with 1.1 (#7616)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano authored and ruflin committed Jul 17, 2018
1 parent 904ac3d commit 4946202
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add Elasticsearch ml_job metricsets. {pull}7196[7196]
- Add support for bearer token files to HTTP helper. {pull}7527[7527]
- Add Elasticsearch index recovery metricset. {pull}7225[7225]
- Run Kafka integration tests on version 1.1.0 {pull}7616[7616]

*Packetbeat*

Expand Down
9 changes: 8 additions & 1 deletion metricbeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,14 @@ services:
build: ./module/jolokia/_meta

kafka:
build: ./module/kafka/_meta
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.1.1.0

kafka_0_10_2:
build:
context: ./module/kafka/_meta
dockerfile: Dockerfile.0.10.2

kibana:
build: ./module/kibana/_meta
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ This is the Kafka module.

The default metricsets are `consumergroup` and `partition`.

This module is tested with Kafka 0.10.2 and 1.1.0.


[float]
=== Example configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ FROM debian:stretch

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_ADVERTISED_HOST kafka
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 0.10.2.1
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat
RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils

RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \
"http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
Expand Down
25 changes: 25 additions & 0 deletions metricbeat/module/kafka/_meta/Dockerfile.1.1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
FROM debian:stretch

ENV KAFKA_HOME /kafka
# The advertised host is kafka. This means it will not work if container is started locally and connected from localhost to it
ENV KAFKA_LOGS_DIR="/kafka-logs"
ENV KAFKA_VERSION 1.1.0
ENV _JAVA_OPTIONS "-Djava.net.preferIPv4Stack=true"
ENV TERM=linux

RUN apt-get update && apt-get install -y curl openjdk-8-jre-headless netcat dnsutils

RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && curl -s -o $INSTALL_DIR/kafka.tgz \
"http://ftp.wayne.edu/apache/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz" && \
tar xzf ${INSTALL_DIR}/kafka.tgz -C ${KAFKA_HOME} --strip-components 1

ADD run.sh /run.sh
ADD healthcheck.sh /healthcheck.sh

EXPOSE 9092
EXPOSE 2181

# Healthcheck creates an empty topic foo. As soon as a topic is created, it assumes broke is available
HEALTHCHECK --interval=1s --retries=90 CMD /healthcheck.sh

ENTRYPOINT ["/run.sh"]
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
This is the Kafka module.

The default metricsets are `consumergroup` and `partition`.

This module is tested with Kafka 0.10.2 and 1.1.0.
1 change: 1 addition & 0 deletions metricbeat/module/kafka/_meta/env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
KAFKA_0_10_2_HOST=kafka_0_10_2
KAFKA_HOST=kafka
KAFKA_PORT=9092
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/_meta/run.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/bash

KAFKA_ADVERTISED_HOST=$(dig +short $HOSTNAME)

wait_for_port() {
count=20
port=$1
Expand Down
18 changes: 15 additions & 3 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka

import (
"bytes"
"crypto/tls"
"fmt"
"io"
Expand Down Expand Up @@ -370,7 +369,7 @@ func findMatchingAddress(
}

// get connection 'port'
_, port, err := net.SplitHostPort(addr)
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "" {
port = "9092"
}
Expand All @@ -393,6 +392,19 @@ func findMatchingAddress(
}
}

// try matching ip of configured host with broker list, this would
// match if hosts of advertised addresses are IPs, but configured host
// is a hostname
ips, err := net.LookupIP(host)
if err == nil {
for _, ip := range ips {
addr := net.JoinHostPort(ip.String(), port)
if i, found := indexOf(addr, brokers); found {
return i, true
}
}
}

// try to find broker id by comparing the machines local hostname to
// broker hostnames in metadata
if host, err := os.Hostname(); err == nil {
Expand Down Expand Up @@ -466,7 +478,7 @@ func lookupHosts(ips []net.IP) []string {
func anyIPsMatch(as, bs []net.IP) bool {
for _, a := range as {
for _, b := range bs {
if bytes.Equal(a, b) {
if a.Equal(b) {
return true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func generateKafkaData(t *testing.T, topic string) {
client, err := sarama.NewClient([]string{getTestKafkaHost()}, config)
if err != nil {
t.Errorf("%s", err)
t.FailNow()
}

producer, err := sarama.NewSyncProducerFromClient(client)
Expand All @@ -146,10 +147,13 @@ func generateKafkaData(t *testing.T, topic string) {

_, _, err = producer.SendMessage(msg)
if err != nil {
t.Errorf("FAILED to send message: %s\n", err)
t.Errorf("failed to send message: %s\n", err)
}

client.RefreshMetadata(topic)
err = client.RefreshMetadata(topic)
if err != nil {
t.Errorf("failed to refresh metadata for topic '%s': %s\n", topic, err)
}
}

func getConfig(topic string) map[string]interface{} {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
kafka-python==1.4.2
kafka-python==1.4.3
elasticsearch==6.2.0
14 changes: 8 additions & 6 deletions metricbeat/tests/system/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from nose.plugins.skip import SkipTest


class Test(metricbeat.BaseTest):

class KafkaTest(metricbeat.BaseTest):
COMPOSE_SERVICES = ['kafka']

@unittest.skipUnless(metricbeat.INTEGRATION_TESTS, "integration test")
Expand Down Expand Up @@ -35,13 +34,16 @@ def test_partition(self):
self.assert_fields_are_documented(evt)

def create_topic(self):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=self.get_hosts()[
0], retries=20, retry_backoff_ms=500, api_version=("0.10"))
producer = KafkaProducer(bootstrap_servers=self.get_hosts()[0],
retries=20, retry_backoff_ms=500)
producer.send('foobar', b'some_message_bytes')

def get_hosts(self):
return [os.getenv('KAFKA_HOST', 'localhost') + ':' +
return [self.compose_hosts()[0] + ':' +
os.getenv('KAFKA_PORT', '9092')]


class Kafka_0_10_2_Test(KafkaTest):
COMPOSE_SERVICES = ['kafka_0_10_2']

0 comments on commit 4946202

Please sign in to comment.