Skip to content

Commit

Permalink
Perf/smoother peer discovery (#5846)
Browse files Browse the repository at this point in the history
* Continuously connect

* Added another delay due to disconnect

* Increase useless peer timeout

* Simple rate limiter

* Integrate with peer manager

* Adjust some stats

* Check all peers, and use pending variable

* Make logic clearer

* Minor cleanup

* Missed cancellation token

* Cancel setup outgoing peer connection if throttled

* Lockless ratelimiter

* Addressing comment

* Minor adjustments

* Whitespace

* Having trouble getting candidate at higher speed

* Make test more consistent

* Even more lenient
  • Loading branch information
asdacap authored Jun 23, 2023
1 parent 7804661 commit 6ab0a3a
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 87 deletions.
66 changes: 66 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/RateLimiterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using NUnit.Framework;

namespace Nethermind.Core.Test;

/// <summary>
/// Tests for <see cref="RateLimiter"/>: effective throughput under load, cancellation while waiting, and the
/// <see cref="RateLimiter.IsThrottled"/> probe.
/// </summary>
[Parallelizable(ParallelScope.Self)]
public class RateLimiterTests
{
    /// <summary>
    /// Runs <paramref name="concurrency"/> workers that hammer the limiter for <paramref name="durationMs"/>
    /// milliseconds and checks that the observed rate stays within a lenient band around
    /// <paramref name="eventPerSec"/>.
    /// </summary>
    [TestCase(100, 1, 1000)]
    [TestCase(100, 1, 100)]
    [TestCase(1000, 1, 100)]
    [TestCase(100, 4, 1000)]
    [TestCase(100, 4, 100)]
    [TestCase(1000, 4, 100)]
    public async Task RateLimiter_should_delay_wait_to_rate_limit(int eventPerSec, int concurrency, int durationMs)
    {
        RateLimiter rateLimiter = new(eventPerSec);

        TimeSpan duration = TimeSpan.FromMilliseconds(durationMs);
        // UtcNow avoids the local time zone conversion that DateTimeOffset.Now performs on every call.
        DateTimeOffset startTime = DateTimeOffset.UtcNow;
        DateTimeOffset deadline = startTime + duration;
        long counter = 0;

        Task[] tasks = Enumerable.Range(0, concurrency).Select(async (_) =>
        {
            while (DateTimeOffset.UtcNow < deadline)
            {
                Interlocked.Increment(ref counter);
                await rateLimiter.WaitAsync(CancellationToken.None);
            }
        }).ToArray();

        // Await instead of blocking with Task.WaitAll: the test method is async, so blocking a thread here
        // is unnecessary and left the method without any await.
        await Task.WhenAll(tasks);

        int effectivePerSec = (int)(Interlocked.Read(ref counter) / (DateTimeOffset.UtcNow - startTime).TotalSeconds);
        // Lenient lower bound: timer granularity and scheduling make the achieved rate fall short of the target.
        effectivePerSec.Should().BeInRange((int)(eventPerSec * 0.5), (int)(eventPerSec * 1.1));
    }

    /// <summary>
    /// A caller that is waiting for its slot must observe <see cref="OperationCanceledException"/> once its
    /// token is cancelled.
    /// </summary>
    [Test]
    public async Task RateLimiter_should_throw_when_cancelled()
    {
        RateLimiter rateLimiter = new(1);
        // Consume the first slot so that the next call has to wait.
        await rateLimiter.WaitAsync(CancellationToken.None);
        using CancellationTokenSource cts = new();
        ValueTask waitTask = rateLimiter.WaitAsync(cts.Token);
        cts.Cancel();

        Func<Task> act = async () => await waitTask;
        await act.Should().ThrowAsync<OperationCanceledException>();
    }

    /// <summary>
    /// Right after a slot has been consumed on a 1 event/sec limiter, <see cref="RateLimiter.IsThrottled"/>
    /// must report true.
    /// </summary>
    [Test]
    public async Task RateLimiter_should_return_true_on_is_throttled_if_throttled()
    {
        RateLimiter rateLimiter = new(1);
        await rateLimiter.WaitAsync(CancellationToken.None);
        rateLimiter.IsThrottled().Should().BeTrue();
    }
}
73 changes: 73 additions & 0 deletions src/Nethermind/Nethermind.Core/RateLimiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Extensions;

namespace Nethermind.Core;

/// <summary>
/// Simple rate limiter that limits the rate of events by delaying the caller so that a minimum amount of time
/// elapses between two events.
/// </summary>
/// <remarks>
/// The limiter takes no locks: the earliest timestamp at which the next event may pass is kept in a single
/// 64-bit field that callers claim with <see cref="Interlocked.CompareExchange(ref long, long, long)"/>.
/// All timestamps are <see cref="Stopwatch"/> ticks.
/// </remarks>
public class RateLimiter
{
    // Minimum spacing between two events, in Stopwatch ticks.
    private readonly long _delay;

    // Earliest Stopwatch timestamp at which the next event is allowed through.
    private long _nextSlot;

    /// <summary>
    /// Creates a limiter whose minimum spacing between events is <c>1 / eventPerSec</c> seconds.
    /// </summary>
    /// <param name="eventPerSec">
    /// Target number of events per second. NOTE(review): the value is not validated here; callers are
    /// presumably expected to pass a positive number.
    /// </param>
    public RateLimiter(int eventPerSec) : this(1.0 / eventPerSec)
    {
    }

    private RateLimiter(double intervalSec)
    {
        _delay = (long)(Stopwatch.Frequency * intervalSec);

        // The first event may pass immediately.
        _nextSlot = GetCurrentTick();
    }

    /// <summary>Returns the current <see cref="Stopwatch"/> timestamp.</summary>
    public static long GetCurrentTick()
    {
        return Stopwatch.GetTimestamp();
    }

    /// <summary>Converts a duration in <see cref="Stopwatch"/> ticks to milliseconds.</summary>
    private static double TickToMs(long tick)
    {
        return tick * 1000.0 / Stopwatch.Frequency;
    }

    /// <summary>
    /// Returns true if a call to <see cref="WaitAsync"/> would definitely be throttled right now. A call may
    /// still get throttled even if this returns false, because another caller can claim the slot in between.
    /// </summary>
    /// <returns>True when the current timestamp is before the next free slot.</returns>
    public bool IsThrottled()
    {
        // Interlocked.Read keeps the 64-bit read atomic even in a 32-bit process.
        return GetCurrentTick() < Interlocked.Read(ref _nextSlot);
    }

    /// <summary>
    /// Completes once the caller has claimed a slot, waiting as long as needed for the next slot to open.
    /// </summary>
    /// <param name="ctx">Token observed while waiting for a slot.</param>
    /// <exception cref="OperationCanceledException">
    /// Thrown by the underlying <see cref="Task.Delay(TimeSpan, CancellationToken)"/> when
    /// <paramref name="ctx"/> is cancelled while this call is waiting.
    /// </exception>
    public async ValueTask WaitAsync(CancellationToken ctx)
    {
        while (true)
        {
            // Atomic 64-bit read: _nextSlot is concurrently replaced by the compare-and-swap below, and a
            // plain read of a long is not guaranteed to be atomic in a 32-bit process.
            long originalNextSlot = Interlocked.Read(ref _nextSlot);

            // Technically it is possible for two concurrent `GetCurrentTick()` calls to return the same value,
            // but it is very unlikely.
            long now = GetCurrentTick();
            if (now >= originalNextSlot
                && Interlocked.CompareExchange(ref _nextSlot, now + _delay, originalNextSlot) == originalNextSlot)
            {
                // Slot claimed: the next event may pass no earlier than `now + _delay`.
                return;
            }

            long toWait = originalNextSlot - now;
            // The slot was already open but another caller won the race; retry against the updated slot.
            if (toWait < 0) continue;

            await Task.Delay(TimeSpan.FromMilliseconds(TickToMs(toWait)), ctx);
        }
    }
}
12 changes: 6 additions & 6 deletions src/Nethermind/Nethermind.Network.Stats/StatsParameters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ private StatsParameters()

public int[] FailedConnectionDelays { get; }

public int[] DisconnectDelays { get; }
public int[] DisconnectDelays { get; set; }

public Dictionary<DisconnectReason, TimeSpan> DelayDueToLocalDisconnect { get; } = new()
{
{ DisconnectReason.UselessPeer, TimeSpan.FromMinutes(5) }
{ DisconnectReason.UselessPeer, TimeSpan.FromMinutes(15) },

// It's actually the protocol init timeout, raised when the status message is not received in time.
{ DisconnectReason.ReceiveMessageTimeout, TimeSpan.FromMinutes(5) },
};

public Dictionary<DisconnectReason, TimeSpan> DelayDueToRemoteDisconnect { get; } = new()
{
// Actual explicit ClientQuitting is very rare, but internally we also use this status for connection
// closed, which can happen if remote client close connection without giving any reason.
// It is unclear why we have such large number of these, but it seems that it is usually transient.
{ DisconnectReason.ClientQuitting, TimeSpan.FromMinutes(1) }
{ DisconnectReason.ClientQuitting, TimeSpan.FromMinutes(1) },
};

public Dictionary<NodeStatsEventType, TimeSpan> DelayDueToEvent { get; } = new()
Expand Down
36 changes: 22 additions & 14 deletions src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
using Nethermind.Crypto;
using Nethermind.Logging;
using Nethermind.Network.Config;
using Nethermind.Network.Discovery;
using Nethermind.Network.P2P;
using Nethermind.Network.P2P.Analyzers;
using Nethermind.Network.P2P.EventArg;
using Nethermind.Network.Rlpx;
using Nethermind.Network.StaticNodes;
using Nethermind.Stats;
using Nethermind.Stats.Model;
using NSubstitute;
Expand Down Expand Up @@ -85,12 +83,18 @@ public async Task Will_connect_to_a_candidate_node()
[Test]
public async Task Will_only_connect_up_to_max_peers()
{
await using Context ctx = new();
await using Context ctx = new(1);
ctx.SetupPersistedPeers(50);
ctx.PeerPool.Start();
ctx.PeerManager.Start();
await Task.Delay(_travisDelayLong * 10);
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(25));
await Task.Delay(_travisDelayLong);

int expectedConnectCount = 25;
Assert.That(
() => ctx.RlpxPeer.ConnectAsyncCallsCount,
Is
.InRange(expectedConnectCount, expectedConnectCount + 1)
.After(_travisDelay * 10, 10));
}

