-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add CreatePartitionsRequest/Response #985
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package sarama | ||
|
||
import "time" | ||
|
||
type CreatePartitionsRequest struct { | ||
TopicPartitions map[string]*TopicPartition | ||
Timeout time.Duration | ||
ValidateOnly bool | ||
} | ||
|
||
func (c *CreatePartitionsRequest) encode(pe packetEncoder) error { | ||
if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil { | ||
return err | ||
} | ||
|
||
for topic, partition := range c.TopicPartitions { | ||
if err := pe.putString(topic); err != nil { | ||
return err | ||
} | ||
if err := partition.encode(pe); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
pe.putInt32(int32(c.Timeout / time.Millisecond)) | ||
|
||
pe.putBool(c.ValidateOnly) | ||
|
||
return nil | ||
} | ||
|
||
func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) { | ||
n, err := pd.getArrayLength() | ||
if err != nil { | ||
return err | ||
} | ||
c.TopicPartitions = make(map[string]*TopicPartition, n) | ||
for i := 0; i < n; i++ { | ||
topic, err := pd.getString() | ||
if err != nil { | ||
return err | ||
} | ||
c.TopicPartitions[topic] = new(TopicPartition) | ||
if err := c.TopicPartitions[topic].decode(pd, version); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
timeout, err := pd.getInt32() | ||
if err != nil { | ||
return err | ||
} | ||
c.Timeout = time.Duration(timeout) * time.Millisecond | ||
|
||
if c.ValidateOnly, err = pd.getBool(); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *CreatePartitionsRequest) key() int16 { | ||
return 37 | ||
} | ||
|
||
func (r *CreatePartitionsRequest) version() int16 { | ||
return 0 | ||
} | ||
|
||
func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion { | ||
return V1_0_0_0 | ||
} | ||
|
||
type TopicPartition struct { | ||
Count int32 | ||
Assignment [][]int32 | ||
} | ||
|
||
func (t *TopicPartition) encode(pe packetEncoder) error { | ||
pe.putInt32(t.Count) | ||
|
||
if len(t.Assignment) == 0 { | ||
pe.putInt32(-1) | ||
return nil | ||
} | ||
|
||
if err := pe.putArrayLength(len(t.Assignment)); err != nil { | ||
return err | ||
} | ||
|
||
for _, assign := range t.Assignment { | ||
if err := pe.putInt32Array(assign); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) { | ||
if t.Count, err = pd.getInt32(); err != nil { | ||
return err | ||
} | ||
|
||
n, err := pd.getInt32() | ||
if err != nil { | ||
return err | ||
} | ||
if n <= 0 { | ||
return nil | ||
} | ||
t.Assignment = make([][]int32, n) | ||
|
||
for i := 0; i < int(n); i++ { | ||
if t.Assignment[i], err = pd.getInt32Array(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package sarama | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
) | ||
|
||
var ( | ||
createPartitionRequestNoAssignment = []byte{ | ||
0, 0, 0, 1, // one topic | ||
0, 5, 't', 'o', 'p', 'i', 'c', | ||
0, 0, 0, 3, // 3 partitions | ||
255, 255, 255, 255, // no assignments | ||
0, 0, 0, 100, // timeout | ||
0, // validate only = false | ||
} | ||
|
||
createPartitionRequestAssignment = []byte{ | ||
0, 0, 0, 1, | ||
0, 5, 't', 'o', 'p', 'i', 'c', | ||
0, 0, 0, 3, // 3 partitions | ||
0, 0, 0, 2, | ||
0, 0, 0, 2, | ||
0, 0, 0, 2, 0, 0, 0, 3, | ||
0, 0, 0, 2, | ||
0, 0, 0, 3, 0, 0, 0, 1, | ||
0, 0, 0, 100, | ||
1, // validate only = true | ||
} | ||
) | ||
|
||
func TestCreatePartitionsRequest(t *testing.T) { | ||
req := &CreatePartitionsRequest{ | ||
TopicPartitions: map[string]*TopicPartition{ | ||
"topic": &TopicPartition{ | ||
Count: 3, | ||
}, | ||
}, | ||
Timeout: 100 * time.Millisecond, | ||
} | ||
|
||
buf := testRequestEncode(t, "no assignment", req, createPartitionRequestNoAssignment) | ||
testRequestDecode(t, "no assignment", req, buf) | ||
|
||
req.ValidateOnly = true | ||
req.TopicPartitions["topic"].Assignment = [][]int32{{2, 3}, {3, 1}} | ||
|
||
buf = testRequestEncode(t, "assignment", req, createPartitionRequestAssignment) | ||
testRequestDecode(t, "assignment", req, buf) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
package sarama | ||
|
||
import "time" | ||
|
||
type CreatePartitionsResponse struct { | ||
ThrottleTime time.Duration | ||
TopicPartitionErrors map[string]*TopicPartitionError | ||
} | ||
|
||
func (c *CreatePartitionsResponse) encode(pe packetEncoder) error { | ||
pe.putInt32(int32(c.ThrottleTime / time.Millisecond)) | ||
if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil { | ||
return err | ||
} | ||
|
||
for topic, partitionError := range c.TopicPartitionErrors { | ||
if err := pe.putString(topic); err != nil { | ||
return err | ||
} | ||
if err := partitionError.encode(pe); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) { | ||
throttleTime, err := pd.getInt32() | ||
if err != nil { | ||
return err | ||
} | ||
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond | ||
|
||
n, err := pd.getArrayLength() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n) | ||
for i := 0; i < n; i++ { | ||
topic, err := pd.getString() | ||
if err != nil { | ||
return err | ||
} | ||
c.TopicPartitionErrors[topic] = new(TopicPartitionError) | ||
if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *CreatePartitionsResponse) key() int16 { | ||
return 37 | ||
} | ||
|
||
func (r *CreatePartitionsResponse) version() int16 { | ||
return 0 | ||
} | ||
|
||
func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion { | ||
return V1_0_0_0 | ||
} | ||
|
||
type TopicPartitionError struct { | ||
Err KError | ||
ErrMsg *string | ||
} | ||
|
||
func (t *TopicPartitionError) encode(pe packetEncoder) error { | ||
pe.putInt16(int16(t.Err)) | ||
|
||
if err := pe.putNullableString(t.ErrMsg); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) { | ||
kerr, err := pd.getInt16() | ||
if err != nil { | ||
return err | ||
} | ||
t.Err = KError(kerr) | ||
|
||
if t.ErrMsg, err = pd.getNullableString(); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package sarama | ||
|
||
import ( | ||
"reflect" | ||
"testing" | ||
"time" | ||
) | ||
|
||
var ( | ||
createPartitionResponseSuccess = []byte{ | ||
0, 0, 0, 100, // throttleTimeMs | ||
0, 0, 0, 1, | ||
0, 5, 't', 'o', 'p', 'i', 'c', | ||
0, 0, // no error | ||
255, 255, // no error message | ||
} | ||
|
||
createPartitionResponseFail = []byte{ | ||
0, 0, 0, 100, // throttleTimeMs | ||
0, 0, 0, 1, | ||
0, 5, 't', 'o', 'p', 'i', 'c', | ||
0, 37, // partition error | ||
0, 5, 'e', 'r', 'r', 'o', 'r', | ||
} | ||
) | ||
|
||
func TestCreatePartitionsResponse(t *testing.T) { | ||
resp := &CreatePartitionsResponse{ | ||
ThrottleTime: 100 * time.Millisecond, | ||
TopicPartitionErrors: map[string]*TopicPartitionError{ | ||
"topic": &TopicPartitionError{}, | ||
}, | ||
} | ||
|
||
testResponse(t, "success", resp, createPartitionResponseSuccess) | ||
decodedresp := new(CreatePartitionsResponse) | ||
testVersionDecodable(t, "success", decodedresp, createPartitionResponseSuccess, 0) | ||
if !reflect.DeepEqual(decodedresp, resp) { | ||
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp) | ||
} | ||
|
||
errMsg := "error" | ||
resp.TopicPartitionErrors["topic"].Err = ErrInvalidPartitions | ||
resp.TopicPartitionErrors["topic"].ErrMsg = &errMsg | ||
|
||
testResponse(t, "with errors", resp, createPartitionResponseFail) | ||
decodedresp = new(CreatePartitionsResponse) | ||
testVersionDecodable(t, "with errors", decodedresp, createPartitionResponseFail, 0) | ||
if !reflect.DeepEqual(decodedresp, resp) { | ||
t.Errorf("Decoding error: expected %v but got %v", decodedresp, resp) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,10 @@ func (pe *prepEncoder) putArrayLength(in int) error { | |
return nil | ||
} | ||
|
||
func (pe *prepEncoder) putBool(in bool) { | ||
pe.length++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. might be tempted to just delegate this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the above is mostly for symmetry :) bool is quite common in the new request/responses + just thinking it's nice to have |
||
} | ||
|
||
// arrays | ||
|
||
func (pe *prepEncoder) putBytes(in []byte) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm looking at https://kafka.apache.org/protocol.html#The_Messages_CreateTopics and this doesn't seem quite right. Aren't you missing the number of replication factor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. This is actually https://kafka.apache.org/protocol#The_Messages_CreatePartitions, so it's only about adding about new partitions to an already existing topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤦♂️ alright then