Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Less allocations for node processing #6967

Merged
merged 8 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,16 @@ public void ProcessFindNodeMsg(FindNodeMsg msg)
NodeStats.AddNodeStatsEvent(NodeStatsEventType.DiscoveryFindNodeIn);
RefreshNodeContactTime();

Node[] nodes = _nodeTable
.GetClosestNodes(msg.SearchedNodeId)
.Take(12) // Otherwise the payload may become too big, which is out of spec.
.ToArray();
// 12 otherwise the payload may become too big, which is out of spec.
var closestNodes = _nodeTable.GetClosestNodes(msg.SearchedNodeId, bucketSize: 12);
Node[] nodes = new Node[closestNodes.Count];
int count = 0;
foreach (Node node in closestNodes)
{
nodes[count] = node;
count++;
}

SendNeighbors(nodes);
}

Expand Down
50 changes: 39 additions & 11 deletions src/Nethermind/Nethermind.Network.Discovery/NodesLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public void Initialize(Node masterNode)
_masterNode = masterNode;
}

public async Task LocateNodesAsync(CancellationToken cancellationToken)
public Task LocateNodesAsync(CancellationToken cancellationToken)
{
await LocateNodesAsync(null, cancellationToken);
return LocateNodesAsync(null, cancellationToken);
}

public async Task LocateNodesAsync(byte[]? searchedNodeId, CancellationToken cancellationToken)
Expand All @@ -59,16 +59,37 @@ public async Task LocateNodesAsync(byte[]? searchedNodeId, CancellationToken can
int attemptsCount = 0;
while (true)
{
//if searched node is not specified master node is used
IEnumerable<Node> closestNodes = searchedNodeId is not null ? _nodeTable.GetClosestNodes(searchedNodeId) : _nodeTable.GetClosestNodes();

candidatesCount = 0;
foreach (Node closestNode in closestNodes.Where(node => !alreadyTriedNodes.Contains(node.IdHash)))
if (searchedNodeId is not null)
{
foreach (Node closestNode in _nodeTable.GetClosestNodes(searchedNodeId))
{
if (alreadyTriedNodes.Contains(closestNode.IdHash))
{
continue;
}

tryCandidates[candidatesCount++] = closestNode;
if (candidatesCount > tryCandidates.Length - 1)
{
break;
}
}
}
else
{
tryCandidates[candidatesCount++] = closestNode;
if (candidatesCount > tryCandidates.Length - 1)
foreach (Node closestNode in _nodeTable.GetClosestNodes())
{
break;
if (alreadyTriedNodes.Contains(closestNode.IdHash))
{
continue;
}

tryCandidates[candidatesCount++] = closestNode;
if (candidatesCount > tryCandidates.Length - 1)
{
break;
}
}
}

Expand Down Expand Up @@ -134,7 +155,14 @@ public async Task LocateNodesAsync(byte[]? searchedNodeId, CancellationToken can
}
}
}
int nodesCountAfterDiscovery = _nodeTable.Buckets.Sum(x => x.BondedItemsCount);

int nodesCountAfterDiscovery = 0;
var buckets = _nodeTable.Buckets;
for (int i = 0; i < buckets.Length; i++)
{
nodesCountAfterDiscovery += buckets[i].BondedItemsCount;
}

if (_logger.IsDebug) _logger.Debug($"Finished discovery cycle, tried contacting {alreadyTriedNodes.Count} nodes. All nodes count before the process: {nodesCountBeforeDiscovery}, after the process: {nodesCountAfterDiscovery}");

if (_logger.IsTrace)
Expand Down Expand Up @@ -172,7 +200,7 @@ private int NodesCountBeforeDiscovery