[Test]
Expand Down Expand Up @@ -261,11 +265,11 @@ public async Task Will_fill_up_on_disconnects()
ctx.PeerPool.Start();
ctx.PeerManager.Start();
await Task.Delay(_travisDelayLong);
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(25));
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.AtLeast(25));
ctx.DisconnectAllSessions();

await Task.Delay(_travisDelayLong);
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(50));
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.AtLeast(50));
}

[Test, Retry(5)]
Expand All @@ -292,21 +296,23 @@ public async Task Will_fill_up_over_and_over_again_on_disconnects()

TimeSpan prevConnectingDelay = StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting];
StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting] = TimeSpan.Zero;
int[] prevDisconnectDelays = StatsParameters.Instance.DisconnectDelays;
StatsParameters.Instance.DisconnectDelays = new[] { 0 };

try
{
int currentCount = 0;
for (int i = 0; i < 10; i++)
{
currentCount += 25;
await Task.Delay(_travisDelayLonger);
Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(currentCount));
Assert.That(
() => ctx.PeerPool.ActivePeers.Count(),
Is.AtLeast(25).After(_travisDelayLonger * 2, 10));
ctx.DisconnectAllSessions();
}
}
finally
{
StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting] = prevConnectingDelay;
StatsParameters.Instance.DisconnectDelays = prevDisconnectDelays;
}
}

