Skip to content

Commit

Permalink
[KIP-396] ListOffsets (#2086)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
Co-authored-by: Anchit Jain <anjain@confluent.io>
  • Loading branch information
3 people authored Oct 19, 2023
1 parent 58b5291 commit 07de95e
Show file tree
Hide file tree
Showing 18 changed files with 861 additions and 4 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
## Enhancements

- References librdkafka.redist 2.3.0. Refer to the [librdkafka v2.3.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) for more information.
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): Added support for ListOffsets Admin API (#2086).
- Added support for external JSON schemas in `JsonSerializer` and `JsonDeserializer` (#2042).
- Added compatibility methods to CachedSchemaRegistryClient ([ISBronny](https://github.com/ISBronny), #2097).
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()` (#2021, @jainruchir).
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in describe responses (#2021, @jainruchir).


# 2.2.0
Expand Down
119 changes: 117 additions & 2 deletions examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,87 @@ static List<UserScramCredentialAlteration> ParseUserScramCredentialAlterations(
return alterations;
}

static Tuple<IsolationLevel, List<TopicPartitionOffsetSpec>> ParseListOffsetsArgs(string[] args)
{
if (args.Length == 0)
{
Console.WriteLine("usage: .. <bootstrapServers> list-offsets <isolation_level> " +
"<topic1> <partition1> <EARLIEST/LATEST/MAXTIMESTAMP/TIMESTAMP t1> ..");
Environment.ExitCode = 1;
return null;
}

var isolationLevel = Enum.Parse<IsolationLevel>(args[0]);
var topicPartitionOffsetSpecs = new List<TopicPartitionOffsetSpec>();
for (int i = 1; i < args.Length;)
{
if (args.Length < i+3)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

string topic = args[i];
var partition = Int32.Parse(args[i + 1]);
var offsetSpec = args[i + 2];
if (offsetSpec == "TIMESTAMP")
{
if (args.Length < i+4)
{
throw new ArgumentException($"Invalid number of arguments for topicPartitionOffsetSpec[{topicPartitionOffsetSpecs.Count}]: {args.Length - i}");
}

var timestamp = Int64.Parse(args[i + 3]);
i = i + 1;
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.ForTimestamp(timestamp)
});
}
else if (offsetSpec == "MAX_TIMESTAMP")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.MaxTimestamp()
});
}
else if (offsetSpec == "EARLIEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Earliest()
});
}
else if (offsetSpec == "LATEST")
{
topicPartitionOffsetSpecs.Add( new TopicPartitionOffsetSpec
{
TopicPartition = new TopicPartition(topic, new Partition(partition)),
OffsetSpec = OffsetSpec.Latest()
});
}
else
{
throw new ArgumentException(
"offsetSpec can be EARLIEST, LATEST, MAX_TIMESTAMP or TIMESTAMP T1.");
}
i = i + 3;
}
return Tuple.Create(isolationLevel, topicPartitionOffsetSpecs);
}

static void PrintListOffsetsResultInfos(List<ListOffsetsResultInfo> ListOffsetsResultInfos)
{
foreach(var listOffsetsResultInfo in ListOffsetsResultInfos)
{
Console.WriteLine(" ListOffsetsResultInfo:");
Console.WriteLine($" TopicPartitionOffsetError: {listOffsetsResultInfo.TopicPartitionOffsetError}");
Console.WriteLine($" Timestamp: {listOffsetsResultInfo.Timestamp}");
}
}

static async Task CreateAclsAsync(string bootstrapServers, string[] commandArgs)
{
List<AclBinding> aclBindings;
Expand Down Expand Up @@ -793,6 +874,38 @@ await adminClient.AlterUserScramCredentialsAsync(alterations,
}
}

static async Task ListOffsetsAsync(string bootstrapServers, string[] commandArgs) {

var listOffsetsArgs = ParseListOffsetsArgs(commandArgs);
if (listOffsetsArgs == null) { return; }

var isolationLevel = listOffsetsArgs.Item1;
var topicPartitionOffsets = listOffsetsArgs.Item2;

var timeout = TimeSpan.FromSeconds(30);
ListOffsetsOptions options = new ListOffsetsOptions(){ RequestTimeout = timeout, IsolationLevel = isolationLevel };

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var listOffsetsResult = await adminClient.ListOffsetsAsync(topicPartitionOffsets, options);
Console.WriteLine("ListOffsetsResult:");
PrintListOffsetsResultInfos(listOffsetsResult.ListOffsetsResultInfos);
}
catch (ListOffsetsException e)
{
Console.WriteLine("ListOffsetsReport:");
Console.WriteLine($" Error: {e.Error}");
PrintListOffsetsResultInfos(e.Result.ListOffsetsResultInfos);
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred listing offsets: {e}");
Environment.ExitCode = 1;
}
}
}
static void PrintTopicDescriptions(List<TopicDescription> topicDescriptions, bool includeAuthorizedOperations)
{
foreach (var topic in topicDescriptions)
Expand Down Expand Up @@ -956,12 +1069,11 @@ public static async Task Main(string[] args)
Console.WriteLine(
"usage: .. <bootstrapServers> " + String.Join("|", new string[] {
"list-groups", "metadata", "library-version", "create-topic", "create-acls",
"describe-acls", "delete-acls",
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets",
"incremental-alter-configs", "describe-user-scram-credentials",
"alter-user-scram-credentials", "describe-topics",
"describe-cluster"
"describe-cluster", "list-offsets"
}) +
" ..");
Environment.ExitCode = 1;
Expand Down Expand Up @@ -1022,6 +1134,9 @@ public static async Task Main(string[] args)
case "describe-cluster":
await DescribeClusterAsync(bootstrapServers, commandArgs);
break;
case "list-offsets":
await ListOffsetsAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
47 changes: 47 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents an error occurred while listing offsets.
/// </summary>
public class ListOffsetsException : KafkaException
{
/// <summary>
/// Initializes a new instance of ListOffsetsException.
/// </summary>
/// <param name="result">
/// The result corresponding to all partitions in the request
/// (whether or not they were in error). At least one of these
/// topic partiton in result will be in error.
/// </param>
public ListOffsetsException(ListOffsetsReport result)
: base(new Error(ErrorCode.Local_Partial,
"An error occurred in list offsets, check individual topic partiton in result."))
{
Result = result;
}

/// <summary>
/// The result corresponding to all partitions in the request
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </summary>
public ListOffsetsReport Result { get; }
}
}
45 changes: 45 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsOption.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for the "AdminClient.ListOffsetsAsync" method.
/// </summary>
public class ListOffsetsOptions
{
/// <summary>
/// The overall request timeout, including broker lookup, request
/// transmission, operation time on broker, and response. If set
/// to null, the default request timeout for the AdminClient will
/// be used.
///
/// Default: null
/// </summary>
public TimeSpan? RequestTimeout { get; set; }

/// <summary>
/// Isolation level to fetch the offset with.
/// Applies to the whole request.
///
/// Default: ReadUncommitted
/// </summary>
public IsolationLevel IsolationLevel { get; set; } = IsolationLevel.ReadUncommitted;
}
}
56 changes: 56 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsReport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Text;
using System.Linq;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a ListOffsets request (including error status).
/// </summary>
public class ListOffsetsReport
{
/// <summary>
/// Result information for all the partitions queried
/// with ListOffsets. At least one of these
/// results will be in error.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }

/// <summary>
/// Operation error status.
/// </summary>
public Error Error { get; set; }

/// <summary>
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append($"], \"Error\": \"{Error.Code}\"}}");
return result.ToString();
}
}
}
51 changes: 51 additions & 0 deletions src/Confluent.Kafka/Admin/ListOffsetsResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;
using System.Text;
using System.Linq;


namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents the result of a ListOffsets request.
/// </summary>
public class ListOffsetsResult
{
/// <summary>
/// Result information for all the partitions queried
/// with ListOffsets.
/// </summary>
public List<ListOffsetsResultInfo> ListOffsetsResultInfos { get; set; }


/// <summary>
/// Returns a JSON representation of the object.
/// </summary>
/// <returns>
/// A JSON representation of the object.
/// </returns>
public override string ToString()
{
var result = new StringBuilder();
result.Append($"{{\"ListOffsetsResultInfos\": [");
result.Append(string.Join(",", ListOffsetsResultInfos.Select(b => $" {b.ToString()}")));
result.Append("]}");
return result.ToString();
}
}
}
Loading

0 comments on commit 07de95e

Please sign in to comment.