Skip to content

Commit

Permalink
Reduce guard checks and interface calls when using ArrayPoolList (#7960)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Dec 23, 2024
1 parent 4bb25cd commit 7e15f44
Show file tree
Hide file tree
Showing 22 changed files with 111 additions and 158 deletions.
5 changes: 0 additions & 5 deletions src/Nethermind/Nethermind.Crypto/KeccakRlpStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ public override void Write(ReadOnlySpan<byte> bytesToWrite)
_keccakHash.Update(bytesToWrite);
}

public override void Write(IReadOnlyList<byte> bytesToWrite)
{
_keccakHash.Update(bytesToWrite.ToArray());
}

public override void WriteByte(byte byteToWrite)
{
_keccakHash.Update(MemoryMarshal.CreateSpan(ref byteToWrite, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,11 @@ static IEnumerable<long> CalculateGasUsed(TxReceipt[] txReceipts)
: txs.Select(static tx => tx.GasLimit));

List<RewardInfo> rewardInfos = new(txs.Length);
Span<long> gasUsedSpan = gasUsed.AsSpan();
for (int i = 0; i < txs.Length; i++)
{
txs[i].TryCalculatePremiumPerGas(block.BaseFeePerGas, out UInt256 premiumPerGas);
rewardInfos.Add(new RewardInfo(gasUsed[i], premiumPerGas));
rewardInfos.Add(new RewardInfo(gasUsedSpan[i], premiumPerGas));
}

rewardInfos.Sort(static (i1, i2) => i1.PremiumPerGas.CompareTo(i2.PremiumPerGas));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static void TestZero<T>(IZeroMessageSerializer<T> serializer, T message,
{
buffer.Release();
buffer2.Release();
message.TryDispose();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using FluentAssertions;
using Nethermind.Consensus;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
Expand Down Expand Up @@ -196,7 +197,7 @@ public void should_handle_NewPooledTransactionHashesMessage([Values(true, false)
HandleIncomingStatusMessage();
HandleZeroMessage(msg, Eth65MessageCode.NewPooledTransactionHashes);

_pooledTxsRequestor.Received(canGossipTransactions ? 1 : 0).RequestTransactions(Arg.Any<Action<GetPooledTransactionsMessage>>(), Arg.Any<IReadOnlyList<Hash256>>());
_pooledTxsRequestor.Received(canGossipTransactions ? 1 : 0).RequestTransactions(Arg.Any<Action<GetPooledTransactionsMessage>>(), Arg.Any<IOwnedReadOnlyList<Hash256>>());
}

private void HandleZeroMessage<T>(T msg, int messageCode) where T : MessageBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
using Nethermind.TxPool;
using NSubstitute;
using NUnit.Framework;
using System.Runtime.InteropServices;
using Nethermind.Core.Collections;
using Nethermind.Core.Extensions;

namespace Nethermind.Network.Test.P2P.Subprotocols.Eth.V65
{
Expand All @@ -20,74 +23,79 @@ public class PooledTxsRequestorTests
private readonly ITxPool _txPool = Substitute.For<ITxPool>();
private readonly Action<GetPooledTransactionsMessage> _doNothing = static msg => msg.Dispose();
private IPooledTxsRequestor _requestor;
private IReadOnlyList<Hash256> _request;
private IList<Hash256> _expected;
private IReadOnlyList<Hash256> _response;
private ArrayPoolList<Hash256> _response;

[TearDown]
public void TearDown()
{
_response?.Dispose();
}

[Test]
public void filter_properly_newPooledTxHashes()
{
_response = new List<Hash256>();
_requestor = new PooledTxsRequestor(_txPool, new TxPoolConfig());
_requestor.RequestTransactions(_doNothing, new List<Hash256> { TestItem.KeccakA, TestItem.KeccakD });
using var skipped = new ArrayPoolList<Hash256>(2) { TestItem.KeccakA, TestItem.KeccakD };
_requestor.RequestTransactions(_doNothing, skipped);

_request = new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_expected = new List<Hash256> { TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, _request);
_response.Should().BeEquivalentTo(_expected);
using var request = new ArrayPoolList<Hash256>(3) { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
using var expected = new ArrayPoolList<Hash256>(3) { TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, request);
_response.Should().BeEquivalentTo(expected);
}

[Test]
public void filter_properly_already_pending_hashes()
{
_response = new List<Hash256>();
_requestor = new PooledTxsRequestor(_txPool, new TxPoolConfig());
_requestor.RequestTransactions(_doNothing, new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC });
using var skipped = new ArrayPoolList<Hash256>(3) { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(_doNothing, skipped);

_request = new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, _request);
using var request = new ArrayPoolList<Hash256>(3) { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, request);
_response.Should().BeEmpty();
}

[Test]
public void filter_properly_discovered_hashes()
{
_response = new List<Hash256>();
_requestor = new PooledTxsRequestor(_txPool, new TxPoolConfig());

_request = new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_expected = new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, _request);
_response.Should().BeEquivalentTo(_expected);
using var request = new ArrayPoolList<Hash256>(3) { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
using var expected = new ArrayPoolList<Hash256>(3) { TestItem.KeccakA, TestItem.KeccakB, TestItem.KeccakC };
_requestor.RequestTransactions(Send, request);
_response.Should().BeEquivalentTo(expected);
}

[Test]
public void can_handle_empty_argument()
{
_response = new List<Hash256>();
_requestor = new PooledTxsRequestor(_txPool, new TxPoolConfig());
_requestor.RequestTransactions(Send, new List<Hash256>());
using var skipped = new ArrayPoolList<Hash256>(0);
_requestor.RequestTransactions(Send, skipped);
_response.Should().BeEmpty();
}

[Test]
public void filter_properly_hashes_present_in_hashCache()
{
_response = new List<Hash256>();
ITxPool txPool = Substitute.For<ITxPool>();
txPool.IsKnown(Arg.Any<Hash256>()).Returns(true);
_requestor = new PooledTxsRequestor(txPool, new TxPoolConfig());

_request = new List<Hash256> { TestItem.KeccakA, TestItem.KeccakB };
_expected = new List<Hash256> { };
_requestor.RequestTransactions(Send, _request);
_response.Should().BeEquivalentTo(_expected);
using var request = new ArrayPoolList<Hash256>(2) { TestItem.KeccakA, TestItem.KeccakB };
using var expected = new ArrayPoolList<Hash256>(0) { };
_requestor.RequestTransactions(Send, request);
_response.Should().BeEquivalentTo(expected);
}

private void Send(GetPooledTransactionsMessage msg)
{
using (msg) _response = msg.Hashes.ToList();
_response?.Dispose();
using (msg)
{
_response = msg.Hashes.ToPooledList();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void Can_handle_NewPooledTransactions_message([Values(0, 1, 2, 100)] int
HandleZeroMessage(msg, Eth68MessageCode.NewPooledTransactionHashes);

_pooledTxsRequestor.Received(canGossipTransactions ? 1 : 0).RequestTransactionsEth68(Arg.Any<Action<GetPooledTransactionsMessage>>(),
Arg.Any<IReadOnlyList<Hash256>>(), Arg.Any<IReadOnlyList<int>>(), Arg.Any<IReadOnlyList<byte>>());
Arg.Any<IOwnedReadOnlyList<Hash256>>(), Arg.Any<IOwnedReadOnlyList<int>>(), Arg.Any<IOwnedReadOnlyList<byte>>());
}

[TestCase(true)]
Expand Down Expand Up @@ -163,7 +163,7 @@ public void Should_process_huge_transaction()
HandleZeroMessage(msg, Eth68MessageCode.NewPooledTransactionHashes);

_pooledTxsRequestor.Received(1).RequestTransactionsEth68(Arg.Any<Action<GetPooledTransactionsMessage>>(),
Arg.Any<IReadOnlyList<Hash256>>(), Arg.Any<IReadOnlyList<int>>(), Arg.Any<IReadOnlyList<byte>>());
Arg.Any<IOwnedReadOnlyList<Hash256>>(), Arg.Any<IOwnedReadOnlyList<int>>(), Arg.Any<IOwnedReadOnlyList<byte>>());
}

[TestCase(1)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class GetNodeDataMessageTests
public void Sets_values_from_constructor_argument()
{
Hash256[] keys = { TestItem.KeccakA, TestItem.KeccakB };
GetNodeDataMessage message = new(keys.ToPooledList());
using GetNodeDataMessage message = new(keys.ToPooledList());
keys.Should().BeEquivalentTo(message.Hashes);
}

Expand All @@ -34,15 +34,15 @@ public void Throws_on_null_argument()
[Test]
public void To_string()
{
GetNodeDataMessage message = new(ArrayPoolList<Hash256>.Empty());
using GetNodeDataMessage message = new(ArrayPoolList<Hash256>.Empty());
_ = message.ToString();
}

[Test]
public void Packet_type_and_protocol_are_correct()
{
Hash256[] keys = { TestItem.KeccakA, TestItem.KeccakB };
GetNodeDataMessage message = new(keys.ToPooledList());
using GetNodeDataMessage message = new(keys.ToPooledList());

message.PacketType.Should().Be(NodeDataMessageCode.GetNodeData);
message.Protocol.Should().Be(Protocol.NodeData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void Roundtrip_Many()
GetStorageRangeMessage msg = new()
{
RequestId = MessageConstants.Random.NextLong(),
StoragetRange = new()
StorageRange = new()
{
RootHash = TestItem.KeccakA,
Accounts = TestItem.Keccaks.Select(static k => new PathWithAccount(k, null)).ToPooledList(TestItem.Keccaks.Length),
Expand All @@ -43,7 +43,7 @@ public void Roundtrip_Empty()
GetStorageRangeMessage msg = new()
{
RequestId = MessageConstants.Random.NextLong(),
StoragetRange = new()
StorageRange = new()
{
RootHash = Keccak.OfAnEmptyString,
Accounts = ArrayPoolList<PathWithAccount>.Empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void NullPathGroup()

GetTrieNodesMessageSerializer serializer = new();

GetTrieNodesMessage? msg = serializer.Deserialize(data);
using GetTrieNodesMessage? msg = serializer.Deserialize(data);
byte[] recode = serializer.Serialize(msg);

recode.Should().BeEquivalentTo(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class TrieNodesMessageSerializerTests
[Test]
public void Roundtrip()
{
ArrayPoolList<byte[]> data = new(2) { new byte[] { 0xde, 0xad, 0xc0, 0xde }, new byte[] { 0xfe, 0xed } };
using ArrayPoolList<byte[]> data = new(2) { new byte[] { 0xde, 0xad, 0xc0, 0xde }, new byte[] { 0xfe, 0xed } };

TrieNodesMessage message = new(data);

Expand All @@ -27,7 +27,7 @@ public void Roundtrip()
[Test]
public void RoundtripWithCorrectLength()
{
ArrayPoolList<byte[]> data = new(2) { new byte[] { 0xde, 0xad, 0xc0, 0xde }, new byte[] { 0xfe, 0xed } };
using ArrayPoolList<byte[]> data = new(2) { new byte[] { 0xde, 0xad, 0xc0, 0xde }, new byte[] { 0xfe, 0xed } };

TrieNodesMessage message = new(data);
message.RequestId = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public void Serialize(IByteBuffer byteBuffer, T message)
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
for (int i = 0; i < message.Hashes.Count; i++)
foreach (Hash256 hash in message.Hashes.AsSpan())
{
rlpStream.Encode(message.Hashes[i]);
rlpStream.Encode(hash);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Network.P2P.Subprotocols.Eth.V65.Messages;

namespace Nethermind.Network.P2P.Subprotocols.Eth
{
public interface IPooledTxsRequestor
{
void RequestTransactions(Action<GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes);
void RequestTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes);
void RequestTransactionsEth68(Action<V66.Messages.GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes, IReadOnlyList<int> sizes, IReadOnlyList<byte> types);
void RequestTransactions(Action<GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes);
void RequestTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes);
void RequestTransactionsEth68(Action<V66.Messages.GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes, IOwnedReadOnlyList<int> sizes, IOwnedReadOnlyList<byte> types);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public PooledTxsRequestor(ITxPool txPool, ITxPoolConfig txPoolConfig)
_blobSupportEnabled = txPoolConfig.BlobsSupport.IsEnabled();
}

public void RequestTransactions(Action<GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes)
public void RequestTransactions(Action<GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes)
{
ArrayPoolList<Hash256> discoveredTxHashes = AddMarkUnknownHashes(hashes);
ArrayPoolList<Hash256> discoveredTxHashes = AddMarkUnknownHashes(hashes.AsSpan());
RequestPooledTransactions(send, discoveredTxHashes);
}

public void RequestTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes)
public void RequestTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes)
{
ArrayPoolList<Hash256> discoveredTxHashes = AddMarkUnknownHashes(hashes);
ArrayPoolList<Hash256> discoveredTxHashes = AddMarkUnknownHashes(hashes.AsSpan());

if (discoveredTxHashes.Count <= MaxNumberOfTxsInOneMsg)
{
Expand All @@ -56,41 +56,44 @@ public void RequestTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMe
}
}

public void RequestTransactionsEth68(Action<V66.Messages.GetPooledTransactionsMessage> send, IReadOnlyList<Hash256> hashes, IReadOnlyList<int> sizes, IReadOnlyList<byte> types)
public void RequestTransactionsEth68(Action<V66.Messages.GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashes, IOwnedReadOnlyList<int> sizes, IOwnedReadOnlyList<byte> types)
{
using ArrayPoolList<(Hash256 Hash, byte Type, int Size)> discoveredTxHashesAndSizes = AddMarkUnknownHashesEth68(hashes, sizes, types);
using ArrayPoolList<(Hash256 Hash, byte Type, int Size)> discoveredTxHashesAndSizes = AddMarkUnknownHashesEth68(hashes.AsSpan(), sizes.AsSpan(), types.AsSpan());
if (discoveredTxHashesAndSizes.Count == 0) return;

int packetSizeLeft = TransactionsMessage.MaxPacketSize;
ArrayPoolList<Hash256> hashesToRequest = new(discoveredTxHashesAndSizes.Count);

for (int i = 0; i < discoveredTxHashesAndSizes.Count; i++)
var discoveredCount = discoveredTxHashesAndSizes.Count;
var toRequestCount = 0;
foreach ((Hash256 hash, byte type, int size) in discoveredTxHashesAndSizes.AsSpan())
{
int txSize = discoveredTxHashesAndSizes[i].Size;
TxType txType = (TxType)discoveredTxHashesAndSizes[i].Type;
int txSize = size;
TxType txType = (TxType)type;

if (txSize > packetSizeLeft && hashesToRequest.Count > 0)
if (txSize > packetSizeLeft && toRequestCount > 0)
{
RequestPooledTransactionsEth66(send, hashesToRequest);
hashesToRequest = new ArrayPoolList<Hash256>(discoveredTxHashesAndSizes.Count);
hashesToRequest = new ArrayPoolList<Hash256>(discoveredCount);
packetSizeLeft = TransactionsMessage.MaxPacketSize;
toRequestCount = 0;
}

if (_blobSupportEnabled || txType != TxType.Blob)
{
hashesToRequest.Add(discoveredTxHashesAndSizes[i].Hash);
hashesToRequest.Add(hash);
packetSizeLeft -= txSize;
toRequestCount++;
}
}

RequestPooledTransactionsEth66(send, hashesToRequest);
}

private ArrayPoolList<Hash256> AddMarkUnknownHashes(IReadOnlyList<Hash256> hashes)
private ArrayPoolList<Hash256> AddMarkUnknownHashes(ReadOnlySpan<Hash256> hashes)
{
int count = hashes.Count;
ArrayPoolList<Hash256> discoveredTxHashes = new ArrayPoolList<Hash256>(count);
for (int i = 0; i < count; i++)
ArrayPoolList<Hash256> discoveredTxHashes = new ArrayPoolList<Hash256>(hashes.Length);
for (int i = 0; i < hashes.Length; i++)
{
Hash256 hash = hashes[i];
if (!_txPool.IsKnown(hash) && _pendingHashes.Set(hash))
Expand All @@ -102,11 +105,10 @@ private ArrayPoolList<Hash256> AddMarkUnknownHashes(IReadOnlyList<Hash256> hashe
return discoveredTxHashes;
}

private ArrayPoolList<(Hash256, byte, int)> AddMarkUnknownHashesEth68(IReadOnlyList<Hash256> hashes, IReadOnlyList<int> sizes, IReadOnlyList<byte> types)
private ArrayPoolList<(Hash256, byte, int)> AddMarkUnknownHashesEth68(ReadOnlySpan<Hash256> hashes, ReadOnlySpan<int> sizes, ReadOnlySpan<byte> types)
{
int count = hashes.Count;
ArrayPoolList<(Hash256, byte, int)> discoveredTxHashesAndSizes = new(count);
for (int i = 0; i < count; i++)
ArrayPoolList<(Hash256, byte, int)> discoveredTxHashesAndSizes = new(hashes.Length);
for (int i = 0; i < hashes.Length; i++)
{
Hash256 hash = hashes[i];
if (!_txPool.IsKnown(hash) && !_txPool.ContainsTx(hash, (TxType)types[i]) && _pendingHashes.Set(hash))
Expand All @@ -120,14 +122,7 @@ private ArrayPoolList<Hash256> AddMarkUnknownHashes(IReadOnlyList<Hash256> hashe

private static void RequestPooledTransactions(Action<GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashesToRequest)
{
if (hashesToRequest.Count > 0)
{
send(new(hashesToRequest));
}
else
{
hashesToRequest.Dispose();
}
send(new(hashesToRequest));
}

private static void RequestPooledTransactionsEth66(Action<V66.Messages.GetPooledTransactionsMessage> send, IOwnedReadOnlyList<Hash256> hashesToRequest)
Expand Down
Loading

0 comments on commit 7e15f44

Please sign in to comment.