From 3b36366dc95af241dedf72d89c199f464a6a5054 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel <31224949+emlautarom1@users.noreply.github.com> Date: Fri, 2 Aug 2024 14:39:00 -0300 Subject: [PATCH] Incorrect batch unwrapping in JSON-RPC replay tool (#7279) --- tools/Nethermind.Tools.Kute/Application.cs | 18 ++++++---- tools/Nethermind.Tools.Kute/JsonRpc.cs | 19 +++++++--- .../ComposedJsonRpcMethodFilter.cs | 13 ++++--- .../JsonRpcSubmitter/HttpJsonRpcSubmitter.cs | 2 +- .../JsonRpcSubmitter/IJsonRpcSubmitter.cs | 2 +- .../JsonRpcSubmitter/NullJsonRpcSubmitter.cs | 4 +-- .../MessageProvider/JsonRpcMessageProvider.cs | 30 ++++------------ .../UnwrapBatchJsonRpcMessageProvider.cs | 35 +++++++++++++++++++ tools/Nethermind.Tools.Kute/Program.cs | 23 ++++++------ 9 files changed, 91 insertions(+), 55 deletions(-) create mode 100644 tools/Nethermind.Tools.Kute/MessageProvider/UnwrapBatchJsonRpcMessageProvider.cs diff --git a/tools/Nethermind.Tools.Kute/Application.cs b/tools/Nethermind.Tools.Kute/Application.cs index d65e7632722..e2943214307 100644 --- a/tools/Nethermind.Tools.Kute/Application.cs +++ b/tools/Nethermind.Tools.Kute/Application.cs @@ -12,7 +12,6 @@ using Nethermind.Tools.Kute.ResponseTracer; using System.Collections.Concurrent; using Nethermind.Tools.Kute.FlowManager; -using System.Threading.Tasks; namespace Nethermind.Tools.Kute; @@ -54,7 +53,7 @@ public async Task Run() { _progressReporter.ReportStart(); - BlockingCollection<(Task, JsonRpc)> responseTasks = new BlockingCollection<(Task, JsonRpc)>(); + BlockingCollection<(Task, JsonRpc)> responseTasks = new BlockingCollection<(Task, JsonRpc)>(); var responseHandlingTask = Task.Run(async () => { await Parallel.ForEachAsync(responseTasks.GetConsumingEnumerable(), async (task, ct) => @@ -121,7 +120,7 @@ await Parallel.ForEachAsync(responseTasks.GetConsumingEnumerable(), async (task, await _metricsConsumer.ConsumeMetrics(_metrics); } - public async Task AnalyzeRequest((Task, JsonRpc) task) + public async Task AnalyzeRequest((Task, JsonRpc) task) { switch (task.Item2) { @@ -133,12 +132,14 @@ public async Task AnalyzeRequest((Task, JsonRpc) task) } case JsonRpc.BatchJsonRpc batch: { - HttpResponseMessage content; + HttpResponseMessage? content; using (_metrics.TimeBatch()) { content = await task.Item1; } - var deserialized = JsonSerializer.Deserialize(content.Content.ReadAsStream()); + var deserialized = content is not null + ? JsonSerializer.Deserialize(await content.Content.ReadAsStreamAsync()) + : null; if (_validator.IsInvalid(batch, deserialized)) { @@ -155,12 +156,15 @@ public async Task AnalyzeRequest((Task, JsonRpc) task) } case JsonRpc.SingleJsonRpc single: { - HttpResponseMessage content; + HttpResponseMessage? content; using (_metrics.TimeMethod(single.MethodName)) { content = await task.Item1; } - var deserialized = JsonSerializer.Deserialize(content.Content.ReadAsStream()); + + var deserialized = content is not null + ? JsonSerializer.Deserialize(await content.Content.ReadAsStreamAsync()) + : null; if (single.MethodName is null) { diff --git a/tools/Nethermind.Tools.Kute/JsonRpc.cs b/tools/Nethermind.Tools.Kute/JsonRpc.cs index afc120196e2..b6e05157a0b 100644 --- a/tools/Nethermind.Tools.Kute/JsonRpc.cs +++ b/tools/Nethermind.Tools.Kute/JsonRpc.cs @@ -7,20 +7,29 @@ namespace Nethermind.Tools.Kute; public abstract record JsonRpc { - public JsonDocument Document; + private readonly JsonDocument _document; private JsonRpc(JsonDocument document) { - Document = document; + _document = document; } - public string ToJsonString() => Document.RootElement.ToString(); + public string ToJsonString() => _document.RootElement.ToString(); public record BatchJsonRpc : JsonRpc { public BatchJsonRpc(JsonDocument document) : base(document) { } public override string ToString() => $"{nameof(BatchJsonRpc)} {ToJsonString()}"; + + public IEnumerable Items() + { + foreach (var element in _document.RootElement.EnumerateArray()) + { + var document = JsonSerializer.Deserialize(element); + yield return document is null ? null : new SingleJsonRpc(document); + } + } } public record SingleJsonRpc : JsonRpc @@ -31,11 +40,11 @@ public record SingleJsonRpc : JsonRpc public SingleJsonRpc(JsonDocument document) : base(document) { _isResponse = new(() => - Document.RootElement.TryGetProperty("response", out _) + _document.RootElement.TryGetProperty("response", out _) ); _methodName = new(() => { - if (Document.RootElement.TryGetProperty("method", out var jsonMethodField)) + if (_document.RootElement.TryGetProperty("method", out var jsonMethodField)) { return jsonMethodField.GetString(); } diff --git a/tools/Nethermind.Tools.Kute/JsonRpcMethodFilter/ComposedJsonRpcMethodFilter.cs b/tools/Nethermind.Tools.Kute/JsonRpcMethodFilter/ComposedJsonRpcMethodFilter.cs index 89319ccdbe9..91d02bfe140 100644 --- a/tools/Nethermind.Tools.Kute/JsonRpcMethodFilter/ComposedJsonRpcMethodFilter.cs +++ b/tools/Nethermind.Tools.Kute/JsonRpcMethodFilter/ComposedJsonRpcMethodFilter.cs @@ -3,11 +3,16 @@ namespace Nethermind.Tools.Kute.JsonRpcMethodFilter; -class ComposedJsonRpcMethodFilter(IEnumerable filters) : IJsonRpcMethodFilter +class ComposedJsonRpcMethodFilter : IJsonRpcMethodFilter { - private readonly IEnumerable _filters = filters; + private readonly List _filters; + private readonly bool _hasNoFilters; - private bool HasFilters => _filters?.Any() ?? false; + public ComposedJsonRpcMethodFilter(List filters) + { + _filters = filters; + _hasNoFilters = filters.Count == 0; + } - public bool ShouldSubmit(string methodName) => !HasFilters || _filters.Any(f => f.ShouldSubmit(methodName)); + public bool ShouldSubmit(string methodName) => _hasNoFilters || _filters.Any(f => f.ShouldSubmit(methodName)); } diff --git a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/HttpJsonRpcSubmitter.cs b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/HttpJsonRpcSubmitter.cs index c82239f3163..35a2612f8bd 100644 --- a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/HttpJsonRpcSubmitter.cs +++ b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/HttpJsonRpcSubmitter.cs @@ -23,7 +23,7 @@ public HttpJsonRpcSubmitter(HttpClient httpClient, IAuth auth, string hostAddres _uri = new Uri(hostAddress); } - public async Task Submit(JsonRpc rpc) + public async Task Submit(JsonRpc rpc) { var request = new HttpRequestMessage(HttpMethod.Post, _uri) { diff --git a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/IJsonRpcSubmitter.cs b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/IJsonRpcSubmitter.cs index bd8c383246c..2918dd6ed86 100644 --- a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/IJsonRpcSubmitter.cs +++ b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/IJsonRpcSubmitter.cs @@ -7,5 +7,5 @@ namespace Nethermind.Tools.Kute.JsonRpcSubmitter; interface IJsonRpcSubmitter { - Task Submit(JsonRpc rpc); + Task Submit(JsonRpc rpc); } diff --git a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/NullJsonRpcSubmitter.cs b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/NullJsonRpcSubmitter.cs index ca4da79e311..9aeeea5220f 100644 --- a/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/NullJsonRpcSubmitter.cs +++ b/tools/Nethermind.Tools.Kute/JsonRpcSubmitter/NullJsonRpcSubmitter.cs @@ -7,7 +7,5 @@ namespace Nethermind.Tools.Kute.JsonRpcSubmitter; class NullJsonRpcSubmitter : IJsonRpcSubmitter { - - public Task Submit(JsonRpc rpc) => Task.FromResult(null); - + public Task Submit(JsonRpc rpc) => Task.FromResult(null); } diff --git a/tools/Nethermind.Tools.Kute/MessageProvider/JsonRpcMessageProvider.cs b/tools/Nethermind.Tools.Kute/MessageProvider/JsonRpcMessageProvider.cs index bf8cd31a2eb..687de50cbeb 100644 --- a/tools/Nethermind.Tools.Kute/MessageProvider/JsonRpcMessageProvider.cs +++ b/tools/Nethermind.Tools.Kute/MessageProvider/JsonRpcMessageProvider.cs @@ -8,16 +8,12 @@ namespace Nethermind.Tools.Kute.MessageProvider; public class JsonRpcMessageProvider : IMessageProvider { private readonly IMessageProvider _provider; - private readonly bool _unwrapBatches; - - public JsonRpcMessageProvider(IMessageProvider provider, bool unwrapBatches) + public JsonRpcMessageProvider(IMessageProvider provider) { _provider = provider; - _unwrapBatches = unwrapBatches; } - public IAsyncEnumerable Messages { get => MessagesImpl(); } private async IAsyncEnumerable MessagesImpl() @@ -26,26 +22,12 @@ public JsonRpcMessageProvider(IMessageProvider provider, bool unwrapBatc { var jsonDoc = JsonSerializer.Deserialize(msg); - switch (jsonDoc?.RootElement.ValueKind) + yield return jsonDoc?.RootElement.ValueKind switch { - case JsonValueKind.Object: - yield return new JsonRpc.SingleJsonRpc(jsonDoc); - break; - case JsonValueKind.Array: - if (_unwrapBatches) - { - foreach (JsonElement single in jsonDoc.RootElement.EnumerateArray()) - { - yield return new JsonRpc.SingleJsonRpc(JsonDocument.Parse(single.ToString())); - } - } - yield return new JsonRpc.BatchJsonRpc(jsonDoc); - break; - default: - yield return null; - break; - - } + JsonValueKind.Object => new JsonRpc.SingleJsonRpc(jsonDoc), + JsonValueKind.Array => new JsonRpc.BatchJsonRpc(jsonDoc), + _ => null + }; } } } diff --git a/tools/Nethermind.Tools.Kute/MessageProvider/UnwrapBatchJsonRpcMessageProvider.cs b/tools/Nethermind.Tools.Kute/MessageProvider/UnwrapBatchJsonRpcMessageProvider.cs new file mode 100644 index 00000000000..563ee222ce9 --- /dev/null +++ b/tools/Nethermind.Tools.Kute/MessageProvider/UnwrapBatchJsonRpcMessageProvider.cs @@ -0,0 +1,35 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +namespace Nethermind.Tools.Kute.MessageProvider; + +public class UnwrapBatchJsonRpcMessageProvider : IMessageProvider +{ + private readonly IMessageProvider _provider; + + public UnwrapBatchJsonRpcMessageProvider(IMessageProvider provider) + { + _provider = provider; + } + + public IAsyncEnumerable Messages { get => MessagesImpl(); } + + private async IAsyncEnumerable MessagesImpl() + { + await foreach (var jsonRpc in _provider.Messages) + { + switch (jsonRpc) + { + case JsonRpc.SingleJsonRpc: + yield return jsonRpc; + break; + case JsonRpc.BatchJsonRpc batch: + foreach (JsonRpc.SingleJsonRpc? single in batch.Items()) + { + yield return single; + } + break; + } + } + } +} diff --git a/tools/Nethermind.Tools.Kute/Program.cs b/tools/Nethermind.Tools.Kute/Program.cs index 40c2118bd51..3e5157cfcc9 100644 --- a/tools/Nethermind.Tools.Kute/Program.cs +++ b/tools/Nethermind.Tools.Kute/Program.cs @@ -53,11 +53,12 @@ static IServiceProvider BuildServiceProvider(Config config) collection.AddSingleton>(serviceProvider => { var messageProvider = serviceProvider.GetRequiredService>(); - bool unwrapBatches = config.UnwrapBatch; - return new JsonRpcMessageProvider(messageProvider, unwrapBatches); - }); - + var jsonMessageProvider = new JsonRpcMessageProvider(messageProvider); + return config.UnwrapBatch + ? new UnwrapBatchJsonRpcMessageProvider(jsonMessageProvider) + : jsonMessageProvider; + }); collection.AddSingleton( config.DryRun ? new NullJsonRpcValidator() @@ -69,7 +70,7 @@ static IServiceProvider BuildServiceProvider(Config config) collection.AddSingleton( new ComposedJsonRpcMethodFilter( config.MethodFilters - .Select(pattern => new PatternJsonRpcMethodFilter(pattern)) + .Select(pattern => new PatternJsonRpcMethodFilter(pattern) as IJsonRpcMethodFilter) .ToList() ) ); @@ -87,7 +88,7 @@ static IServiceProvider BuildServiceProvider(Config config) // For dry runs we still want to trigger the generation of an AuthToken // This is to ensure that all parameters required for the generation are correct, // and not require a real run to verify that this is the case. - string _ = provider.GetRequiredService().AuthToken; + string _ = provider.GetRequiredService().AuthToken; return new NullJsonRpcSubmitter(); }); collection.AddSingleton( @@ -99,13 +100,16 @@ static IServiceProvider BuildServiceProvider(Config config) { if (config.ShowProgress) { - // TODO: + // NOTE: // Terrible, terrible hack since it forces a double enumeration: // - A first one to count the number of messages. // - A second one to actually process each message. // We can reduce the cost by not parsing each message on the first enumeration - // At the same time, this optimization relies on implementation details. - var messagesProvider = provider.GetRequiredService>(); + // only when we're not unwrapping batches. If we are, we need to parse. + // This optimization relies on implementation details. + IMessageProvider messagesProvider = config.UnwrapBatch + ? provider.GetRequiredService>() + : provider.GetRequiredService>(); var totalMessages = messagesProvider.Messages.ToEnumerable().Count(); return new ConsoleProgressReporter(totalMessages); } @@ -121,7 +125,6 @@ static IServiceProvider BuildServiceProvider(Config config) _ => throw new ArgumentOutOfRangeException(), } ); - collection.AddSingleton(new JsonRpcFlowManager(config.RequestsPerSecond, config.UnwrapBatch)); return collection.BuildServiceProvider();