Expand Down Expand Up @@ -379,7 +385,7 @@ public async Task Will_fill_up_over_and_over_again_on_disconnects_and_when_ids_k
{
currentCount += 25;
maxCount += 50;
await Task.Delay(_travisDelay);
Assert.That(() => ctx.RlpxPeer.ConnectAsyncCallsCount, Is.InRange(currentCount, maxCount).After(_travisDelayLonger * 2, 10));
ctx.RlpxPeer.ConnectAsyncCallsCount.Should().BeInRange(currentCount, maxCount);
ctx.HandshakeAllSessions();
await Task.Delay(_travisDelay);
Expand Down Expand Up @@ -561,7 +567,7 @@ private class Context : IAsyncDisposable
public IStaticNodesManager StaticNodesManager { get; }
public List<Session> Sessions { get; } = new();

public Context()
public Context(int parallelism = 0)
{
RlpxPeer = new RlpxMock(Sessions);
DiscoveryApp = Substitute.For<IDiscoveryApp>();
Expand All @@ -573,6 +579,8 @@ public Context()
NetworkConfig = new NetworkConfig();
NetworkConfig.MaxActivePeers = 25;
NetworkConfig.PeersPersistenceInterval = 50;
NetworkConfig.NumConcurrentOutgoingConnects = parallelism;
NetworkConfig.MaxOutgoingConnectPerSec = 1000000; // no limit in unit test
StaticNodesManager = Substitute.For<IStaticNodesManager>();
StaticNodesManager.LoadInitialList().Returns(new List<Node>());
CompositeNodeSource nodeSources = new(NodesLoader, DiscoveryApp, StaticNodesManager);
Expand Down Expand Up @@ -654,7 +662,7 @@ public void DisconnectAllSessions()

foreach (Session session in clone)
{
session.MarkDisconnected(DisconnectReason.TooManyPeers, DisconnectType.Remote, "test");
session.MarkDisconnected(DisconnectReason.DisconnectRequested, DisconnectType.Remote, "test");
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Network/Config/INetworkConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public interface INetworkConfig : IConfig
[ConfigItem(DefaultValue = "0", HiddenFromDocs = true, Description = "[TECHNICAL] Number of concurrent outgoing connections. Reduce this if your ISP throttles from having open too many connections. Default is 0 which means same as processor count.")]
int NumConcurrentOutgoingConnects { get; set; }

[ConfigItem(DefaultValue = "20", Description = "[TECHNICAL] Max number of new outgoing connections per second. Default is 20.")]
int MaxOutgoingConnectPerSec { get; set; }

[ConfigItem(DefaultValue = "2000", HiddenFromDocs = true, Description = "[TECHNICAL] Outgoing connection timeout in ms. Default is 2 seconds.")]
int ConnectTimeoutMs { get; set; }

Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Network/Config/NetworkConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class NetworkConfig : INetworkConfig
public int P2PPort { get; set; } = 30303;
public long SimulateSendLatencyMs { get; set; } = 0;
public int NumConcurrentOutgoingConnects { get; set; } = 0;
public int MaxOutgoingConnectPerSec { get; set; } = 20;
public int ConnectTimeoutMs { get; set; } = 2000;
public int ProcessingThreadCount { get; set; } = 1;
}
Expand Down
Loading

0 comments on commit 6ab0a3a

Please sign in to comment.