Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor persistent broadcast #3901

Merged
merged 11 commits into from
Mar 28, 2022
69 changes: 57 additions & 12 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Crypto;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Specs;
using NSubstitute;
Expand Down Expand Up @@ -61,10 +62,12 @@ public void Setup()
_headInfo = Substitute.For<IChainHeadInfoProvider>();
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_pick_best_persistent_txs_to_broadcast(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand All @@ -86,16 +89,27 @@ public void should_pick_best_persistent_txs_to_broadcast(int threshold)

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
IList<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend();

int expectedCount = threshold <= 0 ? 0 : addedTxsCount;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();

for (int i = 1; i <= expectedCount; i++)
{
expectedTxs.Add(transactions[addedTxsCount - i]);
}

expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand Down Expand Up @@ -124,9 +138,9 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
IList<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend();

int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand All @@ -139,10 +153,12 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre
expectedTxs.Should().BeEquivalentTo(pickedTxs);
}

[TestCase(0)]
[TestCase(1)]
[TestCase(2)]
[TestCase(99)]
[TestCase(100)]
[TestCase(101)]
[TestCase(1000)]
[TestCase(-10)]
public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee(int threshold)
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold };
Expand Down Expand Up @@ -172,9 +188,9 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

List<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend().ToList();
IList<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend();

int expectedCount = threshold <= 0 ? 0 : addedTxsCount - currentBaseFeeInGwei;
int expectedCount = Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);

List<Transaction> expectedTxs = new();
Expand All @@ -186,4 +202,33 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee

expectedTxs.Should().BeEquivalentTo(pickedTxs, o => o.Excluding(transaction => transaction.MaxFeePerGas));
}

[Test]
public void should_pick_tx_with_lowest_nonce_from_bucket()
{
_txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = 5 };
_broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager);
_headInfo.CurrentBaseFee.Returns(0.GWei());

const int addedTxsCount = 5;
Transaction[] transactions = new Transaction[addedTxsCount];

for (int i = 0; i < addedTxsCount; i++)
{
transactions[i] = Build.A.Transaction
.WithNonce((UInt256)i)
.WithGasPrice(i.GWei())
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA)
.TestObject;

_broadcaster.Broadcast(transactions[i], true);
}
_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

IList<Transaction> pickedTxs = _broadcaster.GetPersistentTxsToSend();
pickedTxs.Count.Should().Be(1);

List<Transaction> expectedTxs = new() { transactions[0] };
expectedTxs.Should().BeEquivalentTo(pickedTxs);
}
}
12 changes: 11 additions & 1 deletion src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,23 @@ public int GetBucketCount(TGroupKey group)
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public bool TryTakeFirst(out TValue first)
{
return TryRemove(GetKey(GetFirsts().Min), out first);
}

/// <summary>
/// Returns best element of each bucket in supplied comparer order.
/// </summary>
[MethodImpl(MethodImplOptions.Synchronized)]
public SortedSet<TValue> GetFirsts()
{
SortedSet<TValue> sortedValues = new(_sortedComparer);
foreach (KeyValuePair<TGroupKey, SortedSet<TValue>> bucket in _buckets)
{
sortedValues.Add(bucket.Value.Min!);
}
return TryRemove(GetKey(sortedValues.Min), out first);

return sortedValues;
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.TxPool/ITxPoolConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace Nethermind.TxPool
{
public interface ITxPoolConfig : IConfig
{
[ConfigItem(DefaultValue = "1", Description = "Defines average percent of tx hashes from persistent broadcast send to peer together with hashes of last added txs.")]
[ConfigItem(DefaultValue = "5", Description = "Defines average percent of tx hashes from persistent broadcast send to peer together with hashes of last added txs.")]
int PeerNotificationThreshold { get; set; }

[ConfigItem(DefaultValue = "2048", Description = "Max number of transactions held in mempool (more transactions in mempool mean more memory used")]
Expand Down
54 changes: 42 additions & 12 deletions src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,58 @@ public void BroadcastOnce(ITxPoolPeer peer, Transaction[] txs)

public void BroadcastPersistentTxs()
{
if (_logger.IsDebug) _logger.Debug($"Broadcasting persistent transactions to all peers");
if (_txPoolConfig.PeerNotificationThreshold > 0)
{
IList<Transaction> persistentTxsToSend = GetPersistentTxsToSend();

foreach ((_, ITxPoolPeer peer) in _peers)
if (persistentTxsToSend.Count > 0)
{
if (_logger.IsDebug) _logger.Debug($"Broadcasting {persistentTxsToSend.Count} persistent transactions to all peers.");

foreach ((_, ITxPoolPeer peer) in _peers)
{
Notify(peer, persistentTxsToSend, true);
}
}
else
{
if (_logger.IsDebug) _logger.Debug($"There are currently no transactions able to broadcast.");
}
}
else
{
Notify(peer, GetPersistentTxsToSend(), true);
if (_logger.IsDebug) _logger.Debug($"PeerNotificationThreshold is not a positive value: {_txPoolConfig.PeerNotificationThreshold}. Skipping broadcasting persistent transactions.");
}
}

internal IEnumerable<Transaction> GetPersistentTxsToSend()
internal IList<Transaction> GetPersistentTxsToSend()
{
if (_txPoolConfig.PeerNotificationThreshold <= 0)
{
yield break;
}

foreach (Transaction tx in _persistentTxs.GetSnapshot())
// PeerNotificationThreshold is a declared in config max percent of transactions in persistent broadcast,
// which will be sent after processing of every block. numberOfPersistentTxsToBroadcast is equal to
// PeerNotificationThreshold multiplication by number of transactions in persistent broadcast, rounded up.
int numberOfPersistentTxsToBroadcast =
Math.Min(_txPoolConfig.PeerNotificationThreshold * _persistentTxs.Count / 100 + 1,
_persistentTxs.Count);

List<Transaction> persistentTxsToSend = new(numberOfPersistentTxsToBroadcast);

foreach (Transaction tx in _persistentTxs.GetFirsts())
{
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
if (numberOfPersistentTxsToBroadcast > 0)
{
yield return tx;
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
{
numberOfPersistentTxsToBroadcast--;
persistentTxsToSend.Add(tx);
}
}
else
{
break;
}
}

return persistentTxsToSend;
}

public void StopBroadcast(Keccak txHash)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.TxPool/TxPoolConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Nethermind.TxPool
{
public class TxPoolConfig : ITxPoolConfig
{
public int PeerNotificationThreshold { get; set; } = 1;
public int PeerNotificationThreshold { get; set; } = 5;
public int Size { get; set; } = 2048;
public int HashCacheSize { get; set; } = 512 * 1024;
public long? GasLimit { get; set; } = null;
Expand Down