diff --git a/src/Nethermind/Nethermind.Core/Collections/CappedReadOnlyList.cs b/src/Nethermind/Nethermind.Core/Collections/CappedReadOnlyList.cs new file mode 100644 index 00000000000..54c6d1f4100 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Collections/CappedReadOnlyList.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; + +namespace Nethermind.Core.Collections; + +public readonly struct CappedReadOnlyList: IReadOnlyList +{ + private readonly IReadOnlyList _baseReadonlyList; + private readonly int _cappedLength; + + public CappedReadOnlyList(IReadOnlyList baseReadonly, int cappedLength) + { + _baseReadonlyList = baseReadonly; + _cappedLength = cappedLength; + } + + public IEnumerator GetEnumerator() + { + return _baseReadonlyList.Take(_cappedLength).GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + + public int Count => Math.Min(_baseReadonlyList.Count, _cappedLength); + + public T this[int index] + { + get + { + if (index >= _cappedLength) + { + throw new IndexOutOfRangeException($"Index is {index} while count is {_cappedLength}"); + } + + return _baseReadonlyList[index]; + } + } +} diff --git a/src/Nethermind/Nethermind.Core/Collections/IReadOnlyListExtensions.cs b/src/Nethermind/Nethermind.Core/Collections/IReadOnlyListExtensions.cs new file mode 100644 index 00000000000..dbc0d45442e --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Collections/IReadOnlyListExtensions.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; + +namespace Nethermind.Core.Collections; + +public static class IReadOnlyListExtensions +{ + public static IReadOnlyList CappedTo(this IReadOnlyList readOnlyList, int length) + { + if (length > readOnlyList.Count) + { + return readOnlyList; + } + + return new CappedReadOnlyList(readOnlyList, length); + } +} diff --git a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs index 76d54d023fe..430e4c9b649 100644 --- a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs +++ b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs @@ -22,7 +22,6 @@ using Nethermind.Blockchain; using Nethermind.Blockchain.Synchronization; using Nethermind.Core; -using Nethermind.Core.Attributes; using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; @@ -30,9 +29,7 @@ using Nethermind.Logging; using Nethermind.Network.P2P.Subprotocols.Eth.V62; using Nethermind.Network.P2P.Subprotocols.Eth.V62.Messages; -using Nethermind.Network.P2P.Subprotocols.Eth.V63; using Nethermind.Network.P2P.Subprotocols.Eth.V63.Messages; -using Nethermind.Network.Rlpx; using Nethermind.Stats; using Nethermind.Stats.Model; using Nethermind.Synchronization; @@ -65,6 +62,14 @@ public abstract class SyncPeerProtocolHandlerBase : ZeroProtocolHandlerBase, ISy protected readonly MessageQueue _headersRequests; protected readonly MessageQueue _bodiesRequests; + private static int GetBodiesLatencyHighWatermark = 8000; + private static int GetBodiesLatencyLowWatermark = 5000; + private static double GetBodiesBatchSizeAdjustmentFactor = 1.5; + private static int GetBodiesMaxBatchSize = 256; + private static int GetBodiesMinBatchSize = 1; + + private int _getBodiesCurrentBatchSize = 4; + protected SyncPeerProtocolHandlerBase(ISession session, IMessageSerializationService serializer, INodeStatsManager statsManager, @@ -90,10 +95,39 @@ async Task ISyncPeer.GetBlockBodies(IReadOnlyList blockHash return Array.Empty(); } - GetBlockBodiesMessage bodiesMsg = new(blockHashes); + int startingBodiesCountLimit = _getBodiesCurrentBatchSize; + GetBlockBodiesMessage bodiesMsg = new(blockHashes.CappedTo(startingBodiesCountLimit)); - BlockBody[] blocks = await SendRequest(bodiesMsg, token); - return blocks; + try + { + Stopwatch sw = Stopwatch.StartNew(); + BlockBody[] blocks = await SendRequest(bodiesMsg, token); + long elapsed = sw.ElapsedMilliseconds; + if (elapsed < GetBodiesLatencyLowWatermark && blocks.Length == startingBodiesCountLimit) + { + _getBodiesCurrentBatchSize = Math.Min( + (int)Math.Ceiling(startingBodiesCountLimit * GetBodiesBatchSizeAdjustmentFactor), + GetBodiesMaxBatchSize + ); + } + else if (elapsed > GetBodiesLatencyHighWatermark) + { + _getBodiesCurrentBatchSize = Math.Max( + (int)Math.Floor(startingBodiesCountLimit / GetBodiesBatchSizeAdjustmentFactor), + GetBodiesMinBatchSize + ); + } + + return blocks; + } + catch (Exception) + { + _getBodiesCurrentBatchSize = Math.Max( + (int)Math.Floor(startingBodiesCountLimit / GetBodiesBatchSizeAdjustmentFactor), + GetBodiesMinBatchSize + ); + throw; + } } protected virtual async Task SendRequest(GetBlockBodiesMessage message, CancellationToken token) diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V63/Eth63ProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V63/Eth63ProtocolHandler.cs index 09a01181d64..514a378d752 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V63/Eth63ProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V63/Eth63ProtocolHandler.cs @@ -21,6 +21,7 @@ using System.Threading.Tasks; using Nethermind.Consensus; using Nethermind.Core; +using Nethermind.Core.Collections; using Nethermind.Core.Crypto; using Nethermind.Logging; using Nethermind.Network.P2P.Subprotocols.Eth.V62; @@ -38,6 +39,14 @@ public class Eth63ProtocolHandler : Eth62ProtocolHandler private readonly MessageQueue _receiptsRequests; + private static int GetReceiptsLatencyHighWatermark = 8000; + private static int GetReceiptsLatencyLowWatermark = 5000; + private static int GetReceiptsMaxBatchSize = 256; + private static int GetReceiptsMinBatchSize = 1; + private static double GetReceiptsBatchSizeAdjustmentFactor = 1.5; + + private int _getReceiptsCurrentBatchSize = 4; + public Eth63ProtocolHandler(ISession session, IMessageSerializationService serializer, INodeStatsManager nodeStatsManager, @@ -142,9 +151,40 @@ public override async Task GetReceipts(IReadOnlyList bloc return Array.Empty(); } - GetReceiptsMessage msg = new(blockHashes); - TxReceipt[][] txReceipts = await SendRequest(msg, token); - return txReceipts; + int startingReceiptsCountLimit = _getReceiptsCurrentBatchSize; + GetReceiptsMessage msg = new(blockHashes.CappedTo(startingReceiptsCountLimit)); + + try + { + Stopwatch sw = Stopwatch.StartNew(); + TxReceipt[][] txReceipts = await SendRequest(msg, token); + + long elapsed = sw.ElapsedMilliseconds; + if (elapsed < GetReceiptsLatencyLowWatermark && txReceipts.Length == startingReceiptsCountLimit) + { + _getReceiptsCurrentBatchSize = Math.Min( + (int)Math.Ceiling(startingReceiptsCountLimit * GetReceiptsBatchSizeAdjustmentFactor), + GetReceiptsMaxBatchSize + ); + } + else if (elapsed > GetReceiptsLatencyHighWatermark) + { + _getReceiptsCurrentBatchSize = Math.Max( + (int)Math.Floor(startingReceiptsCountLimit / GetReceiptsBatchSizeAdjustmentFactor), + GetReceiptsMinBatchSize + ); + } + + return txReceipts; + } + catch (Exception) + { + _getReceiptsCurrentBatchSize = Math.Max( + (int)Math.Floor(startingReceiptsCountLimit / GetReceiptsBatchSizeAdjustmentFactor), + GetReceiptsMinBatchSize + ); + throw; + } } protected virtual async Task SendRequest(GetNodeDataMessage message, CancellationToken token)