Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster with authorized AclOperations #2021

Merged
merged 18 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# vNext
# 2.3.0

## Enhancements

- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
Expand Down
216 changes: 206 additions & 10 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
// Copyright 2015-2016 Andreas Heider,
// 2016-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -525,7 +526,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
{
try
{
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() {
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions()
{
RequestTimeout = timeout,
MatchStates = statesList,
});
Expand All @@ -546,23 +548,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm

static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 1)
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]");
Environment.ExitCode = 1;
return;
}

var groupNames = commandArgs.ToList();
var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
var groupNames = commandArgs.Skip(3).ToList();

if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}

var timeout = TimeSpan.FromSeconds(30);
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout });
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
foreach (var group in descResult.ConsumerGroupDescriptions)
{
Console.WriteLine($" Group: {group.GroupId} {group.Error}");
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}");
Console.WriteLine($" Broker: {group.Coordinator}");
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
Expand All @@ -579,6 +610,11 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
}
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
}
if(includeAuthorizedOperations)
{
string operations = string.Join(" ", group.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}
catch (KafkaException e)
Expand Down Expand Up @@ -757,6 +793,160 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
}
}

static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
foreach (var topic in topicDescriptions)
{
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}");
Console.WriteLine($" Partitions:");
foreach (var partition in topic.Partitions)
{
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}");
if(!partition.ISR.Any())
{
Console.WriteLine(" There is no In-Sync-Replica broker for the partition");
}
else{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
else{
else
{

string isrs = string.Join("; ", partition.ISR);
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}");
}

if(!partition.Replicas.Any())
{
Console.WriteLine(" There is no Replica broker for the partition");
}
else{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are nits but

Suggested change
else{
else
{

string replicas = string.Join("; ", partition.Replicas);
Console.WriteLine($" The Replica brokers are: {replicas}");
}

}
Console.WriteLine($" Is internal: {topic.IsInternal}");
if(includeAuthorizedOperations)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(includeAuthorizedOperations)
if (includeAuthorizedOperations)

{
string operations = string.Join(" ", topic.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}

static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}
var topicNames = commandArgs.Skip(3).ToList();

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeTopicsAsync(
TopicCollection.OfTopicNames(topicNames),
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations);
}
catch (DescribeTopicsException e)
{
// At least one TopicDescription will have an error.
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing topics: {e}");
Environment.ExitCode = 1;
}
}
}

static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});

Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}");
Console.WriteLine(" Nodes:");
foreach(var node in descResult.Nodes)
{
Console.WriteLine($" {node}");
}
if(includeAuthorizedOperations)
{
string operations = string.Join(" ", descResult.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing cluster: {e}");
Environment.ExitCode = 1;
}
}
}

public static async Task Main(string[] args)
{
if (args.Length < 2)
Expand All @@ -768,8 +958,8 @@ public static async Task Main(string[] args)
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials"

"alter-user-scram-credentials", "describe-topics",
"describe-cluster"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -824,6 +1014,12 @@ public static async Task Main(string[] args)
case "alter-user-scram-credentials":
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
break;
case "describe-topics":
await DescribeTopicsAsync(bootstrapServers, commandArgs);
break;
case "describe-cluster":
await DescribeClusterAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
44 changes: 42 additions & 2 deletions src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Confluent Inc.
// Copyright 2022-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Confluent.Kafka.Admin
{
Expand Down Expand Up @@ -50,13 +52,51 @@ public class ConsumerGroupDescription
public ConsumerGroupState State { get; set; }

/// <summary>
/// Consumer group coordinator (broker).
/// Broker that acts as consumer group coordinator (null if not known).
/// </summary>
public Node Coordinator { get; set; }

/// <summary>
/// Members list.
/// </summary>
public List<MemberDescription> Members { get; set; }

/// <summary>
/// AclOperation list (null if not requested or not supported).
/// </summary>
public List<AclOperation> AuthorizedOperations { get; set; }

/// <summary>
/// Returns a JSON representation of this object.
/// </summary>
/// <returns>
/// A JSON representation of this object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
var members = string.Join(",",
Members.Select(member =>
member.ToString()
).ToList());
var authorizedOperations = "null";
if (AuthorizedOperations != null)
{
authorizedOperations = string.Join(",",
AuthorizedOperations.Select(authorizedOperation =>
authorizedOperation.ToString().Quote()
).ToList());
authorizedOperations = $"[{authorizedOperations}]";
}

result.Append($"{{\"GroupId\": {GroupId.Quote()}");
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");

return result.ToString();
}

}
}
44 changes: 44 additions & 0 deletions src/Confluent.Kafka/Admin/DescribeClusterOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for the "IAdminClient.DescribeCluster" method.
/// </summary>
public class DescribeClusterOptions
{
/// <summary>
/// The overall request timeout, including broker lookup, request
/// transmission, operation time on broker, and response. If set
/// to null, the default request timeout for the AdminClient will
/// be used.
///
/// Default: null
/// </summary>
public TimeSpan? RequestTimeout { get; set; }

/// <summary>
/// Decides if the broker should return cluster authorized operations.
///
/// Default: false
/// </summary>
public bool IncludeAuthorizedOperations { get; set; } = false;
}
}
Loading