-
Notifications
You must be signed in to change notification settings - Fork 861
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations #2021
Merged
Merged
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations #2021
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
f9b72d2
Initial commit for kip430
jainruchir 6ff4931
Describe Topics/Cluster without integration tests
jainruchir c8c44f8
integration tests for describe topic and cluster(for review as failing)
jainruchir 2b7e3ed
Merge branch 'master' into dev_kip430
jainruchir 7874688
Merge branch 'master' into dev_kip430
emasab 9545b10
Reflect librdkafka changes
emasab 82ace2a
Integration tests and examples
emasab ea33890
Copyright and spaces before namespace
emasab 4dfcd5e
Move TopicPartitionInfo to common
emasab 06b23bf
Avoid breaking binary compatibility
emasab 2ff8a56
Documentation and ToString methods
emasab 43f20a0
ToString unit tests
emasab 3c33848
Update changelog
emasab 3f233b9
Address comments
emasab b4495d2
Better documentation for optional response fields
emasab b08b0f7
Address comments
emasab 9beae2d
Merge branch 'master' into dev_kip430
emasab c0a7b18
Add librdkafka.redist reference in changelog
emasab File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -1,4 +1,5 @@ | ||||||||
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider | ||||||||
// Copyright 2015-2016 Andreas Heider, | ||||||||
// 2016-2023 Confluent Inc. | ||||||||
// | ||||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||
// you may not use this file except in compliance with the License. | ||||||||
|
@@ -525,7 +526,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm | |||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() { | ||||||||
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() | ||||||||
{ | ||||||||
RequestTimeout = timeout, | ||||||||
MatchStates = statesList, | ||||||||
}); | ||||||||
|
@@ -546,23 +548,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm | |||||||
|
||||||||
static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) | ||||||||
{ | ||||||||
if (commandArgs.Length < 1) | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]"); | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var groupNames = commandArgs.ToList(); | ||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
var groupNames = commandArgs.Skip(3).ToList(); | ||||||||
|
||||||||
if (string.IsNullOrWhiteSpace(username)) | ||||||||
{ | ||||||||
username = null; | ||||||||
} | ||||||||
if (string.IsNullOrWhiteSpace(password)) | ||||||||
{ | ||||||||
password = null; | ||||||||
} | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout }); | ||||||||
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
foreach (var group in descResult.ConsumerGroupDescriptions) | ||||||||
{ | ||||||||
Console.WriteLine($" Group: {group.GroupId} {group.Error}"); | ||||||||
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}"); | ||||||||
Console.WriteLine($" Broker: {group.Coordinator}"); | ||||||||
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}"); | ||||||||
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}"); | ||||||||
|
@@ -579,6 +610,11 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] | |||||||
} | ||||||||
Console.WriteLine($" TopicPartitions: [{topicPartitions}]"); | ||||||||
} | ||||||||
if(includeAuthorizedOperations) | ||||||||
{ | ||||||||
string operations = string.Join(" ", group.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
|
@@ -757,6 +793,160 @@ await adminClient.AlterUserScramCredentialsAsync(alterations, | |||||||
} | ||||||||
} | ||||||||
|
||||||||
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations) | ||||||||
{ | ||||||||
foreach (var topic in topicDescriptions) | ||||||||
{ | ||||||||
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}"); | ||||||||
Console.WriteLine($" Partitions:"); | ||||||||
foreach (var partition in topic.Partitions) | ||||||||
{ | ||||||||
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}"); | ||||||||
if(!partition.ISR.Any()) | ||||||||
{ | ||||||||
Console.WriteLine(" There is no In-Sync-Replica broker for the partition"); | ||||||||
} | ||||||||
else{ | ||||||||
string isrs = string.Join("; ", partition.ISR); | ||||||||
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}"); | ||||||||
} | ||||||||
|
||||||||
if(!partition.Replicas.Any()) | ||||||||
{ | ||||||||
Console.WriteLine(" There is no Replica broker for the partition"); | ||||||||
} | ||||||||
else{ | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are nits but
Suggested change
|
||||||||
string replicas = string.Join("; ", partition.Replicas); | ||||||||
Console.WriteLine($" The Replica brokers are: {replicas}"); | ||||||||
} | ||||||||
|
||||||||
} | ||||||||
Console.WriteLine($" Is internal: {topic.IsInternal}"); | ||||||||
if(includeAuthorizedOperations) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
{ | ||||||||
string operations = string.Join(" ", topic.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs) | ||||||||
{ | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
if (string.IsNullOrWhiteSpace(username)) | ||||||||
{ | ||||||||
username = null; | ||||||||
} | ||||||||
if (string.IsNullOrWhiteSpace(password)) | ||||||||
{ | ||||||||
password = null; | ||||||||
} | ||||||||
var topicNames = commandArgs.Skip(3).ToList(); | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeTopicsAsync( | ||||||||
TopicCollection.OfTopicNames(topicNames), | ||||||||
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations); | ||||||||
} | ||||||||
catch (DescribeTopicsException e) | ||||||||
{ | ||||||||
// At least one TopicDescription will have an error. | ||||||||
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations); | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
{ | ||||||||
Console.WriteLine($"An error occurred describing topics: {e}"); | ||||||||
Environment.ExitCode = 1; | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs) | ||||||||
{ | ||||||||
if (commandArgs.Length < 3) | ||||||||
{ | ||||||||
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>"); | ||||||||
Environment.ExitCode = 1; | ||||||||
return; | ||||||||
} | ||||||||
|
||||||||
var username = commandArgs[0]; | ||||||||
var password = commandArgs[1]; | ||||||||
var includeAuthorizedOperations = (commandArgs[2] == "1"); | ||||||||
|
||||||||
var timeout = TimeSpan.FromSeconds(30); | ||||||||
var config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
}; | ||||||||
if (username != null && password != null) | ||||||||
{ | ||||||||
config = new AdminClientConfig | ||||||||
{ | ||||||||
BootstrapServers = bootstrapServers, | ||||||||
SecurityProtocol = SecurityProtocol.SaslPlaintext, | ||||||||
SaslMechanism = SaslMechanism.Plain, | ||||||||
SaslUsername = username, | ||||||||
SaslPassword = password, | ||||||||
}; | ||||||||
} | ||||||||
|
||||||||
using (var adminClient = new AdminClientBuilder(config).Build()) | ||||||||
{ | ||||||||
try | ||||||||
{ | ||||||||
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations}); | ||||||||
|
||||||||
Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}"); | ||||||||
Console.WriteLine(" Nodes:"); | ||||||||
foreach(var node in descResult.Nodes) | ||||||||
{ | ||||||||
Console.WriteLine($" {node}"); | ||||||||
} | ||||||||
if(includeAuthorizedOperations) | ||||||||
{ | ||||||||
string operations = string.Join(" ", descResult.AuthorizedOperations); | ||||||||
Console.WriteLine($" Authorized operations: {operations}"); | ||||||||
} | ||||||||
} | ||||||||
catch (KafkaException e) | ||||||||
{ | ||||||||
Console.WriteLine($"An error occurred describing cluster: {e}"); | ||||||||
Environment.ExitCode = 1; | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
public static async Task Main(string[] args) | ||||||||
{ | ||||||||
if (args.Length < 2) | ||||||||
|
@@ -768,8 +958,8 @@ public static async Task Main(string[] args) | |||||||
"list-consumer-groups", "describe-consumer-groups", | ||||||||
"list-consumer-group-offsets", "alter-consumer-group-offsets", | ||||||||
"incremental-alter-configs", "describe-user-scram-credentials", | ||||||||
"alter-user-scram-credentials" | ||||||||
|
||||||||
"alter-user-scram-credentials", "describe-topics", | ||||||||
"describe-cluster" | ||||||||
}) + | ||||||||
" .."); | ||||||||
Environment.ExitCode = 1; | ||||||||
|
@@ -824,6 +1014,12 @@ public static async Task Main(string[] args) | |||||||
case "alter-user-scram-credentials": | ||||||||
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
case "describe-topics": | ||||||||
await DescribeTopicsAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
case "describe-cluster": | ||||||||
await DescribeClusterAsync(bootstrapServers, commandArgs); | ||||||||
break; | ||||||||
default: | ||||||||
Console.WriteLine($"unknown command: {command}"); | ||||||||
break; | ||||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// Copyright 2023 Confluent Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
// Refer to LICENSE for more information. | ||
|
||
using System; | ||
|
||
|
||
namespace Confluent.Kafka.Admin | ||
{ | ||
/// <summary> | ||
/// Options for the "IAdminClient.DescribeCluster" method. | ||
/// </summary> | ||
public class DescribeClusterOptions | ||
{ | ||
/// <summary> | ||
/// The overall request timeout, including broker lookup, request | ||
/// transmission, operation time on broker, and response. If set | ||
/// to null, the default request timeout for the AdminClient will | ||
/// be used. | ||
/// | ||
/// Default: null | ||
/// </summary> | ||
public TimeSpan? RequestTimeout { get; set; } | ||
|
||
/// <summary> | ||
/// Decides if the broker should return cluster authorized operations. | ||
/// | ||
/// Default: false | ||
/// </summary> | ||
public bool IncludeAuthorizedOperations { get; set; } = false; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.