private void LogNodeTable()
{
IEnumerable<NodeBucket> nonEmptyBuckets = _nodeTable.Buckets.Where(x => x.BondedItems.Any());
IEnumerable<NodeBucket> nonEmptyBuckets = _nodeTable.Buckets.Where(x => x.AnyBondedItems());
StringBuilder sb = new();

int length = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using Nethermind.Core.Crypto;
using Nethermind.Stats.Model;
using static Nethermind.Network.Discovery.RoutingTable.NodeTable;

namespace Nethermind.Network.Discovery.RoutingTable;

Expand All @@ -18,10 +19,11 @@ public interface INodeTable
/// <summary>
/// GetClosestNodes to MasterNode
/// </summary>
IEnumerable<Node> GetClosestNodes();
ClosestNodesEnumerator GetClosestNodes();

/// <summary>
/// GetClosestNodes to provided Node
/// </summary>
IEnumerable<Node> GetClosestNodes(byte[] nodeId);
ClosestNodesFromNodeEnumerator GetClosestNodes(byte[] nodeId);
ClosestNodesFromNodeEnumerator GetClosestNodes(byte[] nodeId, int bucketSize);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Collections;
using System.Diagnostics;
using Nethermind.Stats.Model;

Expand Down Expand Up @@ -28,26 +29,71 @@ public NodeBucket(int distance, int bucketSize, float dropFullBucketProbability

public int BucketSize { get; }

public IEnumerable<NodeBucketItem> BondedItems
public bool AnyBondedItems()
{
get
foreach (NodeBucketItem _ in BondedItems)
{
lock (_nodeBucketLock)
return true;
}

return false;
}

public BondedItemsEnumerator BondedItems
=> new(this);

public struct BondedItemsEnumerator : IEnumerator<NodeBucketItem>, IEnumerable<NodeBucketItem>
{
private NodeBucket _nodeBucket;
private LinkedListNode<NodeBucketItem>? _currentNode;
private DateTime _referenceTime;

public BondedItemsEnumerator(NodeBucket nodeBucket)
{
_nodeBucket = nodeBucket;
_referenceTime = DateTime.UtcNow;
Monitor.Enter(_nodeBucket._nodeBucketLock);
_currentNode = nodeBucket._items.Last;
Current = null!;
}

public NodeBucketItem Current { get; private set; }

object IEnumerator.Current => Current;

public bool MoveNext()
{
while (_currentNode is not null)
{
LinkedListNode<NodeBucketItem>? node = _items.Last;
DateTime utcNow = DateTime.UtcNow;
while (node is not null)
Current = _currentNode.Value;
_currentNode = _currentNode.Previous;
if (Current.IsBonded(_referenceTime))
{
if (!node.Value.IsBonded(utcNow))
{
break;
}

yield return node.Value;
node = node.Previous;
return true;
}
}

Current = null!;
return false;
}

public void Reset() => throw new NotSupportedException();
benaadams marked this conversation as resolved.
Show resolved Hide resolved

public void Dispose()
{
if (_nodeBucket is not null)
{
Monitor.Exit(_nodeBucket._nodeBucketLock);
}
_nodeBucket = null!;
}
public BondedItemsEnumerator GetEnumerator() => this;

IEnumerator<NodeBucketItem> IEnumerable<NodeBucketItem>.GetEnumerator()
=> GetEnumerator();

IEnumerator IEnumerable.GetEnumerator()
=> GetEnumerator();
}

public int BondedItemsCount
Expand Down
143 changes: 122 additions & 21 deletions src/Nethermind/Nethermind.Network.Discovery/RoutingTable/NodeTable.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Collections;
using System.Runtime.InteropServices;
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Network.Config;
using Nethermind.Stats.Model;
using static Nethermind.Network.Discovery.RoutingTable.NodeBucket;

namespace Nethermind.Network.Discovery.RoutingTable;

Expand Down Expand Up @@ -73,42 +76,140 @@ public void RefreshNode(Node node)
bucket.RefreshNode(node);
}

public IEnumerable<Node> GetClosestNodes()
public ClosestNodesEnumerator GetClosestNodes()
{
int count = 0;
int bucketSize = _discoveryConfig.BucketSize;
return new ClosestNodesEnumerator(Buckets, _discoveryConfig.BucketSize);
}

public struct ClosestNodesEnumerator : IEnumerator<Node>, IEnumerable<Node>
{
private readonly NodeBucket[] _buckets;
private readonly int _bucketSize;
private BondedItemsEnumerator _itemEnumerator;
private bool _enumeratorSet;
private int _bucketIndex;
private int _count;

public ClosestNodesEnumerator(NodeBucket[] buckets, int bucketSize)
{
_buckets = buckets;
_bucketSize = bucketSize;
Current = null!;
_bucketIndex = -1;
_count = 0;
}

public Node Current { get; private set; }

foreach (NodeBucket nodeBucket in Buckets)
object IEnumerator.Current => Current;

public bool MoveNext()
{
foreach (NodeBucketItem nodeBucketItem in nodeBucket.BondedItems)
try
{
if (count < bucketSize)
while (_count < _bucketSize)
{
count++;
if (nodeBucketItem.Node is not null)
if (!_enumeratorSet || !_itemEnumerator.MoveNext())
{
yield return nodeBucketItem.Node;
_itemEnumerator.Dispose();
_bucketIndex++;
if (_bucketIndex >= _buckets.Length)
{
return false;
}

_itemEnumerator = _buckets[_bucketIndex].BondedItems.GetEnumerator();
_enumeratorSet = true;
continue;
}

Current = _itemEnumerator.Current.Node!;
_count++;
return true;
}
else
{
yield break;
}
}
finally
{
_itemEnumerator.Dispose();
}

return false;
}

public void Reset() => throw new NotSupportedException();

public void Dispose() { }

public ClosestNodesEnumerator GetEnumerator() => this;

IEnumerator<Node> IEnumerable<Node>.GetEnumerator() => this;

IEnumerator IEnumerable.GetEnumerator() => this;
}

public IEnumerable<Node> GetClosestNodes(byte[] nodeId)
public ClosestNodesFromNodeEnumerator GetClosestNodes(byte[] nodeId)
{
return GetClosestNodes(nodeId, _discoveryConfig.BucketSize);
}

public ClosestNodesFromNodeEnumerator GetClosestNodes(byte[] nodeId, int bucketSize)
{
CheckInitialization();
return new ClosestNodesFromNodeEnumerator(Buckets, nodeId, _nodeDistanceCalculator, Math.Min(bucketSize, _discoveryConfig.BucketSize));
}

public struct ClosestNodesFromNodeEnumerator : IEnumerator<Node>, IEnumerable<Node>
{
private readonly List<Node> _sortedNodes;
benaadams marked this conversation as resolved.
Show resolved Hide resolved
private int _currentIndex;

public ClosestNodesFromNodeEnumerator(NodeBucket[] buckets, byte[] targetNodeId, INodeDistanceCalculator calculator, int bucketSize)
{
_sortedNodes = new List<Node>();
Hash256 idHash = Keccak.Compute(targetNodeId);
foreach (var bucket in buckets)
{
foreach (var item in bucket.BondedItems)
{
if (item.Node != null && item.Node.IdHash != idHash)
{
_sortedNodes.Add(item.Node);
}
}
}

_sortedNodes.Sort((a, b) => calculator.CalculateDistance(a.Id.Bytes, targetNodeId).CompareTo(calculator.CalculateDistance(b.Id.Bytes, targetNodeId)));
if (_sortedNodes.Count > bucketSize)
{
CollectionsMarshal.SetCount(_sortedNodes, bucketSize);
}

_currentIndex = -1;
}

public readonly int Count => _sortedNodes.Count;

public Node Current => _sortedNodes[_currentIndex];

object IEnumerator.Current => Current;

public bool MoveNext()
{
if (_currentIndex + 1 < _sortedNodes.Count)
{
_currentIndex++;
return true;
}
return false;
}

public void Reset() => throw new NotSupportedException();
public void Dispose() { }

public ClosestNodesFromNodeEnumerator GetEnumerator() => this;
IEnumerator<Node> IEnumerable<Node>.GetEnumerator() => this;

Hash256 idHash = Keccak.Compute(nodeId);
return Buckets.SelectMany(x => x.BondedItems)
.Where(x => x.Node?.IdHash != idHash && x.Node is not null)
.Select(x => new { x.Node, Distance = _nodeDistanceCalculator.CalculateDistance(x.Node!.Id.Bytes, nodeId) })
.OrderBy(x => x.Distance)
.Take(_discoveryConfig.BucketSize)
.Select(x => x.Node!);
IEnumerator IEnumerable.GetEnumerator() => this;
}

public void Initialize(PublicKey masterNodeKey)
Expand Down