Skip to content

Commit

Permalink
Merge pull request #1417 from birdayz/admin_delete_consumer_group
Browse files Browse the repository at this point in the history
admin: add DeleteConsumerGroup
  • Loading branch information
bai authored Jul 2, 2019
2 parents c50148e + ce981d1 commit 09dc31d
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
32 changes: 31 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ type ClusterAdmin interface {
// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

// Get information about the nodes in the cluster.
// Delete a consumer group.
DeleteConsumerGroup(group string) error

// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Close shuts down the admin and closes underlying client.
Expand Down Expand Up @@ -614,3 +617,30 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m

return coordinator.FetchOffset(request)
}

func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
coordinator, err := ca.client.Coordinator(group)
if err != nil {
return err
}

request := &DeleteGroupsRequest{
Groups: []string{group},
}

resp, err := coordinator.DeleteGroups(request)
if err != nil {
return err
}

groupErr, ok := resp.GroupErrorCodes[group]
if !ok {
return ErrIncompleteResponse
}

if groupErr != ErrNoError {
return groupErr
}

return nil
}
30 changes: 30 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,3 +859,33 @@ func TestListConsumerGroupOffsets(t *testing.T) {
}

}

func TestDeleteConsumerGroup(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

group := "my-group"

seedBroker.SetHandlerByMap(map[string]MockResponse{
// "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
"DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
})

config := NewConfig()
config.Version = V1_1_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

err = admin.DeleteConsumerGroup(group)
if err != nil {
t.Fatalf("DeleteConsumerGroup failed with error %v", err)
}

}
23 changes: 23 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,3 +885,26 @@ func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
}
return res
}

type MockDeleteGroupsResponse struct {
deletedGroups []string
}

func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
return &MockDeleteGroupsResponse{}
}

func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
m.deletedGroups = groups
return m
}

func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
resp := &DeleteGroupsResponse{
GroupErrorCodes: map[string]KError{},
}
for _, group := range m.deletedGroups {
resp.GroupErrorCodes[group] = ErrNoError
}
return resp
}

0 comments on commit 09dc31d

Please sign in to comment.