diff --git a/broker.go b/broker.go index 9755a7d7c..d836bee6d 100644 --- a/broker.go +++ b/broker.go @@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon return response, nil } + +func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { + response := new(DeleteGroupsResponse) + + if err := b.sendAndReceive(request, response); err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/broker_test.go b/broker_test.go index cc73b4440..9263cef8b 100644 --- a/broker_test.go +++ b/broker_test.go @@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { // Set the broker id in order to validate local broker metrics broker.id = 0 conf := NewConfig() - conf.Version = V0_10_0_0 + conf.Version = tt.version err := broker.Open(conf) if err != nil { t.Fatal(err) @@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) { // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake var brokerTestTable = []struct { + version KafkaVersion name string response []byte runner func(*testing.T, *Broker) }{ - {"MetadataRequest", + {V0_10_0_0, + "MetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := MetadataRequest{} @@ -114,7 +116,8 @@ var brokerTestTable = []struct { } }}, - {"ConsumerMetadataRequest", + {V0_10_0_0, + "ConsumerMetadataRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ConsumerMetadataRequest{} @@ -127,7 +130,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (NoResponse)", + {V0_10_0_0, + "ProduceRequest (NoResponse)", []byte{}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -141,7 +145,8 @@ var brokerTestTable = []struct { } }}, - {"ProduceRequest (WaitForLocal)", + {V0_10_0_0, + "ProduceRequest (WaitForLocal)", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ProduceRequest{} @@ -155,7 +160,8 @@ var brokerTestTable = []struct { } }}, - {"FetchRequest", + {V0_10_0_0, + "FetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := FetchRequest{} @@ -168,7 +174,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetFetchRequest", + {V0_10_0_0, + "OffsetFetchRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetFetchRequest{} @@ -181,7 +188,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetCommitRequest", + {V0_10_0_0, + "OffsetCommitRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetCommitRequest{} @@ -194,7 +202,8 @@ var brokerTestTable = []struct { } }}, - {"OffsetRequest", + {V0_10_0_0, + "OffsetRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := OffsetRequest{} @@ -207,7 +216,8 @@ var brokerTestTable = []struct { } }}, - {"JoinGroupRequest", + {V0_10_0_0, + "JoinGroupRequest", []byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := JoinGroupRequest{} @@ -220,7 +230,8 @@ var brokerTestTable = []struct { } }}, - {"SyncGroupRequest", + {V0_10_0_0, + "SyncGroupRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := SyncGroupRequest{} @@ -233,7 +244,8 @@ var brokerTestTable = []struct { } }}, - {"LeaveGroupRequest", + {V0_10_0_0, + "LeaveGroupRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := LeaveGroupRequest{} @@ -246,7 +258,8 @@ var brokerTestTable = []struct { } }}, - {"HeartbeatRequest", + {V0_10_0_0, + "HeartbeatRequest", []byte{0x00, 0x00}, func(t *testing.T, broker *Broker) { request := HeartbeatRequest{} @@ -259,7 +272,8 @@ var brokerTestTable = []struct { } }}, - {"ListGroupsRequest", + {V0_10_0_0, + "ListGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ListGroupsRequest{} @@ -272,7 +286,8 @@ var brokerTestTable = []struct { } }}, - {"DescribeGroupsRequest", + {V0_10_0_0, + "DescribeGroupsRequest", []byte{0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := DescribeGroupsRequest{} @@ -285,7 +300,8 @@ var brokerTestTable = []struct { } }}, - {"ApiVersionsRequest", + {V0_10_0_0, + "ApiVersionsRequest", []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, func(t *testing.T, broker *Broker) { request := ApiVersionsRequest{} @@ -297,6 +313,20 @@ var brokerTestTable = []struct { t.Error("ApiVersions request got no response!") } }}, + + {V1_1_0_0, + "DeleteGroupsRequest", + []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + func(t *testing.T, broker *Broker) { + request := DeleteGroupsRequest{} + response, err := broker.DeleteGroups(&request) + if err != nil { + t.Error(err) + } + if response == nil { + t.Error("DeleteGroups request got no response!") + } + }}, } func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) { diff --git a/delete_groups_request.go b/delete_groups_request.go new file mode 100644 index 000000000..305a324ac --- /dev/null +++ b/delete_groups_request.go @@ -0,0 +1,30 @@ +package sarama + +type DeleteGroupsRequest struct { + Groups []string +} + +func (r *DeleteGroupsRequest) encode(pe packetEncoder) error { + return pe.putStringArray(r.Groups) +} + +func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Groups, err = pd.getStringArray() + return +} + +func (r *DeleteGroupsRequest) key() int16 { + return 42 +} + +func (r *DeleteGroupsRequest) version() int16 { + return 0 +} + +func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} + +func (r *DeleteGroupsRequest) AddGroup(group string) { + r.Groups = append(r.Groups, group) +} diff --git a/delete_groups_request_test.go b/delete_groups_request_test.go new file mode 100644 index 000000000..908172498 --- /dev/null +++ b/delete_groups_request_test.go @@ -0,0 +1,34 @@ +package sarama + +import "testing" + +var ( + emptyDeleteGroupsRequest = []byte{0, 0, 0, 0} + + singleDeleteGroupsRequest = []byte{ + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name: foo + } + + doubleDeleteGroupsRequest = []byte{ + 0, 0, 0, 2, // 2 groups + 0, 3, 'f', 'o', 'o', // group name: foo + 0, 3, 'b', 'a', 'r', // group name: foo + } +) + +func TestDeleteGroupsRequest(t *testing.T) { + var request *DeleteGroupsRequest + + request = new(DeleteGroupsRequest) + testRequest(t, "no groups", request, emptyDeleteGroupsRequest) + + request = new(DeleteGroupsRequest) + request.AddGroup("foo") + testRequest(t, "one group", request, singleDeleteGroupsRequest) + + request = new(DeleteGroupsRequest) + request.AddGroup("foo") + request.AddGroup("bar") + testRequest(t, "two groups", request, doubleDeleteGroupsRequest) +} diff --git a/delete_groups_response.go b/delete_groups_response.go new file mode 100644 index 000000000..c067ebb42 --- /dev/null +++ b/delete_groups_response.go @@ -0,0 +1,70 @@ +package sarama + +import ( + "time" +) + +type DeleteGroupsResponse struct { + ThrottleTime time.Duration + GroupErrorCodes map[string]KError +} + +func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil { + return err + } + for groupID, errorCode := range r.GroupErrorCodes { + if err := pe.putString(groupID); err != nil { + return err + } + pe.putInt16(int16(errorCode)) + } + + return nil +} + +func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.GroupErrorCodes = make(map[string]KError, n) + for i := 0; i < n; i++ { + groupID, err := pd.getString() + if err != nil { + return err + } + errorCode, err := pd.getInt16() + if err != nil { + return err + } + + r.GroupErrorCodes[groupID] = KError(errorCode) + } + + return nil +} + +func (r *DeleteGroupsResponse) key() int16 { + return 42 +} + +func (r *DeleteGroupsResponse) version() int16 { + return 0 +} + +func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delete_groups_response_test.go b/delete_groups_response_test.go new file mode 100644 index 000000000..6f622b5f0 --- /dev/null +++ b/delete_groups_response_test.go @@ -0,0 +1,57 @@ +package sarama + +import ( + "testing" +) + +var ( + emptyDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 0, // no groups + } + + errorDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name + 0, 31, // error ErrClusterAuthorizationFailed + } + + noErrorDeleteGroupsResponse = []byte{ + 0, 0, 0, 0, // does not violate any quota + 0, 0, 0, 1, // 1 group + 0, 3, 'f', 'o', 'o', // group name + 0, 0, // no error + } +) + +func TestDeleteGroupsResponse(t *testing.T) { + var response *DeleteGroupsResponse + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if len(response.GroupErrorCodes) != 0 { + t.Error("Expected no groups") + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } + + response = new(DeleteGroupsResponse) + testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0) + if response.ThrottleTime != 0 { + t.Error("Expected no violation") + } + if response.GroupErrorCodes["foo"] != ErrNoError { + t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"]) + } +} diff --git a/request.go b/request.go index 80333a1f2..4d211a14f 100644 --- a/request.go +++ b/request.go @@ -142,6 +142,8 @@ func allocateBody(key, version int16) protocolBody { return &AlterConfigsRequest{} case 37: return &CreatePartitionsRequest{} + case 42: + return &DeleteGroupsRequest{} } return nil }