Skip to content

Commit

Permalink
[KIP-430] DescribeConsumerGroups, DescribeTopics, DescribeCluster wit…
Browse files Browse the repository at this point in the history
…h authorized AclOperations (#2021)

---------

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
  • Loading branch information
jainruchir and emasab authored Oct 18, 2023
1 parent bd5a9bc commit 58b5291
Show file tree
Hide file tree
Showing 35 changed files with 2,420 additions and 55 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# vNext
# 2.3.0

## Enhancements

- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
Expand Down
218 changes: 208 additions & 10 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
// Copyright 2015-2016 Andreas Heider,
// 2016-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -525,7 +526,8 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm
{
try
{
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() {
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions()
{
RequestTimeout = timeout,
MatchStates = statesList,
});
Expand All @@ -546,23 +548,52 @@ static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] comm

static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 1)
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <username> <password> <include_authorized_operations> <group1> [<group2 ... <groupN>]");
Environment.ExitCode = 1;
return;
}

var groupNames = commandArgs.ToList();
var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
var groupNames = commandArgs.Skip(3).ToList();

if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}

var timeout = TimeSpan.FromSeconds(30);
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout });
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
foreach (var group in descResult.ConsumerGroupDescriptions)
{
Console.WriteLine($" Group: {group.GroupId} {group.Error}");
Console.WriteLine($"\n Group: {group.GroupId} {group.Error}");
Console.WriteLine($" Broker: {group.Coordinator}");
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
Expand All @@ -579,6 +610,11 @@ static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[]
}
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
}
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", group.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}
catch (KafkaException e)
Expand Down Expand Up @@ -757,6 +793,162 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
}
}

static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
foreach (var topic in topicDescriptions)
{
Console.WriteLine($"\n Topic: {topic.Name} {topic.Error}");
Console.WriteLine($" Partitions:");
foreach (var partition in topic.Partitions)
{
Console.WriteLine($" Partition ID: {partition.Partition} with leader: {partition.Leader}");
if(!partition.ISR.Any())
{
Console.WriteLine(" There is no In-Sync-Replica broker for the partition");
}
else
{
string isrs = string.Join("; ", partition.ISR);
Console.WriteLine($" The In-Sync-Replica brokers are: {isrs}");
}

if(!partition.Replicas.Any())
{
Console.WriteLine(" There is no Replica broker for the partition");
}
else
{
string replicas = string.Join("; ", partition.Replicas);
Console.WriteLine($" The Replica brokers are: {replicas}");
}

}
Console.WriteLine($" Is internal: {topic.IsInternal}");
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", topic.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
}

static async Task DescribeTopicsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-topics <username> <password> <include_authorized_operations> <topic1> [<topic2 ... <topicN>]");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");
if (string.IsNullOrWhiteSpace(username))
{
username = null;
}
if (string.IsNullOrWhiteSpace(password))
{
password = null;
}
var topicNames = commandArgs.Skip(3).ToList();

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeTopicsAsync(
TopicCollection.OfTopicNames(topicNames),
new DescribeTopicsOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});
PrintTopicDescriptions(descResult.TopicDescriptions, includeAuthorizedOperations);
}
catch (DescribeTopicsException e)
{
// At least one TopicDescription will have an error.
PrintTopicDescriptions(e.Results.TopicDescriptions, includeAuthorizedOperations);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing topics: {e}");
Environment.ExitCode = 1;
}
}
}

static async Task DescribeClusterAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-cluster <username> <password> <include_authorized_operations>");
Environment.ExitCode = 1;
return;
}

var username = commandArgs[0];
var password = commandArgs[1];
var includeAuthorizedOperations = (commandArgs[2] == "1");

var timeout = TimeSpan.FromSeconds(30);
var config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
};
if (username != null && password != null)
{
config = new AdminClientConfig
{
BootstrapServers = bootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = username,
SaslPassword = password,
};
}

using (var adminClient = new AdminClientBuilder(config).Build())
{
try
{
var descResult = await adminClient.DescribeClusterAsync(new DescribeClusterOptions() { RequestTimeout = timeout , IncludeAuthorizedOperations = includeAuthorizedOperations});

Console.WriteLine($" Cluster Id: {descResult.ClusterId}\n Controller: {descResult.Controller}");
Console.WriteLine(" Nodes:");
foreach(var node in descResult.Nodes)
{
Console.WriteLine($" {node}");
}
if (includeAuthorizedOperations)
{
string operations = string.Join(" ", descResult.AuthorizedOperations);
Console.WriteLine($" Authorized operations: {operations}");
}
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing cluster: {e}");
Environment.ExitCode = 1;
}
}
}

public static async Task Main(string[] args)
{
if (args.Length < 2)
Expand All @@ -768,8 +960,8 @@ public static async Task Main(string[] args)
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials"

"alter-user-scram-credentials", "describe-topics",
"describe-cluster"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -824,6 +1016,12 @@ public static async Task Main(string[] args)
case "alter-user-scram-credentials":
await AlterUserScramCredentialsAsync(bootstrapServers, commandArgs);
break;
case "describe-topics":
await DescribeTopicsAsync(bootstrapServers, commandArgs);
break;
case "describe-cluster":
await DescribeClusterAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
44 changes: 42 additions & 2 deletions src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Confluent Inc.
// Copyright 2022-2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Confluent.Kafka.Admin
{
Expand Down Expand Up @@ -50,13 +52,51 @@ public class ConsumerGroupDescription
public ConsumerGroupState State { get; set; }

/// <summary>
/// Consumer group coordinator (broker).
/// Broker that acts as consumer group coordinator (null if not known).
/// </summary>
public Node Coordinator { get; set; }

/// <summary>
/// Members list.
/// </summary>
public List<MemberDescription> Members { get; set; }

/// <summary>
/// AclOperation list (null if not requested or not supported).
/// </summary>
public List<AclOperation> AuthorizedOperations { get; set; }

/// <summary>
/// Returns a JSON representation of this object.
/// </summary>
/// <returns>
/// A JSON representation of this object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
var members = string.Join(",",
Members.Select(member =>
member.ToString()
).ToList());
var authorizedOperations = "null";
if (AuthorizedOperations != null)
{
authorizedOperations = string.Join(",",
AuthorizedOperations.Select(authorizedOperation =>
authorizedOperation.ToString().Quote()
).ToList());
authorizedOperations = $"[{authorizedOperations}]";
}

result.Append($"{{\"GroupId\": {GroupId.Quote()}");
result.Append($", \"Error\": \"{Error.Code}\", \"IsSimpleConsumerGroup\": {IsSimpleConsumerGroup.Quote()}");
result.Append($", \"PartitionAssignor\": {PartitionAssignor.Quote()}, \"State\": {State.ToString().Quote()}");
result.Append($", \"Coordinator\": {Coordinator?.ToString() ?? "null"}, \"Members\": [{members}]");
result.Append($", \"AuthorizedOperations\": {authorizedOperations}}}");

return result.ToString();
}

}
}
Loading

0 comments on commit 58b5291

Please sign in to comment.