From 2744a9f60b559021334d20c617872d7399d56032 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Tue, 6 Sep 2022 11:02:45 +0800 Subject: [PATCH 1/3] Auto adjust snap sync response bytes --- .../SnapProtocolHandlerTests.cs | 171 ++++++++++++++++++ .../Subprotocols/Snap/SnapProtocolHandler.cs | 76 ++++++-- 2 files changed, 234 insertions(+), 13 deletions(-) create mode 100644 src/Nethermind/Nethermind.Network.Test/SnapProtocolHandlerTests.cs diff --git a/src/Nethermind/Nethermind.Network.Test/SnapProtocolHandlerTests.cs b/src/Nethermind/Nethermind.Network.Test/SnapProtocolHandlerTests.cs new file mode 100644 index 00000000000..d3561071d34 --- /dev/null +++ b/src/Nethermind/Nethermind.Network.Test/SnapProtocolHandlerTests.cs @@ -0,0 +1,171 @@ +// Copyright (c) 2021 Demerzel Solutions Limited +// This file is part of the Nethermind library. +// +// The Nethermind library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Nethermind library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Nethermind. If not, see . +// + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using DotNetty.Buffers; +using FluentAssertions; +using Nethermind.Blockchain.Synchronization; +using Nethermind.Core; +using Nethermind.Core.Crypto; +using Nethermind.Logging; +using Nethermind.Network.P2P; +using Nethermind.Network.P2P.Messages; +using Nethermind.Network.P2P.Subprotocols.Snap; +using Nethermind.Network.P2P.Subprotocols.Snap.Messages; +using Nethermind.Network.Rlpx; +using Nethermind.State.Snap; +using Nethermind.Stats; +using NSubstitute; +using NUnit.Framework; + +namespace Nethermind.Network.Test; + +public class SnapProtocolHandlerTests +{ + private class Context + { + public ISession Session { get; set; } = Substitute.For(); + + private IMessageSerializationService _messageSerializationService; + public IMessageSerializationService MessageSerializationService + { + get + { + if (_messageSerializationService == null) + { + _messageSerializationService = new MessageSerializationService(); + _messageSerializationService.Register(new AccountRangeMessageSerializer()); + } + + return _messageSerializationService; + } + set => _messageSerializationService = value; + } + + public INodeStatsManager NodeStatsManager { get; set; } = Substitute.For(); + + + private SnapProtocolHandler _snapProtocolHandler; + public SnapProtocolHandler SnapProtocolHandler { + get => _snapProtocolHandler ??= new SnapProtocolHandler( + Session, + NodeStatsManager, + MessageSerializationService, + LimboLogs.Instance + ); + set + { + _snapProtocolHandler = value; + } + } + + public TimeSpan SimulatedLatency { get; set; } = TimeSpan.Zero; + + private List _recordedResponseBytesLength = new(); + public Context WithResponseBytesRecorder { + get { + Session + .When((ses) => ses.DeliverMessage(Arg.Any())) + .Do((callInfo) => + { + GetAccountRangeMessage accountRangeMessage = (GetAccountRangeMessage)callInfo[0]; + _recordedResponseBytesLength.Add(accountRangeMessage.ResponseBytes); + + if (SimulatedLatency > TimeSpan.Zero) + { + Task.Delay(SimulatedLatency).Wait(); + } + + IByteBuffer buffer = MessageSerializationService.ZeroSerialize(new AccountRangeMessage() + { + PathsWithAccounts = new []{ new PathWithAccount(Keccak.Zero, Account.TotallyEmpty)} + }); + buffer.ReadByte(); // Need to skip adaptive type + + ZeroPacket packet = new(buffer); + + packet.PacketType = SnapMessageCode.AccountRange; + SnapProtocolHandler.HandleMessage(packet); + }); + return this; + } + } + + public void RecordedMessageSizesShouldIncrease() + { + _recordedResponseBytesLength[^1].Should().BeGreaterThan(_recordedResponseBytesLength[^2]); + } + + public void RecordedMessageSizesShouldDecrease() + { + _recordedResponseBytesLength[^1].Should().BeLessThan(_recordedResponseBytesLength[^2]); + } + + public void RecordedMessageSizesShouldNotChange() + { + _recordedResponseBytesLength[^1].Should().Be(_recordedResponseBytesLength[^2]); + } + } + + [Test] + public async Task Test_response_bytes_adjust_with_latency() + { + Context ctx = new Context() + .WithResponseBytesRecorder; + + SnapProtocolHandler protocolHandler = ctx.SnapProtocolHandler; + + ctx.SimulatedLatency = TimeSpan.Zero; + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.RecordedMessageSizesShouldIncrease(); + + ctx.SimulatedLatency = SnapProtocolHandler.LowerLatencyThreshold + TimeSpan.FromMilliseconds(1); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.RecordedMessageSizesShouldNotChange(); + + ctx.SimulatedLatency = SnapProtocolHandler.UpperLatencyThreshold + TimeSpan.FromMilliseconds(1); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.RecordedMessageSizesShouldDecrease(); + } + + [Test] + [Explicit] + public async Task Test_response_bytes_reset_on_error() + { + Context ctx = new Context() + .WithResponseBytesRecorder; + + SnapProtocolHandler protocolHandler = ctx.SnapProtocolHandler; + + // Just setting baseline + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.RecordedMessageSizesShouldIncrease(); + + ctx.SimulatedLatency = Timeouts.Eth + TimeSpan.FromSeconds(1); + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.SimulatedLatency = TimeSpan.Zero; // The read value is the request down, but it is adjusted on above request + await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None); + ctx.RecordedMessageSizesShouldDecrease(); + } +} diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs index 319c9f878ee..61ae9f30014 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs @@ -1,21 +1,22 @@ // Copyright (c) 2021 Demerzel Solutions Limited // This file is part of the Nethermind library. -// +// // The Nethermind library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. -// +// // The Nethermind library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. -// +// // You should have received a copy of the GNU Lesser General Public License // along with the Nethermind. If not, see . -// +// using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using DotNetty.Buffers; @@ -35,7 +36,11 @@ namespace Nethermind.Network.P2P.Subprotocols.Snap { public class SnapProtocolHandler : ZeroProtocolHandlerBase, ISnapSyncPeer { - private const int BYTES_LIMIT = 2_000_000; + private const int MaxBytesLimit = 2_000_000; + private const int MinBytesLimit = 200_000; + public static readonly TimeSpan UpperLatencyThreshold = TimeSpan.FromMilliseconds(2000); + public static readonly TimeSpan LowerLatencyThreshold = TimeSpan.FromMilliseconds(1000); + private const double BytesLimitAdjustmentFactor = 1.5; public override string Name => "snap1"; protected override TimeSpan InitTimeout => Timeouts.Eth; @@ -50,6 +55,8 @@ public class SnapProtocolHandler : ZeroProtocolHandlerBase, ISnapSyncPeer private readonly MessageQueue _getTrieNodesRequests; private static readonly byte[] _emptyBytes = { 0 }; + private int _currentBytesLimit = MinBytesLimit; + public SnapProtocolHandler(ISession session, INodeStatsManager nodeStats, IMessageSerializationService serializer, @@ -185,10 +192,11 @@ public async Task GetAccountRange(AccountRange range, Cancell var request = new GetAccountRangeMessage() { AccountRange = range, - ResponseBytes = BYTES_LIMIT + ResponseBytes = _currentBytesLimit }; - AccountRangeMessage response = await SendRequest(request, _getAccountRangeRequests, token); + AccountRangeMessage response = await AdjustBytesLimit(() => + SendRequest(request, _getAccountRangeRequests, token)); Metrics.SnapGetAccountRangeSent++; @@ -200,10 +208,11 @@ public async Task GetStorageRange(StorageRange range, Cancellati var request = new GetStorageRangeMessage() { StoragetRange = range, - ResponseBytes = BYTES_LIMIT + ResponseBytes = _currentBytesLimit }; - StorageRangeMessage response = await SendRequest(request, _getStorageRangeRequests, token); + StorageRangeMessage response = await AdjustBytesLimit(() => + SendRequest(request, _getStorageRangeRequests, token)); Metrics.SnapGetStorageRangesSent++; @@ -215,10 +224,11 @@ public async Task GetByteCodes(Keccak[] codeHashes, CancellationToken var request = new GetByteCodesMessage() { Hashes = codeHashes, - Bytes = BYTES_LIMIT + Bytes = _currentBytesLimit }; - ByteCodesMessage response = await SendRequest(request, _getByteCodesRequests, token); + ByteCodesMessage response = await AdjustBytesLimit(() => + SendRequest(request, _getByteCodesRequests, token)); Metrics.SnapGetByteCodesSent++; @@ -233,10 +243,11 @@ public async Task GetTrieNodes(AccountsToRefreshRequest request, Cance { RootHash = request.RootHash, Paths = groups, - Bytes = BYTES_LIMIT + Bytes = _currentBytesLimit }; - TrieNodesMessage response = await SendRequest(reqMsg, _getTrieNodesRequests, token); + TrieNodesMessage response = await AdjustBytesLimit(() => + SendRequest(reqMsg, _getTrieNodesRequests, token)); Metrics.SnapGetTrieNodesSent++; @@ -290,5 +301,44 @@ private async Task SendRequest(TIn msg, MessageQueue StatsManager.ReportTransferSpeedEvent(Session.Node, TransferSpeedType.SnapRanges, 0L); throw new TimeoutException($"{Session} Request timeout in {nameof(TIn)}"); } + + /// + /// Adjust the _currentBytesLimit depending on the latency of the request and if the request failed. + /// + /// + /// + /// + private async Task AdjustBytesLimit(Func> func) + { + int startingBytesLimit = _currentBytesLimit; + bool failed = false; + Stopwatch sw = Stopwatch.StartNew(); + try + { + return await func(); + } + catch (Exception) + { + failed = true; + throw; + } + finally + { + sw.Stop(); + if (failed) + { + _currentBytesLimit = MinBytesLimit; + } + else if (sw.Elapsed < LowerLatencyThreshold) + { + _currentBytesLimit = Math.Min((int)(startingBytesLimit * BytesLimitAdjustmentFactor), MaxBytesLimit); + } + else if (sw.Elapsed > UpperLatencyThreshold && startingBytesLimit > MinBytesLimit) + { + _currentBytesLimit = (int)(startingBytesLimit / BytesLimitAdjustmentFactor); + } + } + } + } } From 7a09cd8d81b56116ecc91e00396d2a84d30e7468 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Tue, 6 Sep 2022 11:29:50 +0800 Subject: [PATCH 2/3] Added test --- .../P2P/Subprotocols/Snap/SnapProtocolHandler.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs index 61ae9f30014..9e463a17adf 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs @@ -310,6 +310,8 @@ private async Task SendRequest(TIn msg, MessageQueue /// private async Task AdjustBytesLimit(Func> func) { + // Record bytes limit so that in case multiple concurrent request happens, we do not multiply the + // limit on top of other adjustment, so only the last adjustment will stick, which is fine. int startingBytesLimit = _currentBytesLimit; bool failed = false; Stopwatch sw = Stopwatch.StartNew(); From 0cd5e4829e96aa2015f43cb92eafb10829e43e26 Mon Sep 17 00:00:00 2001 From: Amirul Ashraf Date: Tue, 6 Sep 2022 19:11:11 +0800 Subject: [PATCH 3/3] Reduces minimum byte limit --- .../P2P/Subprotocols/Snap/SnapProtocolHandler.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs index 9e463a17adf..8ad0e78bf6e 100644 --- a/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs +++ b/src/Nethermind/Nethermind.Network/P2P/Subprotocols/Snap/SnapProtocolHandler.cs @@ -37,10 +37,10 @@ namespace Nethermind.Network.P2P.Subprotocols.Snap public class SnapProtocolHandler : ZeroProtocolHandlerBase, ISnapSyncPeer { private const int MaxBytesLimit = 2_000_000; - private const int MinBytesLimit = 200_000; + private const int MinBytesLimit = 20_000; public static readonly TimeSpan UpperLatencyThreshold = TimeSpan.FromMilliseconds(2000); public static readonly TimeSpan LowerLatencyThreshold = TimeSpan.FromMilliseconds(1000); - private const double BytesLimitAdjustmentFactor = 1.5; + private const double BytesLimitAdjustmentFactor = 2; public override string Name => "snap1"; protected override TimeSpan InitTimeout => Timeouts.Eth;