diff --git a/src/Nethermind/Nethermind.Core.Test/RateLimiterTests.cs b/src/Nethermind/Nethermind.Core.Test/RateLimiterTests.cs
new file mode 100644
index 00000000000..d0a3fb7e8f2
--- /dev/null
+++ b/src/Nethermind/Nethermind.Core.Test/RateLimiterTests.cs
@@ -0,0 +1,66 @@
+// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
+// SPDX-License-Identifier: LGPL-3.0-only
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using NUnit.Framework;
+
+namespace Nethermind.Core.Test;
+
+[Parallelizable(ParallelScope.Self)]
+public class RateLimiterTests
+{
+    [TestCase(100, 1, 1000)]
+    [TestCase(100, 1, 100)]
+    [TestCase(1000, 1, 100)]
+    [TestCase(100, 4, 1000)]
+    [TestCase(100, 4, 100)]
+    [TestCase(1000, 4, 100)]
+    public async Task RateLimiter_should_delay_wait_to_rate_limit(int eventPerSec, int concurrency, int durationMs)
+    {
+        RateLimiter rateLimiter = new(eventPerSec);
+
+        TimeSpan duration = TimeSpan.FromMilliseconds(durationMs);
+        DateTimeOffset startTime = DateTimeOffset.Now;
+        DateTimeOffset deadline = startTime + duration;
+        long counter = 0;
+
+        Task[] tasks = Enumerable.Range(0, concurrency).Select(async (_) =>
+        {
+            while (DateTimeOffset.Now < deadline)
+            {
+                Interlocked.Increment(ref counter);
+                await rateLimiter.WaitAsync(CancellationToken.None);
+            }
+        }).ToArray();
+
+        await Task.WhenAll(tasks);
+
+        int effectivePerSec = (int)(counter / (DateTimeOffset.Now - startTime).TotalSeconds);
+        effectivePerSec.Should().BeInRange((int)(eventPerSec * 0.5), (int)(eventPerSec * 1.1));
+    }
+
+    [Test]
+    public async Task RateLimiter_should_throw_when_cancelled()
+    {
+        RateLimiter rateLimiter = new(1);
+        await rateLimiter.WaitAsync(CancellationToken.None);
+        CancellationTokenSource cts = new();
+        ValueTask waitTask = rateLimiter.WaitAsync(cts.Token);
+        cts.Cancel();
+
+        Func<Task> act = async () => await waitTask;
+        await act.Should().ThrowAsync<OperationCanceledException>();
+    }
+
+    [Test]
+    public async Task RateLimiter_should_return_true_on_is_throttled_if_throttled()
+    {
+        RateLimiter rateLimiter = new(1);
+        await rateLimiter.WaitAsync(CancellationToken.None);
+        rateLimiter.IsThrottled().Should().BeTrue();
+    }
+}
diff --git a/src/Nethermind/Nethermind.Core/RateLimiter.cs b/src/Nethermind/Nethermind.Core/RateLimiter.cs
new file mode 100644
index 00000000000..201854d19fe
--- /dev/null
+++ b/src/Nethermind/Nethermind.Core/RateLimiter.cs
@@ -0,0 +1,73 @@
+// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
+// SPDX-License-Identifier: LGPL-3.0-only
+
+using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Nethermind.Core.Extensions;
+
+namespace Nethermind.Core;
+
+/// <summary>
+/// Simple rate limiter that limits the rate of events by delaying the caller so that a minimum amount of time
+/// elapses between events.
+/// </summary>
+public class RateLimiter
+{
+    private readonly long _delay;
+    private long _nextSlot;
+
+    public RateLimiter(int eventPerSec) : this(1.0 / eventPerSec)
+    {
+    }
+
+    private RateLimiter(double intervalSec)
+    {
+        _delay = (long)(Stopwatch.Frequency * intervalSec);
+
+        _nextSlot = GetCurrentTick();
+    }
+
+    public static long GetCurrentTick()
+    {
+        return Stopwatch.GetTimestamp();
+    }
+
+    private static double TickToMs(long tick)
+    {
+        return tick * 1000.0 / Stopwatch.Frequency;
+    }
+
+    /// <summary>
+    /// Returns true if a call to WaitAsync would definitely be throttled. A call may still be throttled even if
+    /// this returns false.
+    /// </summary>
+    /// <returns></returns>
+    public bool IsThrottled()
+    {
+        return GetCurrentTick() < _nextSlot;
+    }
+
+    public async ValueTask WaitAsync(CancellationToken ctx)
+    {
+        while (true)
+        {
+            long originalNextSlot = _nextSlot;
+
+            // Technically, it is possible for two `GetCurrentTick()` calls made at the same time to return the
+            // same value, but it is very unlikely.
+            long now = GetCurrentTick();
+            if (now >= originalNextSlot
+                && Interlocked.CompareExchange(ref _nextSlot, now + _delay, originalNextSlot) == originalNextSlot)
+            {
+                return;
+            }
+
+            long toWait = originalNextSlot - now;
+            if (toWait < 0) continue;
+
+            await Task.Delay(TimeSpan.FromMilliseconds(TickToMs(toWait)), ctx);
+        }
+    }
+}
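For context, a rough usage sketch of the new RateLimiter (illustrative only, not part of the patch; the example class, loop, and console output are made up):

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Nethermind.Core;

    public static class RateLimiterExample
    {
        public static async Task RunAsync(CancellationToken token)
        {
            // Allow roughly 20 events per second across all callers sharing this instance.
            RateLimiter limiter = new(20);

            for (int i = 0; i < 100; i++)
            {
                // Asynchronously delays until the next 1/20 s slot is free (CAS on the shared _nextSlot).
                await limiter.WaitAsync(token);
                Console.WriteLine($"event {i} at {DateTimeOffset.Now:HH:mm:ss.fff}");
            }

            // Optional work can be skipped rather than queued when the limiter is already behind,
            // mirroring the cancelIfThrottled path added to PeerManager further down.
            if (!limiter.IsThrottled())
            {
                await limiter.WaitAsync(token);
                Console.WriteLine("low-priority event");
            }
        }
    }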
diff --git a/src/Nethermind/Nethermind.Network.Stats/StatsParameters.cs b/src/Nethermind/Nethermind.Network.Stats/StatsParameters.cs
index 4eba1b3bdbd..2e8a530adce 100644
--- a/src/Nethermind/Nethermind.Network.Stats/StatsParameters.cs
+++ b/src/Nethermind/Nethermind.Network.Stats/StatsParameters.cs
@@ -41,19 +41,19 @@ private StatsParameters()
 
     public int[] FailedConnectionDelays { get; }
 
-    public int[] DisconnectDelays { get; }
+    public int[] DisconnectDelays { get; set; }
 
     public Dictionary<DisconnectReason, TimeSpan> DelayDueToLocalDisconnect { get; } = new()
     {
-        { DisconnectReason.UselessPeer, TimeSpan.FromMinutes(5) }
+        { DisconnectReason.UselessPeer, TimeSpan.FromMinutes(15) },
+
+        // It's actually the protocol init timeout, when the status message is not received in time.
+        { DisconnectReason.ReceiveMessageTimeout, TimeSpan.FromMinutes(5) },
     };
 
     public Dictionary<DisconnectReason, TimeSpan> DelayDueToRemoteDisconnect { get; } = new()
     {
-        // Actual explicit ClientQuitting is very rare, but internally we also use this status for connection
-        // closed, which can happen if remote client close connection without giving any reason.
-        // It is unclear why we have such large number of these, but it seems that it is usually transient.
-        { DisconnectReason.ClientQuitting, TimeSpan.FromMinutes(1) }
+        { DisconnectReason.ClientQuitting, TimeSpan.FromMinutes(1) },
     };
 
     public Dictionary<NodeStatsEventType, TimeSpan> DelayDueToEvent { get; } = new()
diff --git a/src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs b/src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
index aaf662ad94b..643ac9d6e9d 100644
--- a/src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
+++ b/src/Nethermind/Nethermind.Network.Test/PeerManagerTests.cs
@@ -16,12 +16,10 @@
 using Nethermind.Crypto;
 using Nethermind.Logging;
 using Nethermind.Network.Config;
-using Nethermind.Network.Discovery;
 using Nethermind.Network.P2P;
 using Nethermind.Network.P2P.Analyzers;
 using Nethermind.Network.P2P.EventArg;
 using Nethermind.Network.Rlpx;
-using Nethermind.Network.StaticNodes;
 using Nethermind.Stats;
 using Nethermind.Stats.Model;
 using NSubstitute;
@@ -85,12 +83,18 @@ public async Task Will_connect_to_a_candidate_node()
     [Test]
     public async Task Will_only_connect_up_to_max_peers()
     {
-        await using Context ctx = new();
+        await using Context ctx = new(1);
         ctx.SetupPersistedPeers(50);
         ctx.PeerPool.Start();
         ctx.PeerManager.Start();
-        await Task.Delay(_travisDelayLong * 10);
-        Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(25));
+        await Task.Delay(_travisDelayLong);
+
+        int expectedConnectCount = 25;
+        Assert.That(
+            () => ctx.RlpxPeer.ConnectAsyncCallsCount,
+            Is
+                .InRange(expectedConnectCount, expectedConnectCount + 1)
+                .After(_travisDelay * 10, 10));
     }
 
     [Test]
@@ -261,11 +265,11 @@ public async Task Will_fill_up_on_disconnects()
         ctx.PeerPool.Start();
         ctx.PeerManager.Start();
         await Task.Delay(_travisDelayLong);
-        Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(25));
+        Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.AtLeast(25));
 
         ctx.DisconnectAllSessions();
 
        await Task.Delay(_travisDelayLong);
-        Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(50));
+        Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.AtLeast(50));
     }
 
     [Test, Retry(5)]
@@ -292,21 +296,23 @@ public async Task Will_fill_up_over_and_over_again_on_disconnects()
 
         TimeSpan prevConnectingDelay = StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting];
         StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting] = TimeSpan.Zero;
+        int[] prevDisconnectDelays = StatsParameters.Instance.DisconnectDelays;
+        StatsParameters.Instance.DisconnectDelays = new[] { 0 };
 
         try
         {
-            int currentCount = 0;
             for (int i = 0; i < 10; i++)
             {
-                currentCount += 25;
-                await Task.Delay(_travisDelayLonger);
-                Assert.That(ctx.RlpxPeer.ConnectAsyncCallsCount, Is.EqualTo(currentCount));
+                Assert.That(
+                    () => ctx.PeerPool.ActivePeers.Count(),
+                    Is.AtLeast(25).After(_travisDelayLonger * 2, 10));
                 ctx.DisconnectAllSessions();
             }
         }
         finally
         {
             StatsParameters.Instance.DelayDueToEvent[NodeStatsEventType.Connecting] = prevConnectingDelay;
+            StatsParameters.Instance.DisconnectDelays = prevDisconnectDelays;
         }
     }
 
@@ -379,7 +385,7 @@ public async Task Will_fill_up_over_and_over_again_on_disconnects_and_when_ids_k
         {
             currentCount += 25;
             maxCount += 50;
-            await Task.Delay(_travisDelay);
+            Assert.That(() => ctx.RlpxPeer.ConnectAsyncCallsCount, Is.InRange(currentCount, maxCount).After(_travisDelayLonger * 2, 10));
             ctx.RlpxPeer.ConnectAsyncCallsCount.Should().BeInRange(currentCount, maxCount);
             ctx.HandshakeAllSessions();
             await Task.Delay(_travisDelay);
@@ -561,7 +567,7 @@ private class Context : IAsyncDisposable
         public IStaticNodesManager StaticNodesManager { get; }
         public List<Session> Sessions { get; } = new();
 
-        public Context()
+        public Context(int parallelism = 0)
         {
             RlpxPeer = new RlpxMock(Sessions);
             DiscoveryApp = Substitute.For<IDiscoveryApp>();
@@ -573,6 +579,8 @@ public Context()
             NetworkConfig = new NetworkConfig();
             NetworkConfig.MaxActivePeers = 25;
             NetworkConfig.PeersPersistenceInterval = 50;
+            NetworkConfig.NumConcurrentOutgoingConnects = parallelism;
+            NetworkConfig.MaxOutgoingConnectPerSec = 1000000; // no limit in unit tests
             StaticNodesManager = Substitute.For<IStaticNodesManager>();
             StaticNodesManager.LoadInitialList().Returns(new List<NetworkNode>());
             CompositeNodeSource nodeSources = new(NodesLoader, DiscoveryApp, StaticNodesManager);
@@ -654,7 +662,7 @@ public void DisconnectAllSessions()
 
             foreach (Session session in clone)
             {
-                session.MarkDisconnected(DisconnectReason.TooManyPeers, DisconnectType.Remote, "test");
+                session.MarkDisconnected(DisconnectReason.DisconnectRequested, DisconnectType.Remote, "test");
             }
         }
 
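The test changes above replace fixed Task.Delay waits followed by exact-count assertions with NUnit's delayed constraints, which poll until the condition holds or a timeout expires. A minimal self-contained sketch of the pattern (illustrative; the fixture name and timings are made up):

    using System.Threading;
    using System.Threading.Tasks;
    using NUnit.Framework;

    [TestFixture]
    public class DelayedConstraintExample
    {
        [Test]
        public void Passes_as_soon_as_the_condition_holds()
        {
            int counter = 0;
            Task.Run(async () =>
            {
                await Task.Delay(200);
                Interlocked.Exchange(ref counter, 25);
            });

            // Re-evaluates the lambda every 10 ms for up to 5000 ms instead of sleeping for a fixed time,
            // which keeps the test fast on quick machines and tolerant on slow ones.
            Assert.That(() => counter, Is.AtLeast(25).After(5000, 10));
        }
    }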
diff --git a/src/Nethermind/Nethermind.Network/Config/INetworkConfig.cs b/src/Nethermind/Nethermind.Network/Config/INetworkConfig.cs
index 48a31f89410..e01030cd12f 100644
--- a/src/Nethermind/Nethermind.Network/Config/INetworkConfig.cs
+++ b/src/Nethermind/Nethermind.Network/Config/INetworkConfig.cs
@@ -86,6 +86,9 @@ public interface INetworkConfig : IConfig
     [ConfigItem(DefaultValue = "0", HiddenFromDocs = true, Description = "[TECHNICAL] Number of concurrent outgoing connections. Reduce this if your ISP throttles from having open too many connections. Default is 0 which means same as processor count.")]
     int NumConcurrentOutgoingConnects { get; set; }
 
+    [ConfigItem(DefaultValue = "20", Description = "[TECHNICAL] Max number of new outgoing connections per second. Default is 20.")]
+    int MaxOutgoingConnectPerSec { get; set; }
+
     [ConfigItem(DefaultValue = "2000", HiddenFromDocs = true, Description = "[TECHNICAL] Outgoing connection timeout in ms. Default is 2 seconds.")]
     int ConnectTimeoutMs { get; set; }
 
diff --git a/src/Nethermind/Nethermind.Network/Config/NetworkConfig.cs b/src/Nethermind/Nethermind.Network/Config/NetworkConfig.cs
index 1f57cad0641..2a988dc4479 100644
--- a/src/Nethermind/Nethermind.Network/Config/NetworkConfig.cs
+++ b/src/Nethermind/Nethermind.Network/Config/NetworkConfig.cs
@@ -35,6 +35,7 @@ public class NetworkConfig : INetworkConfig
     public int P2PPort { get; set; } = 30303;
     public long SimulateSendLatencyMs { get; set; } = 0;
     public int NumConcurrentOutgoingConnects { get; set; } = 0;
+    public int MaxOutgoingConnectPerSec { get; set; } = 20;
     public int ConnectTimeoutMs { get; set; } = 2000;
     public int ProcessingThreadCount { get; set; } = 1;
 }
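As a rough illustration of how the two limits combine (the values are made up; the wiring mirrors what PeerManager does below rather than actual startup code):

    using System;
    using Nethermind.Core;
    using Nethermind.Network.Config;

    // Up to 8 dials in flight at once (0 would mean "use the processor count"),
    // and no more than ~20 new dials started per second overall.
    INetworkConfig cfg = new NetworkConfig
    {
        NumConcurrentOutgoingConnects = 8,
        MaxOutgoingConnectPerSec = 20
    };

    RateLimiter connectRateLimiter = new(cfg.MaxOutgoingConnectPerSec);
    int parallelism = cfg.NumConcurrentOutgoingConnects == 0
        ? Environment.ProcessorCount
        : cfg.NumConcurrentOutgoingConnects;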
diff --git a/src/Nethermind/Nethermind.Network/PeerManager.cs b/src/Nethermind/Nethermind.Network/PeerManager.cs
index 3b7d57ecf51..354ace71a0c 100644
--- a/src/Nethermind/Nethermind.Network/PeerManager.cs
+++ b/src/Nethermind/Nethermind.Network/PeerManager.cs
@@ -2,15 +2,15 @@
 // SPDX-License-Identifier: LGPL-3.0-only
 
 using System;
-using System.Buffers;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.ComponentModel;
 using System.Linq;
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
 using FastEnumUtility;
+using Nethermind.Core;
 using Nethermind.Core.Attributes;
 using Nethermind.Core.Crypto;
 using Nethermind.Logging;
@@ -20,7 +20,6 @@
 using Nethermind.Network.Rlpx;
 using Nethermind.Stats;
 using Nethermind.Stats.Model;
-using Nethermind.Synchronization.Peers;
 using Timer = System.Timers.Timer;
 
 namespace Nethermind.Network
@@ -37,6 +36,7 @@ public class PeerManager : IPeerManager
         private readonly PeerComparer _peerComparer = new();
         private readonly IPeerPool _peerPool;
         private readonly List<Peer> _candidates;
+        private readonly RateLimiter _outgoingConnectionRateLimiter;
 
         private int _pending;
         private int _tryCount;
@@ -51,10 +51,9 @@ public class PeerManager : IPeerManager
         private bool _isStarted;
         private int _logCounter = 1;
-        private Task _peerUpdateLoopTask;
 
         private readonly CancellationTokenSource _cancellationTokenSource = new();
-        private readonly int _parallelism;
+        private readonly int _outgoingConnectParallelism;
 
         public PeerManager(
             IRlpxHost rlpxHost,
@@ -67,11 +66,12 @@ public PeerManager(
             _rlpxHost = rlpxHost ?? throw new ArgumentNullException(nameof(rlpxHost));
             _stats = stats ?? throw new ArgumentNullException(nameof(stats));
             _networkConfig = networkConfig ?? throw new ArgumentNullException(nameof(networkConfig));
-            _parallelism = networkConfig.NumConcurrentOutgoingConnects;
-            if (_parallelism == 0)
+            _outgoingConnectParallelism = networkConfig.NumConcurrentOutgoingConnects;
+            if (_outgoingConnectParallelism == 0)
             {
-                _parallelism = Environment.ProcessorCount;
+                _outgoingConnectParallelism = Environment.ProcessorCount;
             }
+            _outgoingConnectionRateLimiter = new RateLimiter(networkConfig.MaxOutgoingConnectPerSec);
             _peerPool = peerPool;
             _candidates = new List<Peer>(networkConfig.MaxActivePeers * 2);
@@ -122,7 +122,7 @@ private void PeerPoolOnPeerAdded(object sender, PeerEventArgs nodeEventArgs)
                 // fire and forget - all the surrounding logic will be executed
                 // exceptions can be lost here without issues
                 // this for rapid connections to newly discovered peers without having to go through the UpdatePeerLoop
-                SetupOutgoingPeerConnection(peer);
+                SetupOutgoingPeerConnection(peer, cancelIfThrottled: true);
             }
 #pragma warning restore 4014
         }
@@ -158,7 +158,7 @@ public void Start()
             StartPeerUpdateLoop();
 
-            _peerUpdateLoopTask = Task.Factory.StartNew(
+            Task.Factory.StartNew(
                 RunPeerUpdateLoop,
                 _cancellationTokenSource.Token,
                 TaskCreationOptions.LongRunning,
@@ -205,7 +205,17 @@ private class CandidateSelection
 
         private async Task RunPeerUpdateLoop()
         {
-            const int TIME_WAIT = 60_000;
+            Channel<Peer> taskChannel = Channel.CreateBounded<Peer>(1);
+            List<Task>? tasks = Enumerable.Range(0, _outgoingConnectParallelism).Select((idx) =>
+            {
+                return Task.Run(async () =>
+                {
+                    await foreach (Peer peer in taskChannel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
+                    {
+                        await SetupOutgoingPeerConnection(peer);
+                    }
+                });
+            }).ToList();
 
             int loopCount = 0;
             long previousActivePeersCount = 0;
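The RunPeerUpdateLoop change above swaps the per-round TPL Dataflow ActionBlock for a bounded Channel feeding a fixed set of long-lived workers. A standalone sketch of that producer/consumer shape (names, item type, and timings are hypothetical):

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public static class BoundedChannelWorkersExample
    {
        public static async Task RunAsync(CancellationToken token)
        {
            // Capacity 1: WriteAsync waits until the previously written item has been taken,
            // so the producer can never run far ahead of the workers.
            Channel<int> work = Channel.CreateBounded<int>(1);

            Task[] workers = Enumerable.Range(0, Environment.ProcessorCount).Select(_ => Task.Run(async () =>
            {
                await foreach (int item in work.Reader.ReadAllAsync(token))
                {
                    await Task.Delay(100, token); // stand-in for SetupOutgoingPeerConnection(peer)
                }
            })).ToArray();

            for (int i = 0; i < 50; i++)
            {
                await work.Writer.WriteAsync(i, token); // backpressure point for the producer loop
            }

            work.Writer.Complete();      // no more items
            await Task.WhenAll(workers); // ReadAllAsync completes once the channel is drained
        }
    }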
@@ -236,7 +246,7 @@ private async Task RunPeerUpdateLoop()
                         continue;
                     }
 
-                    if (AvailableActivePeersCount == 0)
+                    if (!EnsureAvailableActivePeerSlot())
                     {
                         continue;
                     }
@@ -250,6 +260,9 @@ private async Task RunPeerUpdateLoop()
                     List<Peer> remainingCandidates = _currentSelection.Candidates;
                     if (remainingCandidates.Count == 0)
                     {
+                        // Delay to prevent high CPU use. There is a shortcut path for newly discovered peers,
+                        // so having a lower delay probably won't do much.
+                        await Task.Delay(TimeSpan.FromSeconds(1));
                         continue;
                     }
 
@@ -258,61 +271,16 @@ private async Task RunPeerUpdateLoop()
                         break;
                     }
 
-                    int currentPosition = 0;
-                    long lastMs = Environment.TickCount64;
-                    int peersTried = 0;
-                    while (true)
+                    foreach (Peer peer in remainingCandidates)
                     {
-                        if (_cancellationTokenSource.IsCancellationRequested)
-                        {
-                            break;
-                        }
-
-                        int nodesToTry = Math.Min(remainingCandidates.Count - currentPosition, AvailableActivePeersCount);
-                        if (nodesToTry <= 0)
+                        if (!EnsureAvailableActivePeerSlot())
                         {
+                            // Some new connections are in flight at this point, but statistically speaking,
+                            // they are going to fail, so it's fine.
                             break;
                         }
 
-                        peersTried += nodesToTry;
-                        ActionBlock<Peer> workerBlock = new(
-                            SetupOutgoingPeerConnection,
-                            new ExecutionDataflowBlockOptions
-                            {
-                                MaxDegreeOfParallelism = _parallelism,
-                                CancellationToken = _cancellationTokenSource.Token
-                            });
-
-                        for (int i = 0; i < nodesToTry; i++)
-                        {
-                            await workerBlock.SendAsync(remainingCandidates[currentPosition + i]);
-                        }
-
-                        currentPosition += nodesToTry;
-
-                        workerBlock.Complete();
-
-                        // Wait for all messages to propagate through the network.
-                        await workerBlock.Completion;
-
-                        Interlocked.Increment(ref _connectionRounds);
-
-                        long nowMs = Environment.TickCount64;
-                        if (peersTried > 1_000)
-                        {
-                            peersTried = 0;
-                            // Wait for sockets to clear
-                            await Task.Delay(TIME_WAIT);
-                        }
-                        else
-                        {
-                            long diffMs = nowMs - lastMs;
-                            if (diffMs < 50)
-                            {
-                                await Task.Delay(50 - (int)diffMs);
-                            }
-                        }
-                        lastMs = nowMs;
+                        await taskChannel.Writer.WriteAsync(peer, _cancellationTokenSource.Token);
                     }
 
                     if (_logger.IsTrace)
@@ -344,11 +312,8 @@ private async Task RunPeerUpdateLoop()
                         _logCounter++;
                     }
 
-                    if (_peerPool.ActivePeerCount < MaxActivePeers)
+                    if (EnsureAvailableActivePeerSlot())
                     {
-                        // We been though all the peers once, so wait TIME-WAIT additional delay before
-                        // trying them again to avoid busy loop or exhausting sockets.
-                        await Task.Delay(TIME_WAIT);
                         _peerUpdateRequested.Set();
                     }
 
@@ -380,8 +345,34 @@ private async Task RunPeerUpdateLoop()
                 _peerUpdateTimer.Start();
             }
+
+            taskChannel.Writer.Complete();
+            await Task.WhenAll(tasks);
         }
 
+        private bool EnsureAvailableActivePeerSlot()
+        {
+            if (AvailableActivePeersCount - _pending > 0)
+            {
+                return true;
+            }
+
+            // Once a connection is established, the active peer count will increase, but the peer might not
+            // pass the handshake and the status check, so we wait a bit to see whether the active peer count
+            // goes back down within this time window.
+            DateTimeOffset deadline = DateTimeOffset.Now + Timeouts.Handshake +
+                                      TimeSpan.FromMilliseconds(_networkConfig.ConnectTimeoutMs);
+            while (DateTimeOffset.Now < deadline && (AvailableActivePeersCount - _pending) <= 0)
+            {
+                // The signal is not very reliable, so we just do a simple poll.
+                _peerUpdateRequested.Reset();
+                _peerUpdateRequested.Wait(TimeSpan.FromMilliseconds(100));
+            }
+
+            return AvailableActivePeersCount - _pending > 0;
+        }
+
+
         private static readonly IReadOnlyList<NodeStatsEventType> _enumValues = FastEnum.GetValues<NodeStatsEventType>();
 
         private void SelectAndRankCandidates()
@@ -617,8 +608,12 @@ public int Compare(PeerStats x, PeerStats y)
         #region Outgoing connection handling
 
         [Todo(Improve.MissingFunctionality, "Add cancellation support for the peer connection (so it does not wait for the 10sec timeout")]
-        private async Task SetupOutgoingPeerConnection(Peer peer)
+        private async Task SetupOutgoingPeerConnection(Peer peer, bool cancelIfThrottled = false)
         {
+            if (cancelIfThrottled && _outgoingConnectionRateLimiter.IsThrottled()) return;
+
+            await _outgoingConnectionRateLimiter.WaitAsync(_cancellationTokenSource.Token);
+
             // Can happen when In connection is received from the same peer and is initialized before we get here
             // In this case we do not initialize OUT connection
             if (!AddActivePeer(peer.Node.Id, peer, "upgrading candidate"))
@@ -633,6 +628,7 @@ private async Task SetupOutgoingPeerConnection(Peer peer)
 
             // for some time we will have a peer in active that has no session assigned - analyze this?
             Interlocked.Decrement(ref _pending);
+            _peerUpdateRequested.Set();
             if (_logger.IsTrace) _logger.Trace($"Connecting to {_stats.GetCurrentReputation(peer.Node)} rep node - {result}, ACTIVE: {_peerPool.ActivePeerCount}, CAND: {_peerPool.PeerCount}");
 
             if (!result)
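One more note on the added _peerUpdateRequested.Set() after Interlocked.Decrement(ref _pending): EnsureAvailableActivePeerSlot treats the signal as a best-effort wake-up and re-checks its condition on a short poll. A minimal sketch of that set/poll handshake, assuming _peerUpdateRequested is a ManualResetEventSlim (consistent with the Reset()/Wait() calls above); the class, field names, and limit here are hypothetical:

    using System;
    using System.Threading;

    public class SlotWaiterExample
    {
        private readonly ManualResetEventSlim _signal = new(false);
        private int _used;
        private const int Max = 25;

        // Completion side: free a slot, then nudge any waiter (mirrors the added Set() call).
        public void Release()
        {
            Interlocked.Decrement(ref _used);
            _signal.Set();
        }

        // Waiting side: the signal can be missed or set spuriously, so rather than trusting it,
        // re-check the condition every 100 ms until a slot frees up or the deadline passes.
        public bool WaitForSlot(TimeSpan timeout)
        {
            DateTimeOffset deadline = DateTimeOffset.Now + timeout;
            while (DateTimeOffset.Now < deadline && Volatile.Read(ref _used) >= Max)
            {
                _signal.Reset();
                _signal.Wait(TimeSpan.FromMilliseconds(100));
            }

            return Volatile.Read(ref _used) < Max;
        }
    }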