Skip to content

Commit

Permalink
[KIP-460] Elect leader api implemented (#2320)
Browse files Browse the repository at this point in the history
  • Loading branch information
PratRanj07 authored Oct 10, 2024
1 parent dc5aff2 commit 8212364
Show file tree
Hide file tree
Showing 15 changed files with 680 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* References librdkafka.redist 2.6.0. Refer to the [librdkafka v2.6.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0) for more information.
* [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#2323).
* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#2320)


# 2.5.3
Expand Down
79 changes: 78 additions & 1 deletion examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,31 @@ static bool ParseListConsumerGroupsArgs(string[] commandArgs,
}
}

static Tuple<ElectionType, List<TopicPartition>> ParseElectLeadersArgs(string[] args)
{
if ((args.Length -1 ) % 2 != 0)
{
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");
Environment.ExitCode = 1;
return null;
}

var electionType = Enum.Parse<ElectionType>(args[0]);
var partitions = new List<TopicPartition>();
if(args.Length == 1)
{
partitions = null;
return Tuple.Create(electionType, partitions);
}
for (int i = 1; i < args.Length; i += 2)
{
var topic = args[i];
var partition = Int32.Parse(args[i + 1]);
partitions.Add(new TopicPartition(topic, partition));
}
return Tuple.Create(electionType, partitions);
}

static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
{
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
Expand All @@ -403,6 +428,20 @@ static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsR
}
}

static void PrintElectLeaderResults(List<TopicPartitionError> topicPartitions)
{
Console.WriteLine($"ElectLeaders response has {topicPartitions.Count} partition(s):");
foreach (var partitionResult in topicPartitions)
{
if (!partitionResult.Error.IsError)
Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}");
else
Console.WriteLine($"Election failed in {partitionResult.Topic} {partitionResult.Partition}: " +
$"Code: {partitionResult.Error.Code}" +
$", Reason: {partitionResult.Error.Reason}");
}
}

static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
{
List<AclBinding> aclBindings;
Expand Down Expand Up @@ -978,6 +1017,41 @@ static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs
}
}
}

static async Task ElectLeadersAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 3 && (commandArgs.Length - 1) % 2 != 0)
{
Console.WriteLine("usage: .. <bootstrapServers> elect-leaders <electionType> <topic1> <partition1> ..");
Environment.ExitCode = 1;
return;
}

var req = ParseElectLeadersArgs(commandArgs);
var electionType = req.Item1;
var partitions = req.Item2;
var timeout = TimeSpan.FromSeconds(30);
ElectLeadersOptions options = new ElectLeadersOptions() { RequestTimeout = timeout };
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var result = await adminClient.ElectLeadersAsync(electionType, partitions, options);
PrintElectLeaderResults(result.TopicPartitions);

}
catch (ElectLeadersException e)
{
Console.WriteLine("One or more elect leaders operations failed.");
PrintElectLeaderResults(e.Results.TopicPartitions);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred electing leaders: {e}");
Environment.ExitCode = 1;
}
}
}
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
foreach (var topic in topicDescriptions)
Expand Down Expand Up @@ -1146,7 +1220,7 @@ public static async Task Main(string[] args)
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials", "describe-topics",
"describe-cluster", "list-offsets"
"describe-cluster", "list-offsets", "elect-leaders"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -1210,6 +1284,9 @@ public static async Task Main(string[] args)
case "list-offsets":
await ListOffsetsAsync(bootstrapServers, commandArgs);
break;
case "elect-leaders":
await ElectLeadersAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
47 changes: 47 additions & 0 deletions src/Confluent.Kafka/Admin/ElectLeadersException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Linq;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents an error that occured during the ElectLeaders operation.
/// </summary>
public class ElectLeadersException : KafkaException
{
/// <summary>
/// Initializes a new instance of ElectLeadersException.
/// </summary>
/// <param name="report">
/// The result of the ElectLeaders operation.
/// </param>
public ElectLeadersException(ElectLeadersReport report)
: base(new Error(ErrorCode.Local_Partial,
"Some errors occurred electing leaders: [" +
string.Join(", ", report.TopicPartitions.Where(tp => tp.Error.IsError)) +
"]"))
{
this.Results = report;
}

/// <summary>
/// Gets the results of the ElectLeaders operation.
/// </summary>
public ElectLeadersReport Results { get; }
}
}
46 changes: 46 additions & 0 deletions src/Confluent.Kafka/Admin/ElectLeadersOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for the "AdminClient.ElectLeaders" method.
/// </summary>
public class ElectLeadersOptions
{
/// <summary>
/// The overall request timeout, including broker lookup, request
/// transmission, operation time on broker, and response. If set
/// to null, the default request timeout for the AdminClient will
/// be used.
///
/// Default: null
/// </summary>
public TimeSpan? RequestTimeout { get; set; }

/// <summary>
/// The broker's operation timeout - the maximum time to wait for
/// ElectLeaders before returning a result to the application.
/// If set to null, will return immediately upon triggering election.
///
/// Default: null
/// </summary>
public TimeSpan? OperationTimeout { get; set; }
}
}
55 changes: 55 additions & 0 deletions src/Confluent.Kafka/Admin/ElectLeadersReport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Text;
using System.Linq;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of an elect leaders request (including error status).
/// </summary>
public class ElectLeadersReport
{
/// <summary>
/// First error encountered in TopicPartitions.
/// </summary>
internal Error Error { get; set; }

/// <summary>
/// Individual partition results.
/// At least one of these will be in error.
/// </summary>
public List<TopicPartitionError> TopicPartitions { get; set; }

/// <summary>
/// A Json representation of the object.
/// </summary>
/// <returns>
/// A Json representation of the object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
result.Append($"{{\"TopicPartitions\": [");
result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}")));
result.Append($"]}}");
return result.ToString();
}
}
}
50 changes: 50 additions & 0 deletions src/Confluent.Kafka/Admin/ElectLeadersResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Text;
using System.Linq;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Result information for all Partitions queried
/// in an ElectLeaderRequest.
/// </summary>
public class ElectLeadersResult
{
/// <summary>
/// Individual partition results.
/// </summary>
public List<TopicPartitionError> TopicPartitions { get; set; }

/// <summary>
/// A Json representation of the object.
/// </summary>
/// <returns>
/// A Json representation of the object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
result.Append($"{{ \"TopicPartitions\": [");
result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}")));
result.Append($"]}}");
return result.ToString();
}
}
}
Loading

0 comments on commit 8212364

Please sign in to comment.