From 0315f4a4d3f52d77040da7dcddf5da85d0cbbc7d Mon Sep 17 00:00:00 2001 From: Lukasz Rozmej Date: Wed, 17 Jul 2024 12:45:05 +0200 Subject: [PATCH] Fix batch collection massages through sockets (#7265) --- .../JsonRpcSocketsClientTests.cs | 32 ++++++++++++++++--- .../MemoryMessageStream.cs | 30 +++++++++++++++++ .../WebSockets/JsonRpcSocketsClient.cs | 2 +- 3 files changed, 58 insertions(+), 6 deletions(-) create mode 100644 src/Nethermind/Nethermind.JsonRpc.Test/MemoryMessageStream.cs diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs index c3c6052db6c..373557181ae 100644 --- a/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs +++ b/src/Nethermind/Nethermind.JsonRpc.Test/JsonRpcSocketsClientTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; @@ -128,7 +129,7 @@ static async Task CountNumberOfMessages(Socket socket, CancellationToken to } disposeCount.Should().Be(messageCount); - cts.Cancel(); + await cts.CancelAsync(); return messageCount; }); @@ -223,7 +224,7 @@ async Task ReadMessages(Socket socket, IList receivedMessages, Canc } } stream.Close(); - cts.Cancel(); + await cts.CancelAsync(); return messageCount; }); @@ -298,7 +299,7 @@ public async Task Can_send_multiple_messages(int messageCount) await client.SendJsonRpcResult(result); await Task.Delay(100); } - cts.Cancel(); + await cts.CancelAsync(); return messageCount; }); @@ -340,7 +341,7 @@ public async Task Can_send_collections(int elements) await client.SendJsonRpcResult(result); await Task.Delay(100); - cts.Cancel(); + await cts.CancelAsync(); }); await Task.WhenAll(sendCollection, server); @@ -380,7 +381,7 @@ public async Task Stops_on_limited_body_size(int maxByteCount) int sent = await client.SendJsonRpcResult(result); await Task.Delay(100); - cts.Cancel(); + await cts.CancelAsync(); return sent; }); @@ -391,6 +392,27 @@ public async Task Stops_on_limited_body_size(int maxByteCount) Assert.That(received, Is.LessThanOrEqualTo(Math.Min(sent, maxByteCount))); } + [Test] + public async Task Can_serialize_collection() + { + await using MemoryMessageStream stream = new(); + EthereumJsonSerializer ethereumJsonSerializer = new(); + using JsonRpcSocketsClient client = new( + clientName: "TestClient", + stream: stream, + endpointType: RpcEndpoint.Ws, + jsonRpcProcessor: null!, + jsonRpcLocalStats: new NullJsonRpcLocalStats(), + jsonSerializer: ethereumJsonSerializer, + maxBatchResponseBodySize: 10_000 + ); + using JsonRpcResult result = JsonRpcResult.Collection(RandomBatchResult(10, 100)); + await client.SendJsonRpcResult(result); + stream.Seek(0, SeekOrigin.Begin); + JsonRpcSuccessResponse[]? response = ethereumJsonSerializer.Deserialize(stream); + response.Should().NotContainNulls(); + } + private static async Task OneShotServer(string uri, Func> func) { using HttpListener httpListener = new(); diff --git a/src/Nethermind/Nethermind.JsonRpc.Test/MemoryMessageStream.cs b/src/Nethermind/Nethermind.JsonRpc.Test/MemoryMessageStream.cs new file mode 100644 index 00000000000..f1bc3b2a87b --- /dev/null +++ b/src/Nethermind/Nethermind.JsonRpc.Test/MemoryMessageStream.cs @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.IO; +using System.Threading.Tasks; +using Nethermind.Sockets; + +namespace Nethermind.JsonRpc.Test; + +public class MemoryMessageStream : MemoryStream, IMessageBorderPreservingStream +{ + private static readonly byte Delimiter = Convert.ToByte('\n'); + + public Task ReceiveAsync(ArraySegment buffer) + { + int read = Read(buffer.AsSpan()); + return Task.FromResult(new ReceiveResult + { + Read = read, + EndOfMessage = read > 0 && buffer[read - 1] == Delimiter + }); + } + + public Task WriteEndOfMessageAsync() + { + WriteByte(Delimiter); + return Task.FromResult(1); + } +} diff --git a/src/Nethermind/Nethermind.JsonRpc/WebSockets/JsonRpcSocketsClient.cs b/src/Nethermind/Nethermind.JsonRpc/WebSockets/JsonRpcSocketsClient.cs index 1176e351741..3cf54185a5e 100644 --- a/src/Nethermind/Nethermind.JsonRpc/WebSockets/JsonRpcSocketsClient.cs +++ b/src/Nethermind/Nethermind.JsonRpc/WebSockets/JsonRpcSocketsClient.cs @@ -138,7 +138,7 @@ public virtual async Task SendJsonRpcResult(JsonRpcResult result) responseSize += 1; } isFirst = false; - responseSize += (int)await _jsonSerializer.SerializeAsync(_stream, result.Response, indented: false); + responseSize += (int)await _jsonSerializer.SerializeAsync(_stream, entry.Response, indented: false); _ = _jsonRpcLocalStats.ReportCall(entry.Report); // We reached the limit and don't want to responded to more request in the batch