Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add CreatePartitionsRequest/Response #985

Merged
merged 2 commits into from
Dec 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions create_partitions_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package sarama

import "time"

type CreatePartitionsRequest struct {
TopicPartitions map[string]*TopicPartition
Timeout time.Duration
ValidateOnly bool
}

func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
return err
}

for topic, partition := range c.TopicPartitions {
if err := pe.putString(topic); err != nil {
return err
}
if err := partition.encode(pe); err != nil {
return err
}
}

pe.putInt32(int32(c.Timeout / time.Millisecond))

pe.putBool(c.ValidateOnly)

return nil
}

func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
}
c.TopicPartitions = make(map[string]*TopicPartition, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicPartitions[topic] = new(TopicPartition)
if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
return err
}
}

timeout, err := pd.getInt32()
if err != nil {
return err
}
c.Timeout = time.Duration(timeout) * time.Millisecond

if c.ValidateOnly, err = pd.getBool(); err != nil {
return err
}

return nil
}

func (r *CreatePartitionsRequest) key() int16 {
return 37
}

func (r *CreatePartitionsRequest) version() int16 {
return 0
}

func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
return V1_0_0_0
}

type TopicPartition struct {
Count int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at https://kafka.apache.org/protocol.html#The_Messages_CreateTopics and this doesn't seem quite right. Aren't you missing the number of replication factor?

Copy link
Author

@buyology buyology Dec 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. This is actually https://kafka.apache.org/protocol#The_Messages_CreatePartitions, so it's only about adding about new partitions to an already existing topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦‍♂️ alright then

Assignment [][]int32
}

func (t *TopicPartition) encode(pe packetEncoder) error {
pe.putInt32(t.Count)

if len(t.Assignment) == 0 {
pe.putInt32(-1)
return nil
}

if err := pe.putArrayLength(len(t.Assignment)); err != nil {
return err
}

for _, assign := range t.Assignment {
if err := pe.putInt32Array(assign); err != nil {
return err
}
}

return nil
}

func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
if t.Count, err = pd.getInt32(); err != nil {
return err
}

n, err := pd.getInt32()
if err != nil {
return err
}
if n <= 0 {
return nil
}
t.Assignment = make([][]int32, n)

for i := 0; i < int(n); i++ {
if t.Assignment[i], err = pd.getInt32Array(); err != nil {
return err
}
}

return nil
}
50 changes: 50 additions & 0 deletions create_partitions_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sarama

import (
"testing"
"time"
)

var (
createPartitionRequestNoAssignment = []byte{
0, 0, 0, 1, // one topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 3, // 3 partitions
255, 255, 255, 255, // no assignments
0, 0, 0, 100, // timeout
0, // validate only = false
}

createPartitionRequestAssignment = []byte{
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 3, // 3 partitions
0, 0, 0, 2,
0, 0, 0, 2,
0, 0, 0, 2, 0, 0, 0, 3,
0, 0, 0, 2,
0, 0, 0, 3, 0, 0, 0, 1,
0, 0, 0, 100,
1, // validate only = true
}
)

func TestCreatePartitionsRequest(t *testing.T) {
req := &CreatePartitionsRequest{
TopicPartitions: map[string]*TopicPartition{
"topic": &TopicPartition{
Count: 3,
},
},
Timeout: 100 * time.Millisecond,
}

buf := testRequestEncode(t, "no assignment", req, createPartitionRequestNoAssignment)
testRequestDecode(t, "no assignment", req, buf)

req.ValidateOnly = true
req.TopicPartitions["topic"].Assignment = [][]int32{{2, 3}, {3, 1}}

buf = testRequestEncode(t, "assignment", req, createPartitionRequestAssignment)
testRequestDecode(t, "assignment", req, buf)
}
94 changes: 94 additions & 0 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package sarama

import "time"

type CreatePartitionsResponse struct {
ThrottleTime time.Duration
TopicPartitionErrors map[string]*TopicPartitionError
}

func (c *CreatePartitionsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil {
return err
}

for topic, partitionError := range c.TopicPartitionErrors {
if err := pe.putString(topic); err != nil {
return err
}
if err := partitionError.encode(pe); err != nil {
return err
}
}

return nil
}

func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}

c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
c.TopicPartitionErrors[topic] = new(TopicPartitionError)
if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil {
return err
}
}

return nil
}

func (r *CreatePartitionsResponse) key() int16 {
return 37
}

func (r *CreatePartitionsResponse) version() int16 {
return 0
}

func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

type TopicPartitionError struct {
Err KError
ErrMsg *string
}

func (t *TopicPartitionError) encode(pe packetEncoder) error {
pe.putInt16(int16(t.Err))

if err := pe.putNullableString(t.ErrMsg); err != nil {
return err
}

return nil
}

func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
t.Err = KError(kerr)

if t.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}

return nil
}
52 changes: 52 additions & 0 deletions create_partitions_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sarama

import (
"reflect"
"testing"
"time"
)

var (
createPartitionResponseSuccess = []byte{
0, 0, 0, 100, // throttleTimeMs
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, // no error
255, 255, // no error message
}

createPartitionResponseFail = []byte{
0, 0, 0, 100, // throttleTimeMs
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 37, // partition error
0, 5, 'e', 'r', 'r', 'o', 'r',
}
)

func TestCreatePartitionsResponse(t *testing.T) {
resp := &CreatePartitionsResponse{
ThrottleTime: 100 * time.Millisecond,
TopicPartitionErrors: map[string]*TopicPartitionError{
"topic": &TopicPartitionError{},
},
}

testResponse(t, "success", resp, createPartitionResponseSuccess)
decodedresp := new(CreatePartitionsResponse)
testVersionDecodable(t, "success", decodedresp, createPartitionResponseSuccess, 0)
if !reflect.DeepEqual(decodedresp, resp) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}

errMsg := "error"
resp.TopicPartitionErrors["topic"].Err = ErrInvalidPartitions
resp.TopicPartitionErrors["topic"].ErrMsg = &errMsg

testResponse(t, "with errors", resp, createPartitionResponseFail)
decodedresp = new(CreatePartitionsResponse)
testVersionDecodable(t, "with errors", decodedresp, createPartitionResponseFail, 0)
if !reflect.DeepEqual(decodedresp, resp) {
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp)
}
}
1 change: 1 addition & 0 deletions packet_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type packetDecoder interface {
getInt64() (int64, error)
getVarint() (int64, error)
getArrayLength() (int, error)
getBool() (bool, error)

// Collections
getBytes() ([]byte, error)
Expand Down
1 change: 1 addition & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type packetEncoder interface {
putInt64(in int64)
putVarint(in int64)
putArrayLength(in int) error
putBool(in bool)

// Collections
putBytes(in []byte) error
Expand Down
4 changes: 4 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (pe *prepEncoder) putArrayLength(in int) error {
return nil
}

func (pe *prepEncoder) putBool(in bool) {
pe.length++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be tempted to just delegate this to putInt8 since that's the effective implementation elsewhere

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the above is mostly for symmetry :) bool is quite common in the new request/responses + just thinking it's nice to have getBool() sanity check for valid values. what do you think. is it ok to leave in?

}

// arrays

func (pe *prepEncoder) putBytes(in []byte) error {
Expand Down
Loading