Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumergoup additions #10

Merged
merged 10 commits into from
Aug 16, 2015
49 changes: 36 additions & 13 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type ConsumergroupInstance struct {
ID string
}

// ConsumergroupList implements the sortable interface on top of a consumer group list
type ConsumergroupList []*Consumergroup

// ConsumergroupInstanceList implements the sortable interface on top of a consumer instance list
type ConsumergroupInstanceList []*ConsumergroupInstance

type Registration struct {
Expand Down Expand Up @@ -122,7 +125,7 @@ func (cg *Consumergroup) Instances() (ConsumergroupInstanceList, error) {

// WatchInstances returns a ConsumergroupInstanceList, and a channel that will be closed
// as soon the instance list changes.
func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan struct{}, error) {
func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan zk.Event, error) {
node := fmt.Sprintf("%s/consumers/%s/ids", cg.kz.conf.Chroot, cg.Name)
if exists, err := cg.kz.exists(node); err != nil {
return nil, nil, err
Expand All @@ -142,13 +145,7 @@ func (cg *Consumergroup) WatchInstances() (ConsumergroupInstanceList, <-chan str
result = append(result, cg.Instance(cgi))
}

channel := make(chan struct{})
go func() {
<-c
close(channel)
}()

return result, channel, nil
return result, c, nil
}

// NewInstance instantiates a new ConsumergroupInstance inside this consumer group,
Expand Down Expand Up @@ -184,6 +181,26 @@ func (cg *Consumergroup) PartitionOwner(topic string, partition int32) (*Consume
}
}

// WatchPartitionOwner retrieves what instance is currently owning the partition, and sets a
// Zookeeper watch to be notified of changes. If the partition currently does not have an owner,
// the function returns nil for every return value. In this case is should be safe to claim
// the partition for an instance.
func (cg *Consumergroup) WatchPartitionOwner(topic string, partition int32) (*ConsumergroupInstance, <-chan zk.Event, error) {
node := fmt.Sprintf("%s/consumers/%s/owners/%s/%d", cg.kz.conf.Chroot, cg.Name, topic, partition)
instanceID, _, changed, err := cg.kz.conn.GetW(node)

switch err {
case nil:
return &ConsumergroupInstance{cg: cg, ID: string(instanceID)}, changed, nil

case zk.ErrNoNode:
return nil, nil, nil

default:
return nil, nil, err
}
}

// Registered checks whether the consumergroup instance is registered in Zookeeper.
func (cgi *ConsumergroupInstance) Registered() (bool, error) {
node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
Expand All @@ -205,18 +222,26 @@ func (cgi *ConsumergroupInstance) Registration() (*Registration, error) {
return reg, nil
}

// Register registers the consumergroup instance in Zookeeper.
func (cgi *ConsumergroupInstance) Register(topics []string) error {
// RegisterSubscription registers the consumer instance in Zookeeper, with its subscription.
func (cgi *ConsumergroupInstance) RegisterWithSubscription(subscriptionJSON []byte) error {
if exists, err := cgi.Registered(); err != nil {
return err
} else if exists {
return ErrInstanceAlreadyRegistered
}

// Create an ephemeral node for the the consumergroup instance.
node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
return cgi.cg.kz.create(node, subscriptionJSON, true)
}

// Register registers the consumergroup instance in Zookeeper.
func (cgi *ConsumergroupInstance) Register(topics []string) error {
subscription := make(map[string]int)
for _, topic := range topics {
subscription[topic] = 1
}

data, err := json.Marshal(&Registration{
Pattern: RegPatternStatic,
Subscription: subscription,
Expand All @@ -227,9 +252,7 @@ func (cgi *ConsumergroupInstance) Register(topics []string) error {
return err
}

// Create an ephemeral node for the the consumergroup instance.
node := fmt.Sprintf("%s/consumers/%s/ids/%s", cgi.cg.kz.conf.Chroot, cgi.cg.Name, cgi.ID)
return cgi.cg.kz.create(node, data, true)
return cgi.RegisterWithSubscription(data)
}

// Deregister removes the registration of the instance from zookeeper.
Expand Down
74 changes: 74 additions & 0 deletions functional_consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,80 @@ func TestConsumergroupInstanceClaimPartitionSame(t *testing.T) {
}
}

func TestConsumergroupInstanceWatchPartitionClaim(t *testing.T) {
kz, err := NewKazoo(zookeeperPeers, nil)
if err != nil {
t.Fatal(err)
}
defer assertSuccessfulClose(t, kz)

cg := kz.Consumergroup("test.kazoo.TestConsumergroupInstanceWatchPartitionClaim")
if err := cg.Create(); err != nil {
t.Fatal(err)
}
defer func() {
if err := cg.Delete(); err != nil {
t.Error(err)
}
}()

instance1 := cg.NewInstance()
if err := instance1.Register([]string{"test.4"}); err != nil {
t.Fatal(err)
}

// Assert the partition isn't claimed
instance, change, err := cg.WatchPartitionOwner("test.4", 0)
if err != nil {
t.Fatal(err)
}
if instance != nil {
t.Fatal("An unclaimed partition should not return an instance")
}
if change != nil {
t.Fatal("An unclaimed partition should not return a watch")
}

// Now claim the partition
if err := instance1.ClaimPartition("test.4", 0); err != nil {
t.Fatal(err)
}

// This time, we should get an insance back
instance, change, err = cg.WatchPartitionOwner("test.4", 0)
if err != nil {
t.Fatal(err)
}

if instance.ID != instance1.ID {
t.Error("Our instance should have claimed the partition")
}

go func() {
time.Sleep(100 * time.Millisecond)
if err := instance1.ReleasePartition("test.4", 0); err != nil {
t.Fatal(err)
}
}()

// Wait for the zookeeper watch to trigger
<-change

// Ensure the partition is no longer claimed
instance, err = cg.PartitionOwner("test.4", 0)
if err != nil {
t.Fatal(err)
}
if instance != nil {
t.Error("The partition should have been release by now")
}

// Cleanup
if err := instance1.Deregister(); err != nil {
t.Error(err)
}
}

func TestConsumergroupOffsets(t *testing.T) {
kz, err := NewKazoo(zookeeperPeers, nil)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions kazoo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ func ParseConnectionString(zookeeper string) (nodes []string, chroot string) {
return
}

// BuildConnectionString builds a Zookeeper connection string for a list of nodes.
// Returns a string like "zk1:2181,zk2:2181,zk3:2181"
func BuildConnectionString(nodes []string) string {
return strings.Join(nodes, ",")
}

// ConnectionStringWithChroot builds a Zookeeper connection string for a list
// of nodes and a chroot. The chroot should start with "/".
// Returns a string like "zk1:2181,zk2:2181,zk3:2181/chroot"
func BuildConnectionStringWithChroot(nodes []string, chroot string) string {
return fmt.Sprintf("%s%s", strings.Join(nodes, ","), chroot)
}

// Kazoo interacts with the Kafka metadata in Zookeeper
type Kazoo struct {
conn *zk.Conn
Expand Down Expand Up @@ -146,6 +159,7 @@ func (kz *Kazoo) Controller() (int32, error) {
return controllerNode.BrokerID, nil
}

// Close closes the connection with the Zookeeper cluster
func (kz *Kazoo) Close() error {
kz.conn.Close()
return nil
Expand Down
12 changes: 12 additions & 0 deletions kazoo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ import (
"testing"
)

func TestBuildConnectionString(t *testing.T) {
nodes := []string{"zk1:2181", "zk2:2181", "zk3:2181"}

if str := BuildConnectionString(nodes); str != "zk1:2181,zk2:2181,zk3:2181" {
t.Errorf("The connection string was not built correctly: %s", str)
}

if str := BuildConnectionStringWithChroot(nodes, "/chroot"); str != "zk1:2181,zk2:2181,zk3:2181/chroot" {
t.Errorf("The connection string was not built correctly: %s", str)
}
}

func TestParseConnectionString(t *testing.T) {
var (
nodes []string
Expand Down
71 changes: 68 additions & 3 deletions topic_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strconv"

"github.com/samuel/go-zookeeper/zk"
)

// Topic interacts with Kafka's topic metadata in Zookeeper.
Expand All @@ -12,6 +14,7 @@ type Topic struct {
kz *Kazoo
}

// TopicList is a type that implements the sortable interface for a list of Topic instances.
type TopicList []*Topic

// Partition interacts with Kafka's partition metadata in Zookeeper.
Expand All @@ -21,9 +24,10 @@ type Partition struct {
Replicas []int32
}

// PartitionList is a type that implements the sortable interface for a list of Partition instances
type PartitionList []*Partition

// Topics returns a map of all registered Kafka topics.
// Topics returns a list of all registered Kafka topics.
func (kz *Kazoo) Topics() (TopicList, error) {
root := fmt.Sprintf("%s/brokers/topics", kz.conf.Chroot)
children, _, err := kz.conn.Children(root)
Expand All @@ -38,19 +42,58 @@ func (kz *Kazoo) Topics() (TopicList, error) {
return result, nil
}

// WatchTopics returns a list of all registered Kafka topics, and
// watches that list for changes.
func (kz *Kazoo) WatchTopics() (TopicList, <-chan zk.Event, error) {
root := fmt.Sprintf("%s/brokers/topics", kz.conf.Chroot)
children, _, c, err := kz.conn.ChildrenW(root)
if err != nil {
return nil, nil, err
}

result := make(TopicList, 0, len(children))
for _, name := range children {
result = append(result, kz.Topic(name))
}
return result, c, nil
}

// Topic returns a Topic instance for a given topic name
func (kz *Kazoo) Topic(topic string) *Topic {
return &Topic{Name: topic, kz: kz}
}

// Partitions returns a map of all partitions for the topic.
// Exists returns true if the topic exists on the Kafka cluster.
func (t *Topic) Exists() (bool, error) {
return t.kz.exists(fmt.Sprintf("%s/brokers/topics/%s", t.kz.conf.Chroot, t.Name))
}

// Partitions returns a list of all partitions for the topic.
func (t *Topic) Partitions() (PartitionList, error) {
node := fmt.Sprintf("%s/brokers/topics/%s", t.kz.conf.Chroot, t.Name)
value, _, err := t.kz.conn.Get(node)
if err != nil {
return nil, err
}

return t.parsePartitions(value)
}

// WatchPartitions returns a list of all partitions for the topic, and watches the topic for changes.
func (t *Topic) WatchPartitions() (PartitionList, <-chan zk.Event, error) {
node := fmt.Sprintf("%s/brokers/topics/%s", t.kz.conf.Chroot, t.Name)
value, _, c, err := t.kz.conn.GetW(node)
if err != nil {
return nil, nil, err
}

list, err := t.parsePartitions(value)
return list, c, err
}

// parsePartitions pases the JSON representation of the partitions
// that is stored as data on the topic node in Zookeeper.
func (t *Topic) parsePartitions(value []byte) (PartitionList, error) {
type topicMetadata struct {
Partitions map[string][]int32 `json:"partitions"`
}
Expand Down Expand Up @@ -100,6 +143,25 @@ func (t *Topic) Config() (map[string]string, error) {
return topicConfig.ConfigMap, nil
}

// Topic returns the Topic of this partition.
func (p *Partition) Topic() *Topic {
return p.topic
}

// Key returns a unique identifier for the partition, using the form "topic/partition".
func (p *Partition) Key() string {
return fmt.Sprintf("%s/%d", p.topic.Name, p.ID)
}

// PreferredReplica returns the preferred replica for this partition.
func (p *Partition) PreferredReplica() int32 {
if len(p.Replicas) > 0 {
return p.Replicas[0]
} else {
return -1
}
}

// Leader returns the broker ID of the broker that is currently the leader for the partition.
func (p *Partition) Leader() (int32, error) {
if state, err := p.state(); err != nil {
Expand Down Expand Up @@ -134,11 +196,14 @@ func (p *Partition) UsesPreferredReplica() (bool, error) {
}
}

// partitionState represents the partition state as it is stored as JSON
// in Zookeeper on the partition's state node.
type partitionState struct {
Leader int32 `json:"leader"`
ISR []int32 `json:"isr"`
}

// state retrieves and parses the partition State
func (p *Partition) state() (partitionState, error) {
var state partitionState
node := fmt.Sprintf("%s/brokers/topics/%s/partitions/%d/state", p.topic.kz.conf.Chroot, p.topic.Name, p.ID)
Expand Down Expand Up @@ -182,7 +247,7 @@ func (pl PartitionList) Len() int {
}

func (pl PartitionList) Less(i, j int) bool {
return pl[i].ID < pl[j].ID
return pl[i].topic.Name < pl[j].topic.Name || (pl[i].topic.Name == pl[j].topic.Name && pl[i].ID < pl[j].ID)
}

func (pl PartitionList) Swap(i, j int) {
Expand Down
Loading