Skip to content

Commit

Permalink
Add support for DeleteGroups
Browse files Browse the repository at this point in the history
Fixed #1095.
  • Loading branch information
RussellLuo committed May 2, 2018
1 parent e8552c0 commit 2ca6a9e
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 0 deletions.
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon

return response, nil
}

func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)

if err := b.sendAndReceive(request, response); err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
13 changes: 13 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,19 @@ var brokerTestTable = []struct {
t.Error("ApiVersions request got no response!")
}
}},

{"DeleteGroupsRequest",
[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
func(t *testing.T, broker *Broker) {
request := DeleteGroupsRequest{}
response, err := broker.DeleteGroups(&request)
if err != nil {
t.Error(err)
}
if response == nil {
t.Error("DeleteGroups request got no response!")
}
}},
}

func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {
Expand Down
30 changes: 30 additions & 0 deletions delete_groups_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sarama

type DeleteGroupsRequest struct {
Groups []string
}

func (r *DeleteGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Groups, err = pd.getStringArray()
return
}

func (r *DeleteGroupsRequest) key() int16 {
return 42
}

func (r *DeleteGroupsRequest) version() int16 {
return 0
}

func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *DeleteGroupsRequest) AddGroup(group string) {
r.Groups = append(r.Groups, group)
}
34 changes: 34 additions & 0 deletions delete_groups_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sarama

import "testing"

var (
emptyDeleteGroupsRequest = []byte{0, 0, 0, 0}

singleDeleteGroupsRequest = []byte{
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name: foo
}

doubleDeleteGroupsRequest = []byte{
0, 0, 0, 2, // 2 groups
0, 3, 'f', 'o', 'o', // group name: foo
0, 3, 'b', 'a', 'r', // group name: foo
}
)

func TestDeleteGroupsRequest(t *testing.T) {
var request *DeleteGroupsRequest

request = new(DeleteGroupsRequest)
testRequest(t, "no groups", request, emptyDeleteGroupsRequest)

request = new(DeleteGroupsRequest)
request.AddGroup("foo")
testRequest(t, "one group", request, singleDeleteGroupsRequest)

request = new(DeleteGroupsRequest)
request.AddGroup("foo")
request.AddGroup("bar")
testRequest(t, "two groups", request, doubleDeleteGroupsRequest)
}
67 changes: 67 additions & 0 deletions delete_groups_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package sarama

type DeleteGroupsResponse struct {
ThrottleTimeMs int32
GroupErrorCodes map[string]KError
}

func (r *DeleteGroupsResponse) encode(pe packetEncoder) error {
pe.putInt32(r.ThrottleTimeMs)

if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil {
return err
}
for groupID, errorCode := range r.GroupErrorCodes {
if err := pe.putString(groupID); err != nil {
return err
}
pe.putInt16(int16(errorCode))
}

return nil
}

func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
throttleTimeMs, err := pd.getInt32()
if err != nil {
return err
}

r.ThrottleTimeMs = throttleTimeMs

n, err := pd.getArrayLength()
if err != nil {
return err
}
if n == 0 {
return nil
}

r.GroupErrorCodes = make(map[string]KError, n)
for i := 0; i < n; i++ {
groupID, err := pd.getString()
if err != nil {
return err
}
errorCode, err := pd.getInt16()
if err != nil {
return err
}

r.GroupErrorCodes[groupID] = KError(errorCode)
}

return nil
}

func (r *DeleteGroupsResponse) key() int16 {
return 42
}

func (r *DeleteGroupsResponse) version() int16 {
return 0
}

func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
57 changes: 57 additions & 0 deletions delete_groups_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sarama

import (
"testing"
)

var (
emptyDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 0, // no groups
}

errorDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name
0, 31, // error ErrClusterAuthorizationFailed
}

noErrorDeleteGroupsResponse = []byte{
0, 0, 0, 0, // does not violate any quota
0, 0, 0, 1, // 1 group
0, 3, 'f', 'o', 'o', // group name
0, 0, // no error
}
)

func TestDeleteGroupsResponse(t *testing.T) {
var response *DeleteGroupsResponse

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0)
if response.ThrottleTimeMs != 0 {
t.Error("Expected no violation")
}
if len(response.GroupErrorCodes) != 0 {
t.Error("Expected no groups")
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0)
if response.ThrottleTimeMs != 0 {
t.Error("Expected no violation")
}
if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}

response = new(DeleteGroupsResponse)
testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0)
if response.ThrottleTimeMs != 0 {
t.Error("Expected no violation")
}
if response.GroupErrorCodes["foo"] != ErrNoError {
t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
}
}
2 changes: 2 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func allocateBody(key, version int16) protocolBody {
return &AlterConfigsRequest{}
case 37:
return &CreatePartitionsRequest{}
case 42:
return &DeleteGroupsRequest{}
}
return nil
}

0 comments on commit 2ca6a9e

Please sign in to comment.