diff --git a/CHANGELOG.md b/CHANGELOG.md index 590fcd866..b4451f9fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * References librdkafka.redist 2.6.0. Refer to the [librdkafka v2.6.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.0) for more information. * [KIP-848 EA](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): Admin API for listing consumer groups now has an optional filter to return only groups of given types (#2323). +* [KIP-460](https://cwiki.apache.org/confluence/display/KAFKA/KIP-460%3A+Admin+Leader+Election+RPC) Admin Leader Election RPC (#2320) # 2.5.3 diff --git a/examples/AdminClient/Program.cs b/examples/AdminClient/Program.cs index fee39c3d9..df71cf77e 100644 --- a/examples/AdminClient/Program.cs +++ b/examples/AdminClient/Program.cs @@ -393,6 +393,31 @@ static bool ParseListConsumerGroupsArgs(string[] commandArgs, } } + static Tuple> ParseElectLeadersArgs(string[] args) + { + if ((args.Length -1 ) % 2 != 0) + { + Console.WriteLine("usage: .. elect-leaders .."); + Environment.ExitCode = 1; + return null; + } + + var electionType = Enum.Parse(args[0]); + var partitions = new List(); + if(args.Length == 1) + { + partitions = null; + return Tuple.Create(electionType, partitions); + } + for (int i = 1; i < args.Length; i += 2) + { + var topic = args[i]; + var partition = Int32.Parse(args[i + 1]); + partitions.Add(new TopicPartition(topic, partition)); + } + return Tuple.Create(electionType, partitions); + } + static void PrintListOffsetsResultInfos(List ListOffsetsResultInfos) { foreach(var listOffsetsResultInfo in ListOffsetsResultInfos) @@ -403,6 +428,20 @@ static void PrintListOffsetsResultInfos(List ListOffsetsR } } + static void PrintElectLeaderResults(List topicPartitions) + { + Console.WriteLine($"ElectLeaders response has {topicPartitions.Count} partition(s):"); + foreach (var partitionResult in topicPartitions) + { + if (!partitionResult.Error.IsError) + Console.WriteLine($"Election successful in {partitionResult.Topic} {partitionResult.Partition}"); + else + Console.WriteLine($"Election failed in {partitionResult.Topic} {partitionResult.Partition}: " + + $"Code: {partitionResult.Error.Code}" + + $", Reason: {partitionResult.Error.Reason}"); + } + } + static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs) { List aclBindings; @@ -978,6 +1017,41 @@ static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs } } } + + static async Task ElectLeadersAsync(string bootstrapServers, string[] commandArgs) + { + if (commandArgs.Length < 3 && (commandArgs.Length - 1) % 2 != 0) + { + Console.WriteLine("usage: .. elect-leaders .."); + Environment.ExitCode = 1; + return; + } + + var req = ParseElectLeadersArgs(commandArgs); + var electionType = req.Item1; + var partitions = req.Item2; + var timeout = TimeSpan.FromSeconds(30); + ElectLeadersOptions options = new ElectLeadersOptions() { RequestTimeout = timeout }; + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) + { + try + { + var result = await adminClient.ElectLeadersAsync(electionType, partitions, options); + PrintElectLeaderResults(result.TopicPartitions); + + } + catch (ElectLeadersException e) + { + Console.WriteLine("One or more elect leaders operations failed."); + PrintElectLeaderResults(e.Results.TopicPartitions); + } + catch (KafkaException e) + { + Console.WriteLine($"An error occurred electing leaders: {e}"); + Environment.ExitCode = 1; + } + } + } static void PrintTopicDescriptions(List topicDescriptions, bool includeAuthorizedOperations) { foreach (var topic in topicDescriptions) @@ -1146,7 +1220,7 @@ public static async Task Main(string[] args) "list-consumer-group-offsets", "alter-consumer-group-offsets", "incremental-alter-configs", "describe-user-scram-credentials", "alter-user-scram-credentials", "describe-topics", - "describe-cluster", "list-offsets" + "describe-cluster", "list-offsets", "elect-leaders" }) + " .."); Environment.ExitCode = 1; @@ -1210,6 +1284,9 @@ public static async Task Main(string[] args) case "list-offsets": await ListOffsetsAsync(bootstrapServers, commandArgs); break; + case "elect-leaders": + await ElectLeadersAsync(bootstrapServers, commandArgs); + break; default: Console.WriteLine($"unknown command: {command}"); break; diff --git a/src/Confluent.Kafka/Admin/ElectLeadersException.cs b/src/Confluent.Kafka/Admin/ElectLeadersException.cs new file mode 100644 index 000000000..7775999cd --- /dev/null +++ b/src/Confluent.Kafka/Admin/ElectLeadersException.cs @@ -0,0 +1,47 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Linq; + + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents an error that occured during the ElectLeaders operation. + /// + public class ElectLeadersException : KafkaException + { + /// + /// Initializes a new instance of ElectLeadersException. + /// + /// + /// The result of the ElectLeaders operation. + /// + public ElectLeadersException(ElectLeadersReport report) + : base(new Error(ErrorCode.Local_Partial, + "Some errors occurred electing leaders: [" + + string.Join(", ", report.TopicPartitions.Where(tp => tp.Error.IsError)) + + "]")) + { + this.Results = report; + } + + /// + /// Gets the results of the ElectLeaders operation. + /// + public ElectLeadersReport Results { get; } + } +} diff --git a/src/Confluent.Kafka/Admin/ElectLeadersOptions.cs b/src/Confluent.Kafka/Admin/ElectLeadersOptions.cs new file mode 100644 index 000000000..3ae775d1f --- /dev/null +++ b/src/Confluent.Kafka/Admin/ElectLeadersOptions.cs @@ -0,0 +1,46 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; + + +namespace Confluent.Kafka.Admin +{ + /// + /// Options for the "AdminClient.ElectLeaders" method. + /// + public class ElectLeadersOptions + { + /// + /// The overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. If set + /// to null, the default request timeout for the AdminClient will + /// be used. + /// + /// Default: null + /// + public TimeSpan? RequestTimeout { get; set; } + + /// + /// The broker's operation timeout - the maximum time to wait for + /// ElectLeaders before returning a result to the application. + /// If set to null, will return immediately upon triggering election. + /// + /// Default: null + /// + public TimeSpan? OperationTimeout { get; set; } + } +} diff --git a/src/Confluent.Kafka/Admin/ElectLeadersReport.cs b/src/Confluent.Kafka/Admin/ElectLeadersReport.cs new file mode 100644 index 000000000..1d9eb7a2a --- /dev/null +++ b/src/Confluent.Kafka/Admin/ElectLeadersReport.cs @@ -0,0 +1,55 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; +using System.Text; +using System.Linq; + + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents the result of an elect leaders request (including error status). + /// + public class ElectLeadersReport + { + /// + /// First error encountered in TopicPartitions. + /// + internal Error Error { get; set; } + + /// + /// Individual partition results. + /// At least one of these will be in error. + /// + public List TopicPartitions { get; set; } + + /// + /// A Json representation of the object. + /// + /// + /// A Json representation of the object. + /// + public override string ToString() + { + var result = new StringBuilder(); + result.Append($"{{\"TopicPartitions\": ["); + result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}"))); + result.Append($"]}}"); + return result.ToString(); + } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/Admin/ElectLeadersResult.cs b/src/Confluent.Kafka/Admin/ElectLeadersResult.cs new file mode 100644 index 000000000..86c34bdf1 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ElectLeadersResult.cs @@ -0,0 +1,50 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; +using System.Text; +using System.Linq; + + +namespace Confluent.Kafka.Admin +{ + /// + /// Result information for all Partitions queried + /// in an ElectLeaderRequest. + /// + public class ElectLeadersResult + { + /// + /// Individual partition results. + /// + public List TopicPartitions { get; set; } + + /// + /// A Json representation of the object. + /// + /// + /// A Json representation of the object. + /// + public override string ToString() + { + var result = new StringBuilder(); + result.Append($"{{ \"TopicPartitions\": ["); + result.Append(string.Join(",", TopicPartitions.Select(b => $" {b.ToString()}"))); + result.Append($"]}}"); + return result.ToString(); + } + } +} diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index 204f44a10..082222233 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -635,6 +635,52 @@ private ListOffsetsReport extractListOffsetsReport(IntPtr resultPtr) }; } + private ElectLeadersReport extractElectLeadersResults(IntPtr resultPtr) + { + IntPtr topicPartitionsPtr = Librdkafka.ElectLeaders_result_partitions(resultPtr, out UIntPtr topicPartitionsCountPtr); + + if ((int)topicPartitionsCountPtr == 0) + { + return new ElectLeadersReport + { + Error = new Error(ErrorCode.NoError), + TopicPartitions = new List() + }; + } + + IntPtr[] topicPartitionsPtrArr = new IntPtr[(int)topicPartitionsCountPtr]; + Marshal.Copy(topicPartitionsPtr, topicPartitionsPtrArr, 0, (int)topicPartitionsCountPtr); + + ErrorCode reportErrorCode = ErrorCode.NoError; + var topicPartitions = topicPartitionsPtrArr.Select(ptr => + { + IntPtr topic_partition = Librdkafka.TopicPartitionResult_partition(ptr); + IntPtr error = Librdkafka.TopicPartitionResult_error(ptr); + + ErrorCode errCode = Librdkafka.error_code(error); + var errStr = Librdkafka.error_string(error); + + var tpe = Marshal.PtrToStructure(topic_partition); + + if(tpe.err != ErrorCode.NoError && reportErrorCode == ErrorCode.NoError) + { + reportErrorCode = tpe.err; + } + + return new TopicPartitionError( + tpe.topic, + new Partition(tpe.partition), + new Error(errCode, errStr) + ); + }).ToList(); + + return new ElectLeadersReport + { + Error = new Error(reportErrorCode), + TopicPartitions = topicPartitions + }; + } + private Task StartPollTask(CancellationToken ct) => Task.Factory.StartNew(() => { @@ -1253,6 +1299,33 @@ private Task StartPollTask(CancellationToken ct) } break; } + case Librdkafka.EventType.ElectLeaders_Result: + { + if(errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + break; + } + ElectLeadersReport report = extractElectLeadersResults(eventPtr); + if(report.Error.IsError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new ElectLeadersException(report))); + } + else + { + var result = new ElectLeadersResult() + { + TopicPartitions = report.TopicPartitions + }; + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult(result)); + } + break; + } default: // Should never happen. throw new InvalidOperationException($"Unknown result type: {type}"); @@ -1304,6 +1377,7 @@ private Task StartPollTask(CancellationToken ct) { Librdkafka.EventType.DescribeTopics_Result, typeof(TaskCompletionSource) }, { Librdkafka.EventType.DescribeCluster_Result, typeof(TaskCompletionSource) }, { Librdkafka.EventType.ListOffsets_Result, typeof(TaskCompletionSource) }, + { Librdkafka.EventType.ElectLeaders_Result, typeof(TaskCompletionSource) }, }; @@ -1777,5 +1851,19 @@ public Task ListOffsetsAsync(IEnumerable + /// Refer to + /// + public Task ElectLeadersAsync(ElectionType electionType, IEnumerable partitions, ElectLeadersOptions options = null) + { + var completionSource = new TaskCompletionSource(); + var gch = GCHandle.Alloc(completionSource); + Handle.LibrdkafkaHandle.ElectLeaders( + electionType, partitions, options, resultQueue, + GCHandle.ToIntPtr(gch)); + return completionSource.Task; + } + } } diff --git a/src/Confluent.Kafka/ElectionType.cs b/src/Confluent.Kafka/ElectionType.cs new file mode 100644 index 000000000..8b0a9c0b7 --- /dev/null +++ b/src/Confluent.Kafka/ElectionType.cs @@ -0,0 +1,35 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +namespace Confluent.Kafka +{ + /// + /// Enumerates the different types of Election types. + /// + public enum ElectionType : int + { + /// + /// Preferred Elections + /// + Preferred = 0, + + /// + /// Unclean Elections + /// + Unclean = 1, + } +} diff --git a/src/Confluent.Kafka/IAdminClient.cs b/src/Confluent.Kafka/IAdminClient.cs index f351fe813..12ef6c8d2 100644 --- a/src/Confluent.Kafka/IAdminClient.cs +++ b/src/Confluent.Kafka/IAdminClient.cs @@ -629,6 +629,40 @@ public static Task ListOffsetsAsync( } throw new NotImplementedException(); } + + /// + /// Perform Preferred or Unclean leader election for partitions. + /// + /// + /// AdminClient interface. + /// + /// + /// The type of election to trigger(Preferred or Unclean). + /// + /// + /// The partitions for which election has to be performed. + /// For NULL partitions, election will be performed for all partitions. + /// But results will be shown only where the election is successful or + /// an error other than ELECTION_NOT_NEEDED is encountered. + /// + /// + /// The options to use for this call. + /// + public static Task ElectLeadersAsync( + this IAdminClient adminClient, + ElectionType electionType, + IEnumerable partitions = null, + ElectLeadersOptions options = null) + { + if (adminClient is AdminClient) + { + return ((AdminClient) adminClient).ElectLeadersAsync( + electionType, + partitions, + options); + } + throw new NotImplementedException(); + } } diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index 5e6bbc066..9f2fc4970 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -76,6 +76,7 @@ internal enum AdminOp DescribeTopics = 19, DescribeCluster = 20, ListOffsets = 21, + ElectLeaders = 22, } public enum EventType : int @@ -109,6 +110,7 @@ public enum EventType : int DescribeTopics_Result = 0x100000, DescribeCluster_Result = 0x200000, ListOffsets_Result = 0x400000, + ElectLeaders_Result = 0x800000, } // Minimum librdkafka version. @@ -474,6 +476,13 @@ static bool SetDelegates(Type nativeMethodsClass) _TopicPartitionInfo_partition = (_TopicPartitionInfo_partition_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_partition").CreateDelegate(typeof (_TopicPartitionInfo_partition_delegate)); _TopicPartitionInfo_replicas = (_TopicPartitionInfo_replicas_delegate)methods.Single(m => m.Name == "rd_kafka_TopicPartitionInfo_replicas").CreateDelegate(typeof (_TopicPartitionInfo_replicas_delegate)); + _ElectLeadersRequest_new = (_ElectLeadersRequest_new_delegate)methods.Single(m => m.Name == "rd_kafka_ElectLeaders_new").CreateDelegate(typeof(_ElectLeadersRequest_new_delegate)); + _ElectLeadersRequest_destroy = (_ElectLeadersRequest_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_ElectLeaders_destroy").CreateDelegate(typeof(_ElectLeadersRequest_destroy_delegate)); + _ElectLeaders = (_ElectLeaders_delegate)methods.Single(m => m.Name == "rd_kafka_ElectLeaders").CreateDelegate(typeof(_ElectLeaders_delegate)); + _ElectLeaders_result_partitions = (_ElectLeaders_result_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_ElectLeaders_result_partitions").CreateDelegate(typeof(_ElectLeaders_result_partitions_delegate)); + _TopicPartitionResult_partition = (_TopicPartitionResult_partition_delegate)methods.Single(m => m.Name == "rd_kafka_topic_partition_result_partition").CreateDelegate(typeof(_TopicPartitionResult_partition_delegate)); + _TopicPartitionResult_error = (_TopicPartitionResult_error_delegate)methods.Single(m => m.Name == "rd_kafka_topic_partition_result_error").CreateDelegate(typeof(_TopicPartitionResult_error_delegate)); + _DescribeCluster = (_DescribeCluster_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster").CreateDelegate(typeof (_DescribeCluster_delegate)); _DescribeCluster_result_nodes = (_DescribeCluster_result_nodes_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_nodes").CreateDelegate(typeof (_DescribeCluster_result_nodes_delegate)); _DescribeCluster_result_authorized_operations = (_DescribeCluster_result_authorized_operations_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeCluster_result_authorized_operations").CreateDelegate(typeof (_DescribeCluster_result_authorized_operations_delegate)); @@ -2265,6 +2274,41 @@ internal static IntPtr DescribeCluster_result_controller(IntPtr result) private static _DescribeCluster_result_cluster_id_delegate _DescribeCluster_result_cluster_id; internal static IntPtr DescribeCluster_result_cluster_id(IntPtr result) => _DescribeCluster_result_cluster_id(result); + + + // + // ElectLeaders + // + + private delegate IntPtr _ElectLeadersRequest_new_delegate(ElectionType electionType, IntPtr partitions); + private static _ElectLeadersRequest_new_delegate _ElectLeadersRequest_new; + internal static IntPtr ElectLeadersRequest_New(ElectionType electionType, IntPtr partitions) + => _ElectLeadersRequest_new(electionType, partitions); + + private delegate void _ElectLeadersRequest_destroy_delegate(IntPtr electLeaderRequest); + private static _ElectLeadersRequest_destroy_delegate _ElectLeadersRequest_destroy; + internal static void ElectLeadersRequest_destroy(IntPtr electLeaderRequest) + => _ElectLeadersRequest_destroy(electLeaderRequest); + + private delegate void _ElectLeaders_delegate(IntPtr handle, IntPtr electLeaderRequest, IntPtr options, IntPtr resultQueuePtr); + private static _ElectLeaders_delegate _ElectLeaders; + internal static void ElectLeaders(IntPtr handle, IntPtr electLeaderRequest, IntPtr options, IntPtr resultQueuePtr) + => _ElectLeaders(handle, electLeaderRequest, options, resultQueuePtr); + + private delegate IntPtr _ElectLeaders_result_partitions_delegate(IntPtr result, out UIntPtr cntp); + private static _ElectLeaders_result_partitions_delegate _ElectLeaders_result_partitions; + internal static IntPtr ElectLeaders_result_partitions(IntPtr result, out UIntPtr cntp) + => _ElectLeaders_result_partitions(result, out cntp); + + private delegate IntPtr _TopicPartitionResult_partition_delegate(IntPtr result); + private static _TopicPartitionResult_partition_delegate _TopicPartitionResult_partition; + internal static IntPtr TopicPartitionResult_partition(IntPtr result) + => _TopicPartitionResult_partition(result); + + private delegate IntPtr _TopicPartitionResult_error_delegate(IntPtr result); + private static _TopicPartitionResult_error_delegate _TopicPartitionResult_error; + internal static IntPtr TopicPartitionResult_error(IntPtr result) + => _TopicPartitionResult_error(result); // // Queues diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs index bceb4aaf8..5b08d48cb 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs @@ -1281,6 +1281,24 @@ internal static extern void rd_kafka_DescribeCluster( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeCluster_result_cluster_id(IntPtr result); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_new(ElectionType electionType, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders_destroy(IntPtr electLeader); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders(IntPtr handle, IntPtr electLeaderRequest, IntPtr options, IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_result_partitions(IntPtr result_event, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_partition(IntPtr result); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_error(IntPtr result); + // // Queues // diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs index c8ffd8abd..e269144bf 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs @@ -1285,6 +1285,24 @@ internal static extern void rd_kafka_DescribeCluster( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeCluster_result_cluster_id(IntPtr result); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_new(ElectionType electionType, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders_destroy(IntPtr electLeader); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders(IntPtr handle, IntPtr electLeaderRequest, IntPtr options, IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_result_partitions(IntPtr result_event, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_partition(IntPtr result); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_error(IntPtr result); + // // Queues // diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs index d42f5974e..91f536b19 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos8.cs @@ -1285,6 +1285,24 @@ internal static extern void rd_kafka_DescribeCluster( [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_DescribeCluster_result_cluster_id(IntPtr result); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_new(ElectionType electionType, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders_destroy(IntPtr electLeader); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ElectLeaders(IntPtr handle, IntPtr electLeaderRequest, IntPtr options, IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ElectLeaders_result_partitions(IntPtr result_event, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_partition(IntPtr result); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_topic_partition_result_error(IntPtr result); + // // Queues // diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index 11a14dfdd..375b7f3dc 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -2541,6 +2541,50 @@ internal void ListOffsets(IEnumerable topicPartitionOf } } + internal void ElectLeaders(ElectionType electionType, IEnumerable partitions, ElectLeadersOptions options, IntPtr resultQueuePtr, IntPtr completionSourcePtr) + { + ThrowIfHandleClosed(); + var optionsPtr = IntPtr.Zero; + IntPtr topic_partition_list = IntPtr.Zero; + IntPtr request = IntPtr.Zero; + try + { + // Set Admin Options if any. + options = new ElectLeadersOptions(); + optionsPtr = Librdkafka.AdminOptions_new(handle, Librdkafka.AdminOp.ElectLeaders); + setOption_RequestTimeout(optionsPtr, options.RequestTimeout); + setOption_OperationTimeout(optionsPtr, options.OperationTimeout); + setOption_completionSource(optionsPtr, completionSourcePtr); + + if(partitions != null) + { + topic_partition_list = Librdkafka.topic_partition_list_new((IntPtr)partitions.Count()); + foreach (var topicPartitions in partitions) + { + IntPtr topic_partition = Librdkafka.topic_partition_list_add(topic_partition_list, topicPartitions.Topic, topicPartitions.Partition); + } + } + request = Librdkafka.ElectLeadersRequest_New(electionType, topic_partition_list); + + Librdkafka.ElectLeaders(handle, request, optionsPtr, resultQueuePtr); + } + finally + { + if (optionsPtr != IntPtr.Zero) + { + Librdkafka.AdminOptions_destroy(optionsPtr); + } + if (topic_partition_list != IntPtr.Zero) + { + Librdkafka.topic_partition_list_destroy(topic_partition_list); + } + if(request != IntPtr.Zero) + { + Librdkafka.ElectLeadersRequest_destroy(request); + } + } + } + internal void DescribeTopics(TopicCollection topicCollection, DescribeTopicsOptions options, IntPtr resultQueuePtr, IntPtr completionSourcePtr) { ThrowIfHandleClosed(); diff --git a/test/Confluent.Kafka.UnitTests/Admin/ElectLeadersError.cs b/test/Confluent.Kafka.UnitTests/Admin/ElectLeadersError.cs new file mode 100644 index 000000000..029a004d1 --- /dev/null +++ b/test/Confluent.Kafka.UnitTests/Admin/ElectLeadersError.cs @@ -0,0 +1,104 @@ +// Copyright 2024 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using Xunit; +using System; +using Confluent.Kafka.Admin; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Confluent.Kafka.UnitTests +{ + public class ElectLeadersErrorTests + { + private AdminClientConfig GetTestConfig() + { + return new AdminClientConfig + { + BootstrapServers = "localhost:90922", + SocketTimeoutMs = 30 + }; + } + + [Theory] + [InlineData(ElectionType.Preferred)] + [InlineData(ElectionType.Unclean)] + public async Task ElectLeadersAsync_EmptyPartitions_ThrowsKafkaException(ElectionType electionType) + { + using var adminClient = new AdminClientBuilder(GetTestConfig()).Build(); + + var exception = await Assert.ThrowsAsync(() => + adminClient.ElectLeadersAsync( + electionType, + new List(), + new ElectLeadersOptions()) + ); + + Assert.Contains("Local: Timed out", exception.Message); + } + + [Theory] + [InlineData(ElectionType.Preferred)] + [InlineData(ElectionType.Unclean)] + public async Task ElectLeadersAsync_NullPartitions_ThrowsKafkaException(ElectionType electionType) + { + using var adminClient = new AdminClientBuilder(GetTestConfig()).Build(); + + var exception = await Assert.ThrowsAsync(() => + adminClient.ElectLeadersAsync( + electionType, + null, + new ElectLeadersOptions()) + ); + + Assert.Contains("Local: Timed out", exception.Message); + } + + [Theory] + [InlineData(ElectionType.Preferred)] + [InlineData(ElectionType.Unclean)] + public async Task ElectLeadersAsync_DuplicatePartitions_ThrowsKafkaException(ElectionType electionType) + { + using var adminClient = new AdminClientBuilder(GetTestConfig()).Build(); + + var exception = await Assert.ThrowsAsync(() => + adminClient.ElectLeadersAsync( + electionType, + new List { new TopicPartition("topic", 0), new TopicPartition("topic", 0)}, + new ElectLeadersOptions()) + ); + + Assert.Contains("Duplicate partitions specified", exception.Message); + } + + [Theory] + [InlineData(ElectionType.Preferred)] + [InlineData(ElectionType.Unclean)] + public async Task ElectLeadersAsync_ValidRequest_TimesOut(ElectionType electionType) + { + using var adminClient = new AdminClientBuilder(GetTestConfig()).Build(); + + var exception = await Assert.ThrowsAsync(() => + adminClient.ElectLeadersAsync( + electionType, + new List { new TopicPartition("topic", 0) }, + new ElectLeadersOptions { RequestTimeout = TimeSpan.FromMilliseconds(1000) }) + ); + + Assert.Contains("Local: Timed out", exception.Message); + } + } +} \ No newline at end of file