From ce981d15443fe1348e3b5cc5e12da71c45e1a16c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Br=C3=BCderl?= Date: Tue, 2 Jul 2019 15:45:00 +0200 Subject: [PATCH] admin: add DeleteConsumerGroup --- admin.go | 32 +++++++++++++++++++++++++++++++- admin_test.go | 30 ++++++++++++++++++++++++++++++ mockresponses.go | 23 +++++++++++++++++++++++ 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/admin.go b/admin.go index d88a0a493..18ca4918d 100644 --- a/admin.go +++ b/admin.go @@ -84,7 +84,10 @@ type ClusterAdmin interface { // List the consumer group offsets available in the cluster. ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) - // Get information about the nodes in the cluster. + // Delete a consumer group. + DeleteConsumerGroup(group string) error + + // Get information about the nodes in the cluster DescribeCluster() (brokers []*Broker, controllerID int32, err error) // Close shuts down the admin and closes underlying client. @@ -614,3 +617,30 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m return coordinator.FetchOffset(request) } + +func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + + request := &DeleteGroupsRequest{ + Groups: []string{group}, + } + + resp, err := coordinator.DeleteGroups(request) + if err != nil { + return err + } + + groupErr, ok := resp.GroupErrorCodes[group] + if !ok { + return ErrIncompleteResponse + } + + if groupErr != ErrNoError { + return groupErr + } + + return nil +} diff --git a/admin_test.go b/admin_test.go index aafb1481a..40061113b 100644 --- a/admin_test.go +++ b/admin_test.go @@ -859,3 +859,33 @@ func TestListConsumerGroupOffsets(t *testing.T) { } } + +func TestDeleteConsumerGroup(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + group := "my-group" + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + // "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError), + "DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}), + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker), + }) + + config := NewConfig() + config.Version = V1_1_0_0 + + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + err = admin.DeleteConsumerGroup(group) + if err != nil { + t.Fatalf("DeleteConsumerGroup failed with error %v", err) + } + +} diff --git a/mockresponses.go b/mockresponses.go index 919d8bb07..3b0747c1f 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -885,3 +885,26 @@ func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder { } return res } + +type MockDeleteGroupsResponse struct { + deletedGroups []string +} + +func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse { + return &MockDeleteGroupsResponse{} +} + +func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse { + m.deletedGroups = groups + return m +} + +func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder { + resp := &DeleteGroupsResponse{ + GroupErrorCodes: map[string]KError{}, + } + for _, group := range m.deletedGroups { + resp.GroupErrorCodes[group] = ErrNoError + } + return resp +}