Skip to content

Commit

Permalink
Updated readme, fixed compatibility tests
Browse files Browse the repository at this point in the history
- compatibility tests now parse reported json
- adjusted tests for docker for mac
- muted ZooKeeper connection
- reported default status is now error status
  • Loading branch information
andreas-schroeder committed Oct 4, 2016
1 parent 6f632b6 commit 5a306e6
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 33 deletions.
46 changes: 37 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ Health checker for Kafka brokers and clusters that operates by
* inserting a message in a dedicated health check topic and waiting for it to
become available on the consumer side,
* checking whether the broker is in the in-sync replica set for all partitions it replicates,
* checking whether under-repicated partitions exist, and
* checking whether offline partitions exist.
* checking whether under-replicated partitions or out-of-sync replicas exist,
* checking whether offline partitions exist, and
* checking whether the metadata of the cluster and the ZooKeeper metadata are consistent with each other.

## Status
[![Build Status](https://travis-ci.org/andreas-schroeder/kafka-health-check.svg?branch=master)](https://travis-ci.org/andreas-schroeder/kafka-health-check)
Expand Down Expand Up @@ -37,29 +38,54 @@ Broker health can be queried at `/`:

```
$ curl -s localhost:8000/
sync
{"status":"sync"}
```

Return codes and response bodies are:
Return codes and status values are:
* `200` with `sync` for a healthy broker that is fully in sync with all leaders.
* `200` with `imok` for a healthy broker that replays messages of its health
check topic, but is not fully in sync.
* `500` with `nook` for an unhealthy broker that fails to replay messages in its health
check topic within [100 milliseconds](./main.go#L42).


The returned JSON contains details about the replicas for which the broker is lagging behind:

```
$ curl -s localhost:8000/
{"status":"imok","out-of-sync":[{"topic":"mytopic","partition":0}]}
```

## Cluster Health

Cluster health can be queried at `/cluster`:

```
$ curl -s localhost:8000/cluster
green
{"status":"green"}
```

Return codes and status values are:
* `200` with `green` if all replicas of all partitions of all topics are in sync and metadata is consistent.
* `200` with `yellow` if one or more partitions are under-replicated and metadata is consistent.
* `500` with `red` if one or more partitions are offline or metadata is inconsistent.

The returned JSON contains details about metadata status and partition replication:

```
$ curl -s localhost:8000/cluster
{"status":"yellow","topics":[
{"topic":"mytopic","Status":"yellow","partitions":[
{"id":2,"status":"yellow","OSR":[3]},{"id":1,"status":"yellow","OSR":[3]}
]}
]}
```

Return codes and response bodies are:
* `200` with `green` if all replicas of all partitions of all topics are in sync.
* `200` with `yellow` if one or more partitions are under-replicated.
* `500` with `red` if one or more partitions are offline.
The fields for additional info and structures are:
* `topics` for topic replication status: `[{"topic":"mytopic","Status":"yellow","partitions":[{"id":2,"status":"yellow","OSR":[3]}]}]`
In this data, `OSR` stands for out-of-sync replicas and contains the list of all brokers that are not in the ISR.
* `metadata` for inconsistencies between ZooKeeper and Kafka metadata: `[{"broker":3,"status":"red","problem":"Missing in ZooKeeper"}]`
* `zookeeper-connection` for problems with the ZooKeeper connection or data, contains a single string: `"Fetching brokers failed: ..."`

## Supported Kafka Versions

Expand Down Expand Up @@ -96,3 +122,5 @@ Run `make` to build after running `make deps` to restore the dependencies using
* The check will try to create the health check topic only on its first connection after startup. If the topic
disappears later while the check is running, it will not try to re-create its health check topic.
* If the broker health check fails, the cluster health will be set to `red`.
* For each check, the Kafka cluster metadata is fetched from ZooKeeper, i.e. the full data on brokers and topic
partitions with replicas.
6 changes: 3 additions & 3 deletions check/broker_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ const (
)

type BrokerStatus struct {
Status string `json:"status"`
UnderReplicated []ReplicationStatus `json:"under-replicated"`
Status string `json:"status"`
OutOfSync []ReplicationStatus `json:"out-of-sync,omitempty"`
}

type ReplicationStatus struct {
Expand Down Expand Up @@ -83,7 +83,7 @@ func (check *HealthCheck) brokerInSync(brokerStatus *BrokerStatus) bool {
if contains(partition.Replicas, brokerID) && !contains(partition.Isrs, brokerID) {
inSync = false
status := ReplicationStatus{Topic: topic.Name, Partition: partition.ID}
brokerStatus.UnderReplicated = append(brokerStatus.UnderReplicated, status)
brokerStatus.OutOfSync = append(brokerStatus.OutOfSync, status)
}
}
}
Expand Down
22 changes: 10 additions & 12 deletions check/cluster_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type ClusterStatus struct {
Status string `json:"status"`
Topics []TopicStatus `json:"topics,omitempty"`
Brokers []BrokerMetadata `json:"metadata,omitempty"`
Metadata []BrokerMetadata `json:"metadata,omitempty"`
ZooKeeper string `json:"zookeeper-connection,omitempty"`
}

Expand All @@ -20,10 +20,10 @@ type TopicStatus struct {
}

type PartitionStatus struct {
ID int32 `json:"id"`
Status string `json:"status"`
ZooKeeper string `json:"zookeeper,omitempty"`
OutOfSyncBrokers []int32 `json:"out-of-sync-brokers,omitempty"`
ID int32 `json:"id"`
Status string `json:"status"`
ZooKeeper string `json:"zookeeper,omitempty"`
OutOfSyncReplicas []int32 `json:"OSR,omitempty"`
}

type BrokerMetadata struct {
Expand Down Expand Up @@ -68,14 +68,14 @@ func (check *HealthCheck) checkBrokerMetadata(metadata *proto.MetadataResp, zkBr

for _, broker := range brokersFromMeta {
if !contains(zkBrokers, broker) {
cluster.Brokers = append(cluster.Brokers, BrokerMetadata{broker, red, "missing in ZooKeeper"})
cluster.Metadata = append(cluster.Metadata, BrokerMetadata{broker, red, "Missing in ZooKeeper"})
status = red
}
}

for _, broker := range zkBrokers {
if !contains(brokersFromMeta, broker) {
cluster.Brokers = append(cluster.Brokers, BrokerMetadata{broker, red, "missing in Metadata"})
cluster.Metadata = append(cluster.Metadata, BrokerMetadata{broker, red, "Missing in metadata"})
status = red
}
}
Expand Down Expand Up @@ -110,11 +110,10 @@ func (check *HealthCheck) checkTopics(metadata *proto.MetadataResp, zkTopics []Z
pStatus := checkPartition(partition, zkPartitionMap, &topicStatus)
topicStatus.Status = worstStatus(topicStatus.Status, pStatus)
}
cluster.Topics = append(cluster.Topics, topicStatus)
status = worstStatus(topicStatus.Status, status)

if topicStatus.Status != green {
log.Infof("Reporting topic %s as %s", topicStatus.Topic, topicStatus.Status)
cluster.Topics = append(cluster.Topics, topicStatus)
status = worstStatus(topicStatus.Status, status)
}
}

Expand All @@ -137,7 +136,7 @@ func checkPartition(partition proto.MetadataRespPartition, zkPartitionMap map[in
if len(partition.Isrs) < len(replicas) {
for _, replica := range replicas {
if !contains(partition.Isrs, replica) {
status.OutOfSyncBrokers = append(status.OutOfSyncBrokers, replica)
status.OutOfSyncReplicas = append(status.OutOfSyncReplicas, replica)
}
}
status.Status = yellow // partition is under-replicated.
Expand All @@ -146,7 +145,6 @@ func checkPartition(partition proto.MetadataRespPartition, zkPartitionMap map[in
status.Status = red // partition is offline.
}
if status.Status != green {
log.Infof("Reporting partition %d of topic %s as %s", status.ID, topicStatus.Topic, status.Status)
topicStatus.Partitions = append(topicStatus.Partitions, status)
}

Expand Down
10 changes: 9 additions & 1 deletion check/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ type zkConnection struct {
connection *zk.Conn
}

// zkNullLogger is a logger that drops every message handed to it.
type zkNullLogger struct{}

// Printf accepts a format string with its arguments and discards both.
func (zkNullLogger) Printf(_ string, _ ...interface{}) {}

func (zkConn *zkConnection) Connect(servers []string, sessionTimeout time.Duration) (<-chan zk.Event, error) {
connection, events, err := zk.Connect(servers, sessionTimeout)
loggerOption := func(c *zk.Conn) {
c.SetLogger(zkNullLogger{})
}
connection, events, err := zk.Connect(servers, sessionTimeout, loggerOption)
zkConn.connection = connection
return events, err
}
Expand Down
2 changes: 1 addition & 1 deletion check/status_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (check *HealthCheck) ServeHealth(brokerUpdates <-chan Update, clusterUpdate

// goroutine that encapsulates the current status
go func() {
status := Update{errorStatus, nil}
status := Update{errorStatus, []byte(fmt.Sprintf(`{"status":"%s"}`, errorStatus))}
for {
select {
case update := <-updates:
Expand Down
2 changes: 2 additions & 0 deletions compatibility/docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ VOLUME /kafka/config
# Expose Kafka data volume.
VOLUME /var/lib/kafka

ENV advertised_host=""

# Define start wrapper as entrypoint.
ENTRYPOINT [ "kafka/bin/start" ]

Expand Down
3 changes: 2 additions & 1 deletion compatibility/docker/kafka/files/bin/start
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ start_zookeeper() {

start_kafka() {
docker_container_ip=$(ip -4 addr show scope global dev eth0 | awk '/inet/ { print $2 }' | cut -d / -f 1)
sed -r -i "s/(^|^#)(advertised\.host\.name)=(.*)/\2=${docker_container_ip}/g" config/server.properties
host_name_setting="${advertised_host:-$docker_container_ip}"
sed -r -i "s/(^|^#)(advertised\.host\.name)=(.*)/\2=${host_name_setting}/g" config/server.properties
bin/kafka-server-start.sh config/server.properties
}

Expand Down
57 changes: 51 additions & 6 deletions compatibility/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/hashicorp/go-version"
"github.com/smallfish/simpleyaml"
"encoding/json"
)

type versionSpec struct {
Expand All @@ -33,6 +34,8 @@ func main() {
flag.StringVar(&baseDir, "base-dir", "", "directory containing the Java and Kafka docker build contexts")
flag.Parse()

determineIp()

specs := parseVersionSpecs(baseDir)

successTotal := true
Expand All @@ -55,6 +58,36 @@ func main() {
}
}

var advertisedHost string

// determineIp scans the local network interfaces for the first address that
// is both IPv4 and global unicast, stores its textual form in advertisedHost
// and logs the choice.
//
// The process is terminated through log.Fatal when the interfaces cannot be
// listed or when no interface carries a matching address.
func determineIp() {
	ifaces, err := net.Interfaces()
	if err != nil {
		// log.Fatal formats like fmt.Print: no space is inserted between a
		// string operand and the error, so the separator is spelled out here.
		log.Fatal("Error while fetching network interfaces, aborting: ", err)
	}
	for _, i := range ifaces {
		addrs, err := i.Addrs()
		if err != nil {
			// Best effort: an interface whose addresses cannot be read is skipped.
			continue
		}
		for _, addr := range addrs {
			var ip net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			}
			// ip stays nil for any other address type; To4 then yields nil
			// and the address is ignored.
			ip4 := ip.To4()
			if ip4 == nil || !ip.IsGlobalUnicast() {
				continue
			}
			advertisedHost = ip4.String()
			log.Println("using", advertisedHost, "as IP for Kafka containers.")
			return
		}
	}
	log.Fatal("Unable to find a local private IP, aborting.")
}

func runTest(tag string, spec versionSpec, healthCheckCommand string) bool {
zkID, kafkaID, hcCmd := startAll(tag, healthCheckCommand)

Expand Down Expand Up @@ -101,8 +134,11 @@ func startAll(tag, healthCheckCommand string) (zkID, kafkaID string, hcCmd *exec
}

log.Print("Starting Kafka...")
err = exec.Command("docker", "run", "-d", "--name", kafkaID, "-p", "9092:9092",
"--link", zkID+":zookeeper", tag).Run()
kCmd := exec.Command("docker", "run", "-d",
"--env", "advertised_host=" + advertisedHost,
"--name", kafkaID, "-p", "9092:9092",
"--link", zkID+":zookeeper", tag)
kCmd.Run()
if err != nil {
log.Fatal("Failed to start Kafka: ", err)
}
Expand Down Expand Up @@ -139,7 +175,7 @@ func waitForZooKeeper() bool {
log.Println("Failed to connect to ZooKeeper:", err)
continue
}
fmt.Fprintf(conn, "ruok")
fmt.Fprint(conn, "ruok")
status, err := bufio.NewReader(conn).ReadString('\n')
if err != nil && err != io.EOF {
log.Println("Failed to read ZooKeeper status:", err)
Expand All @@ -152,6 +188,10 @@ func waitForZooKeeper() bool {
return false
}

// Status is the part of a health check JSON response that the compatibility
// test decodes: the value of the "status" field.
type Status struct {
Status string `json:"status"`
}

func waitForResponse(url, expected string) bool {
for retries := 10; retries > 0; retries-- {
if retries < 10 {
Expand All @@ -167,12 +207,17 @@ func waitForResponse(url, expected string) bool {
log.Println("reading response returned error", err, "retrying", retries, "more times...")
continue
}
status := strings.TrimSpace(string(statusBytes))
var status Status
err = json.Unmarshal(statusBytes, &status)
if err != nil {
log.Println("parsing response", string(statusBytes), "returned error:", err, "retrying", retries, "more times...")
continue
}

if status == expected {
if status.Status == expected {
return true
}
log.Println("reported status is", status, "retrying", retries, "more times...")
log.Println("reported status is", status.Status, "retrying", retries, "more times...")
}
return false
}
Expand Down

0 comments on commit 5a306e6

Please sign in to comment.