Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix disconnecting peers during snap sync #4528

Merged
merged 3 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions src/Nethermind/Nethermind.Network.Test/SnapProtocolHandlerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright (c) 2021 Demerzel Solutions Limited
// This file is part of the Nethermind library.
//
// The Nethermind library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Nethermind library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
//

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
using FluentAssertions;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Network.P2P;
using Nethermind.Network.P2P.Messages;
using Nethermind.Network.P2P.Subprotocols.Snap;
using Nethermind.Network.P2P.Subprotocols.Snap.Messages;
using Nethermind.Network.Rlpx;
using Nethermind.State.Snap;
using Nethermind.Stats;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Network.Test;

public class SnapProtocolHandlerTests
{
private class Context
{
public ISession Session { get; set; } = Substitute.For<ISession>();

private IMessageSerializationService _messageSerializationService;
public IMessageSerializationService MessageSerializationService
{
get
{
if (_messageSerializationService == null)
{
_messageSerializationService = new MessageSerializationService();
_messageSerializationService.Register(new AccountRangeMessageSerializer());
}

return _messageSerializationService;
}
set => _messageSerializationService = value;
}

public INodeStatsManager NodeStatsManager { get; set; } = Substitute.For<INodeStatsManager>();


private SnapProtocolHandler _snapProtocolHandler;
public SnapProtocolHandler SnapProtocolHandler {
get => _snapProtocolHandler ??= new SnapProtocolHandler(
Session,
NodeStatsManager,
MessageSerializationService,
LimboLogs.Instance
);
set
{
_snapProtocolHandler = value;
}
}

public TimeSpan SimulatedLatency { get; set; } = TimeSpan.Zero;

private List<long> _recordedResponseBytesLength = new();
public Context WithResponseBytesRecorder {
get {
Session
.When((ses) => ses.DeliverMessage(Arg.Any<P2PMessage>()))
.Do((callInfo) =>
{
GetAccountRangeMessage accountRangeMessage = (GetAccountRangeMessage)callInfo[0];
_recordedResponseBytesLength.Add(accountRangeMessage.ResponseBytes);

if (SimulatedLatency > TimeSpan.Zero)
{
Task.Delay(SimulatedLatency).Wait();
}

IByteBuffer buffer = MessageSerializationService.ZeroSerialize(new AccountRangeMessage()
{
PathsWithAccounts = new []{ new PathWithAccount(Keccak.Zero, Account.TotallyEmpty)}
});
buffer.ReadByte(); // Need to skip adaptive type

ZeroPacket packet = new(buffer);

packet.PacketType = SnapMessageCode.AccountRange;
SnapProtocolHandler.HandleMessage(packet);
});
return this;
}
}

public void RecordedMessageSizesShouldIncrease()
{
_recordedResponseBytesLength[^1].Should().BeGreaterThan(_recordedResponseBytesLength[^2]);
}

public void RecordedMessageSizesShouldDecrease()
{
_recordedResponseBytesLength[^1].Should().BeLessThan(_recordedResponseBytesLength[^2]);
}

public void RecordedMessageSizesShouldNotChange()
{
_recordedResponseBytesLength[^1].Should().Be(_recordedResponseBytesLength[^2]);
}
}

[Test]
public async Task Test_response_bytes_adjust_with_latency()
{
Context ctx = new Context()
.WithResponseBytesRecorder;

SnapProtocolHandler protocolHandler = ctx.SnapProtocolHandler;

ctx.SimulatedLatency = TimeSpan.Zero;
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.RecordedMessageSizesShouldIncrease();

ctx.SimulatedLatency = SnapProtocolHandler.LowerLatencyThreshold + TimeSpan.FromMilliseconds(1);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.RecordedMessageSizesShouldNotChange();

ctx.SimulatedLatency = SnapProtocolHandler.UpperLatencyThreshold + TimeSpan.FromMilliseconds(1);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.RecordedMessageSizesShouldDecrease();
}

[Test]
[Explicit]
public async Task Test_response_bytes_reset_on_error()
{
Context ctx = new Context()
.WithResponseBytesRecorder;

SnapProtocolHandler protocolHandler = ctx.SnapProtocolHandler;

// Just setting baseline
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.RecordedMessageSizesShouldIncrease();

ctx.SimulatedLatency = Timeouts.Eth + TimeSpan.FromSeconds(1);
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.SimulatedLatency = TimeSpan.Zero; // The read value is the request down, but it is adjusted on above request
await protocolHandler.GetAccountRange(new AccountRange(Keccak.Zero, Keccak.Zero), CancellationToken.None);
ctx.RecordedMessageSizesShouldDecrease();
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
// Copyright (c) 2021 Demerzel Solutions Limited
// This file is part of the Nethermind library.
//
//
// The Nethermind library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// The Nethermind library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
//
//

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Buffers;
Expand All @@ -35,7 +36,11 @@ namespace Nethermind.Network.P2P.Subprotocols.Snap
{
public class SnapProtocolHandler : ZeroProtocolHandlerBase, ISnapSyncPeer
{
private const int BYTES_LIMIT = 2_000_000;
private const int MaxBytesLimit = 2_000_000;
private const int MinBytesLimit = 20_000;
public static readonly TimeSpan UpperLatencyThreshold = TimeSpan.FromMilliseconds(2000);
public static readonly TimeSpan LowerLatencyThreshold = TimeSpan.FromMilliseconds(1000);
private const double BytesLimitAdjustmentFactor = 2;

public override string Name => "snap1";
protected override TimeSpan InitTimeout => Timeouts.Eth;
Expand All @@ -50,6 +55,8 @@ public class SnapProtocolHandler : ZeroProtocolHandlerBase, ISnapSyncPeer
private readonly MessageQueue<GetTrieNodesMessage, TrieNodesMessage> _getTrieNodesRequests;
private static readonly byte[] _emptyBytes = { 0 };

private int _currentBytesLimit = MinBytesLimit;

public SnapProtocolHandler(ISession session,
INodeStatsManager nodeStats,
IMessageSerializationService serializer,
Expand Down Expand Up @@ -185,10 +192,11 @@ public async Task<AccountsAndProofs> GetAccountRange(AccountRange range, Cancell
var request = new GetAccountRangeMessage()
{
AccountRange = range,
ResponseBytes = BYTES_LIMIT
ResponseBytes = _currentBytesLimit
};

AccountRangeMessage response = await SendRequest(request, _getAccountRangeRequests, token);
AccountRangeMessage response = await AdjustBytesLimit(() =>
SendRequest(request, _getAccountRangeRequests, token));

Metrics.SnapGetAccountRangeSent++;

Expand All @@ -200,10 +208,11 @@ public async Task<SlotsAndProofs> GetStorageRange(StorageRange range, Cancellati
var request = new GetStorageRangeMessage()
{
StoragetRange = range,
ResponseBytes = BYTES_LIMIT
ResponseBytes = _currentBytesLimit
};

StorageRangeMessage response = await SendRequest(request, _getStorageRangeRequests, token);
StorageRangeMessage response = await AdjustBytesLimit(() =>
SendRequest(request, _getStorageRangeRequests, token));

Metrics.SnapGetStorageRangesSent++;

Expand All @@ -215,10 +224,11 @@ public async Task<byte[][]> GetByteCodes(Keccak[] codeHashes, CancellationToken
var request = new GetByteCodesMessage()
{
Hashes = codeHashes,
Bytes = BYTES_LIMIT
Bytes = _currentBytesLimit
};

ByteCodesMessage response = await SendRequest(request, _getByteCodesRequests, token);
ByteCodesMessage response = await AdjustBytesLimit(() =>
SendRequest(request, _getByteCodesRequests, token));

Metrics.SnapGetByteCodesSent++;

Expand All @@ -233,10 +243,11 @@ public async Task<byte[][]> GetTrieNodes(AccountsToRefreshRequest request, Cance
{
RootHash = request.RootHash,
Paths = groups,
Bytes = BYTES_LIMIT
Bytes = _currentBytesLimit
};

TrieNodesMessage response = await SendRequest(reqMsg, _getTrieNodesRequests, token);
TrieNodesMessage response = await AdjustBytesLimit(() =>
SendRequest(reqMsg, _getTrieNodesRequests, token));

Metrics.SnapGetTrieNodesSent++;

Expand Down Expand Up @@ -290,5 +301,46 @@ private async Task<TOut> SendRequest<TIn, TOut>(TIn msg, MessageQueue<TIn, TOut>
StatsManager.ReportTransferSpeedEvent(Session.Node, TransferSpeedType.SnapRanges, 0L);
throw new TimeoutException($"{Session} Request timeout in {nameof(TIn)}");
}

/// <summary>
/// Adjust the _currentBytesLimit depending on the latency of the request and if the request failed.
/// </summary>
/// <param name="func"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
private async Task<T> AdjustBytesLimit<T>(Func<Task<T>> func)
{
// Record bytes limit so that in case multiple concurrent request happens, we do not multiply the
// limit on top of other adjustment, so only the last adjustment will stick, which is fine.
int startingBytesLimit = _currentBytesLimit;
bool failed = false;
Stopwatch sw = Stopwatch.StartNew();
try
{
return await func();
}
catch (Exception)
{
failed = true;
throw;
}
finally
{
sw.Stop();
if (failed)
{
_currentBytesLimit = MinBytesLimit;
}
else if (sw.Elapsed < LowerLatencyThreshold)
{
_currentBytesLimit = Math.Min((int)(startingBytesLimit * BytesLimitAdjustmentFactor), MaxBytesLimit);
}
else if (sw.Elapsed > UpperLatencyThreshold && startingBytesLimit > MinBytesLimit)
{
_currentBytesLimit = (int)(startingBytesLimit / BytesLimitAdjustmentFactor);
}
}
}

}
}