From 8d56e0ab5b2106b94536ed8b3b340a9aec5be891 Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Wed, 2 Feb 2022 14:57:32 +0100 Subject: [PATCH 1/6] changes --- .../TxBroadcasterTests.cs | 213 ++++++++++++++++++ .../Nethermind.TxPool.Test/TxPoolTests.cs | 77 +++++++ .../Collections/SortedPool.cs | 45 ++++ .../Nethermind.TxPool/ITxPoolPeer.cs | 2 + src/Nethermind/Nethermind.TxPool/PeerInfo.cs | 18 +- .../Nethermind.TxPool/TxBroadcaster.cs | 77 +++++-- src/Nethermind/Nethermind.TxPool/TxPool.cs | 20 +- 7 files changed, 420 insertions(+), 32 deletions(-) create mode 100644 src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs new file mode 100644 index 00000000000..a914d6a2848 --- /dev/null +++ b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs @@ -0,0 +1,213 @@ +// Copyright (c) 2022 Demerzel Solutions Limited +// This file is part of the Nethermind library. +// +// The Nethermind library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The Nethermind library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the Nethermind. If not, see . +// + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using FluentAssertions; +using Nethermind.Blockchain; +using Nethermind.Blockchain.Comparers; +using Nethermind.Core; +using Nethermind.Core.Extensions; +using Nethermind.Core.Specs; +using Nethermind.Core.Test.Builders; +using Nethermind.Core.Timers; +using Nethermind.Crypto; +using Nethermind.Logging; +using Nethermind.Specs; +using NSubstitute; +using NUnit.Framework; + +[assembly: InternalsVisibleTo("Nethermind.Blockchain.Test")] + +namespace Nethermind.TxPool.Test; + +[TestFixture] +public class TxBroadcasterTests +{ + private ILogManager _logManager; + private ISpecProvider _specProvider; + private IBlockTree _blockTree; + private IComparer _comparer; + private TxBroadcaster _broadcaster; + private EthereumEcdsa _ethereumEcdsa; + private TxPoolConfig _txPoolConfig; + private IChainHeadInfoProvider _headInfo; + + [SetUp] + public void Setup() + { + _logManager = LimboLogs.Instance; + _specProvider = RopstenSpecProvider.Instance; + _ethereumEcdsa = new EthereumEcdsa(_specProvider.ChainId, _logManager); + _blockTree = Substitute.For(); + _comparer = new TransactionComparerProvider(_specProvider, _blockTree).GetDefaultComparer(); + _txPoolConfig = new TxPoolConfig(); + _headInfo = Substitute.For(); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(2)] + [TestCase(99)] + [TestCase(100)] + [TestCase(101)] + [TestCase(1000)] + [TestCase(-10)] + public void should_pick_best_persistent_txs_to_broadcast(int threshold) + { + _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; + _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); + _headInfo.CurrentBaseFee.Returns(0.GWei()); + + int addedTxsCount = TestItem.PrivateKeys.Length; + Transaction[] transactions = new Transaction[addedTxsCount]; + + for (int i = 0; i < addedTxsCount; i++) + { + transactions[i] = Build.A.Transaction + .WithGasPrice(i.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i]) + .TestObject; + + _broadcaster.Broadcast(transactions[i], true); + } + + _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); + + ITxPoolPeer txPoolPeer = Substitute.For(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + + int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount); + pickedTxs.Count.Should().Be(expectedCount); + + List expectedTxs = new(); + + for (int i = 1; i <= expectedCount; i++) + { + expectedTxs.Add(transactions[addedTxsCount - i]); + } + + expectedTxs.Should().BeEquivalentTo(pickedTxs); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(2)] + [TestCase(99)] + [TestCase(100)] + [TestCase(101)] + [TestCase(1000)] + [TestCase(-10)] + public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int threshold) + { + _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; + _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); + + const int currentBaseFeeInGwei = 250; + _headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei()); + Block headBlock = Build.A.Block + .WithNumber(RopstenSpecProvider.LondonBlockNumber) + .WithBaseFeePerGas(currentBaseFeeInGwei.GWei()) + .TestObject; + _blockTree.Head.Returns(headBlock); + + int addedTxsCount = TestItem.PrivateKeys.Length; + Transaction[] transactions = new Transaction[addedTxsCount]; + + for (int i = 0; i < addedTxsCount; i++) + { + transactions[i] = Build.A.Transaction + .WithGasPrice(i.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i]) + .TestObject; + + _broadcaster.Broadcast(transactions[i], true); + } + + _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); + + ITxPoolPeer txPoolPeer = Substitute.For(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + + int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei); + pickedTxs.Count.Should().Be(expectedCount); + + List expectedTxs = new(); + + for (int i = 1; i <= expectedCount; i++) + { + expectedTxs.Add(transactions[addedTxsCount - i]); + } + + expectedTxs.Should().BeEquivalentTo(pickedTxs); + } + + [TestCase(0)] + [TestCase(1)] + [TestCase(2)] + [TestCase(99)] + [TestCase(100)] + [TestCase(101)] + [TestCase(1000)] + [TestCase(-10)] + public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee(int threshold) + { + _txPoolConfig = new TxPoolConfig() { PeerNotificationThreshold = threshold }; + _broadcaster = new TxBroadcaster(_comparer, TimerFactory.Default, _txPoolConfig, _headInfo, _logManager); + + const int currentBaseFeeInGwei = 250; + _headInfo.CurrentBaseFee.Returns(currentBaseFeeInGwei.GWei()); + Block headBlock = Build.A.Block + .WithNumber(RopstenSpecProvider.LondonBlockNumber) + .WithBaseFeePerGas(currentBaseFeeInGwei.GWei()) + .TestObject; + _blockTree.Head.Returns(headBlock); + + int addedTxsCount = TestItem.PrivateKeys.Length; + Transaction[] transactions = new Transaction[addedTxsCount]; + + for (int i = 0; i < addedTxsCount; i++) + { + transactions[i] = Build.A.Transaction + .WithType(TxType.EIP1559) + .WithMaxFeePerGas(i.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i]) + .TestObject; + + _broadcaster.Broadcast(transactions[i], true); + } + + _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); + + ITxPoolPeer txPoolPeer = Substitute.For(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + + int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei); + pickedTxs.Count.Should().Be(expectedCount); + + List expectedTxs = new(); + + for (int i = 1; i <= expectedCount; i++) + { + expectedTxs.Add(transactions[addedTxsCount - i]); + } + + expectedTxs.Should().BeEquivalentTo(pickedTxs, o => o.Excluding(transaction => transaction.MaxFeePerGas)); + } +} diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs index f514713766d..e000f50e625 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs @@ -701,6 +701,83 @@ public void should_not_broadcast_own_transactions_that_faded_out_and_came_back() Assert.AreEqual(0, _txPool.GetOwnPendingTransactions().Length); } + [TestCase(1, 0)] + [TestCase(2, 0)] + [TestCase(2, 1)] + [TestCase(10, 0)] + [TestCase(10, 1)] + [TestCase(10, 5)] + [TestCase(10, 8)] + [TestCase(10, 9)] + public void should_remove_stale_txs_from_persistent_transactions(int numberOfTxs, int nonceIncludedInBlock) + { + _txPool = CreatePool(); + + Transaction[] transactions = new Transaction[numberOfTxs]; + EnsureSenderBalance(TestItem.AddressA, UInt256.MaxValue); + + for (int i = 0; i < numberOfTxs; i++) + { + transactions[i] = Build.A.Transaction + .WithNonce((UInt256)i) + .WithGasLimit(GasCostOf.Transaction) + .WithGasPrice(10.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA) + .TestObject; + _txPool.SubmitTx(transactions[i], TxHandlingOptions.PersistentBroadcast); + } + _txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs); + + Block block = Build.A.Block.WithTransactions(transactions[nonceIncludedInBlock]).TestObject; + BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); + + ManualResetEvent manualResetEvent = new(false); + _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); + _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); + manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + + // transactions[nonceIncludedInBlock] was included in the block and should be removed, as well as all lower nonces. + _txPool.GetOwnPendingTransactions().Length.Should().Be(numberOfTxs - nonceIncludedInBlock - 1); + } + + [Test] + public void broadcaster_should_work_well_when_there_are_no_txs_in_persistent_txs_from_sender_of_tx_included_in_block() + { + _txPool = CreatePool(); + + Transaction transactionA = Build.A.Transaction + .WithNonce(0) + .WithGasLimit(GasCostOf.Transaction) + .WithGasPrice(10.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyA) + .TestObject; + EnsureSenderBalance(transactionA); + _txPool.SubmitTx(transactionA, TxHandlingOptions.None); + + Transaction transactionB = Build.A.Transaction + .WithNonce(0) + .WithGasLimit(GasCostOf.Transaction) + .WithGasPrice(10.GWei()) + .SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeyB) + .TestObject; + EnsureSenderBalance(transactionB); + _txPool.SubmitTx(transactionB, TxHandlingOptions.PersistentBroadcast); + + _txPool.GetPendingTransactions().Length.Should().Be(2); + _txPool.GetOwnPendingTransactions().Length.Should().Be(1); + + Block block = Build.A.Block.WithTransactions(transactionA).TestObject; + BlockReplacementEventArgs blockReplacementEventArgs = new(block, null); + + ManualResetEvent manualResetEvent = new(false); + _txPool.RemoveTransaction(Arg.Do(t => manualResetEvent.Set())); + _blockTree.BlockAddedToMain += Raise.EventWith(new object(), blockReplacementEventArgs); + manualResetEvent.WaitOne(TimeSpan.FromMilliseconds(200)); + + _txPool.GetPendingTransactions().Length.Should().Be(1); + _txPool.GetOwnPendingTransactions().Length.Should().Be(1); + } + [Test] public async Task should_remove_transactions_concurrently() { diff --git a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs index 7360392d24f..21b245cf210 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs @@ -145,6 +145,21 @@ public bool TryTakeFirst(out TValue first) return TryRemove(GetKey(sortedValues.Min), out first); } + /// + /// Returns best element of each bucket in supplied comparer order. + /// + [MethodImpl(MethodImplOptions.Synchronized)] + public IEnumerable GetFirsts() + { + SortedSet sortedValues = new(_sortedComparer); + foreach (KeyValuePair> bucket in _buckets) + { + sortedValues.Add(bucket.Value.Max!); + } + + return sortedValues; + } + /// /// Gets last element in supplied comparer order. /// @@ -210,6 +225,36 @@ private bool TryRemove(TKey key, bool evicted, out TValue value, out ICollection [MethodImpl(MethodImplOptions.Synchronized)] public bool TryRemove(TKey key) => TryRemove(key, out _, out _); + /// + /// Tries to get elements matching predicated criteria, iterating through SortedSet with break on first mismatch. + /// + /// Given GroupKey, which elements are checked. + /// Predicated criteria. + /// Elements matching predicated criteria. + [MethodImpl(MethodImplOptions.Synchronized)] + public IEnumerable TakeWhile(TGroupKey groupKey, Predicate where) + { + if (_buckets.TryGetValue(groupKey, out SortedSet? bucket)) + { + using SortedSet.Enumerator enumerator = bucket!.GetEnumerator(); + List? list = null; + + while (enumerator.MoveNext()) + { + if (!where(enumerator.Current)) + { + break; + } + + list ??= new List(); + list.Add(enumerator.Current); + } + + return list ?? Enumerable.Empty(); + } + return Enumerable.Empty(); + } + /// /// Tries to get element. /// diff --git a/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs b/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs index 4dab3391ee0..9befd00f342 100644 --- a/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs +++ b/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs @@ -16,6 +16,7 @@ // using System.Collections.Generic; +using System.Linq; using Nethermind.Core; using Nethermind.Core.Crypto; @@ -26,6 +27,7 @@ public interface ITxPoolPeer public PublicKey Id { get; } public string Enode => string.Empty; void SendNewTransaction(Transaction tx) => SendNewTransactions(new[]{tx}); + void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) => SendNewTransactions(txs.Select(t => t.Tx)); void SendNewTransactions(IEnumerable txs); } } diff --git a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs index 177be50d654..1e4d8b83c9f 100644 --- a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs +++ b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs @@ -16,6 +16,7 @@ // using System.Collections.Generic; +using System.Linq; using Nethermind.Core; using Nethermind.Core.Caching; using Nethermind.Core.Crypto; @@ -37,21 +38,24 @@ public PeerInfo(ITxPoolPeer peer) public void SendNewTransaction(Transaction tx) { - if (NotifiedTransactions.Set(tx.Hash)) - { - Peer.SendNewTransaction(tx); - } + Peer.SendNewTransaction(tx); } - public void SendNewTransactions(IEnumerable txs) + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { Peer.SendNewTransactions(GetTxsToSendAndMarkAsNotified(txs)); } - private IEnumerable GetTxsToSendAndMarkAsNotified(IEnumerable txs) + public void SendNewTransactions(IEnumerable txs) => SendNewTransactions(txs.Select(t => (t, false))); + + private IEnumerable GetTxsToSendAndMarkAsNotified(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { - foreach (Transaction tx in txs) + foreach ((Transaction tx, bool isPersistent) in txs) { + if (isPersistent) + { + yield return tx; + } if (NotifiedTransactions.Set(tx.Hash)) { yield return tx; diff --git a/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs b/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs index f35c8199892..59f0ea91e7f 100644 --- a/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs +++ b/src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs @@ -18,10 +18,12 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; using Nethermind.Core; using Nethermind.Core.Crypto; using Nethermind.Core.Timers; +using Nethermind.Int256; using Nethermind.Logging; using Nethermind.TxPool.Collections; @@ -33,6 +35,7 @@ namespace Nethermind.TxPool internal class TxBroadcaster : IDisposable { private readonly ITxPoolConfig _txPoolConfig; + private readonly IChainHeadInfoProvider _headInfo; /// /// Notification threshold randomizer seed @@ -75,9 +78,11 @@ internal class TxBroadcaster : IDisposable public TxBroadcaster(IComparer comparer, ITimerFactory timerFactory, ITxPoolConfig txPoolConfig, + IChainHeadInfoProvider chainHeadInfoProvider, ILogManager? logManager) { _txPoolConfig = txPoolConfig; + _headInfo = chainHeadInfoProvider; _logger = logManager?.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager)); _persistentTxs = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, comparer, logManager); _accumulatedTemporaryTxs = new ConcurrentBag(); @@ -91,13 +96,25 @@ public TxBroadcaster(IComparer comparer, internal Transaction[] GetSnapshot() => _persistentTxs.GetSnapshot(); - public void StartBroadcast(Transaction tx) + public void Broadcast(Transaction tx, bool isPersistent) + { + if (isPersistent) + { + StartBroadcast(tx); + } + else + { + BroadcastOnce(tx); + } + } + + private void StartBroadcast(Transaction tx) { NotifyPeersAboutLocalTx(tx); _persistentTxs.TryInsert(tx.Hash, tx); } - - public void BroadcastOnce(Transaction tx) + + private void BroadcastOnce(Transaction tx) { _accumulatedTemporaryTxs.Add(tx); } @@ -111,11 +128,22 @@ public void StopBroadcast(Keccak txHash) { if (_persistentTxs.Count != 0) { - bool wasIncluded = _persistentTxs.TryRemove(txHash, out Transaction _); - if (wasIncluded) + bool hasBeenRemoved = _persistentTxs.TryRemove(txHash, out Transaction _); + if (hasBeenRemoved) { if (_logger.IsTrace) _logger.Trace( - $"Transaction {txHash} removed from broadcaster after block inclusion"); + $"Transaction {txHash} removed from broadcaster"); + } + } + } + + public void EnsureStopBroadcastUpToNonce(Address address, UInt256 nonce) + { + if (_persistentTxs.Count != 0) + { + foreach (Transaction tx in _persistentTxs.TakeWhile(address, t => t.Nonce <= nonce)) + { + StopBroadcast(tx.Hash!); } } } @@ -124,15 +152,13 @@ private void TimerOnElapsed(object sender, EventArgs args) { void NotifyPeers() { - Transaction[] persistentTxs = _persistentTxs.GetSnapshot(); - _txsToSend = Interlocked.Exchange(ref _accumulatedTemporaryTxs, _txsToSend); if (_logger.IsDebug) _logger.Debug($"Broadcasting transactions to all peers"); foreach ((_, ITxPoolPeer peer) in _peers) { - Notify(peer, GetTxsToSend(peer, persistentTxs, _txsToSend)); + Notify(peer, GetTxsToSend(peer, _txsToSend)); } _txsToSend.Clear(); @@ -142,13 +168,31 @@ void NotifyPeers() _timer.Enabled = true; } - private IEnumerable GetTxsToSend(ITxPoolPeer peer, IReadOnlyList persistentTxs, IEnumerable txsToSend) + internal IEnumerable<(Transaction Tx, bool IsPersistent)> GetTxsToSend(ITxPoolPeer peer, IEnumerable txsToSend) { - for (int i = 0; i < persistentTxs.Count; i++) + if (_txPoolConfig.PeerNotificationThreshold > 0) { - if (_txPoolConfig.PeerNotificationThreshold >= Random.Value.Next(1, 100)) + // PeerNotificationThreshold is a declared in config percent of transactions in persistent broadcast, + // which will be sent when timer elapse. numberOfPersistentTxsToBroadcast is equal to + // PeerNotificationThreshold multiplication by number of transactions in persistent broadcast, rounded up. + int numberOfPersistentTxsToBroadcast = + Math.Min(_txPoolConfig.PeerNotificationThreshold * _persistentTxs.Count / 100 + 1, + _persistentTxs.Count); + + foreach (Transaction tx in _persistentTxs.GetFirsts()) { - yield return persistentTxs[i]; + if (numberOfPersistentTxsToBroadcast > 0) + { + if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee) + { + numberOfPersistentTxsToBroadcast--; + yield return (tx, true); + } + } + else + { + break; + } } } @@ -156,12 +200,15 @@ private IEnumerable GetTxsToSend(ITxPoolPeer peer, IReadOnlyList txs) + private void Notify(ITxPoolPeer peer, IEnumerable txs) => + Notify(peer, txs.Select(t => (t, false))); + + private void Notify(ITxPoolPeer peer, IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { try { diff --git a/src/Nethermind/Nethermind.TxPool/TxPool.cs b/src/Nethermind/Nethermind.TxPool/TxPool.cs index 392064a3fac..24fad307cba 100644 --- a/src/Nethermind/Nethermind.TxPool/TxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/TxPool.cs @@ -104,7 +104,7 @@ public TxPool( AddNodeInfoEntryForTxPool(); _transactions = new TxDistinctSortedPool(MemoryAllowance.MemPoolSize, comparer, logManager); - _broadcaster = new TxBroadcaster(comparer, TimerFactory.Default, txPoolConfig, logManager); + _broadcaster = new TxBroadcaster(comparer, TimerFactory.Default, txPoolConfig, chainHeadInfoProvider, logManager); _headInfo.HeadChanged += OnHeadChange; @@ -212,7 +212,7 @@ private void RemoveProcessedTransactions(IReadOnlyList blockTransac discoveredForHashCache++; } - if (!RemoveTransaction(txHash)) + if (!RemoveIncludedTransaction(blockTransactions[i])) { discoveredForPendingTxs++; } @@ -231,6 +231,13 @@ private void RemoveProcessedTransactions(IReadOnlyList blockTransac } } + private bool RemoveIncludedTransaction(Transaction tx) + { + bool removed = RemoveTransaction(tx.Hash); + _broadcaster.EnsureStopBroadcastUpToNonce(tx.SenderAddress!, tx.Nonce); + return removed; + } + public void AddPeer(ITxPoolPeer peer) { PeerInfo peerInfo = new(peer); @@ -313,14 +320,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast) } } - if (isPersistentBroadcast) - { - _broadcaster.StartBroadcast(tx); - } - else - { - _broadcaster.BroadcastOnce(tx); - } + _broadcaster.Broadcast(tx, isPersistentBroadcast); _hashCache.SetLongTerm(tx.Hash!); NewPending?.Invoke(this, new TxEventArgs(tx)); From 90c1e54e5743f45b2c947fe2ee3e8709e7442b7d Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Wed, 2 Feb 2022 15:14:48 +0100 Subject: [PATCH 2/6] simplification in PeerInfo --- src/Nethermind/Nethermind.TxPool/PeerInfo.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs index 1e4d8b83c9f..c06003d84ca 100644 --- a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs +++ b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs @@ -52,11 +52,7 @@ private IEnumerable GetTxsToSendAndMarkAsNotified(IEnumerable<(Tran { foreach ((Transaction tx, bool isPersistent) in txs) { - if (isPersistent) - { - yield return tx; - } - if (NotifiedTransactions.Set(tx.Hash)) + if (isPersistent || NotifiedTransactions.Set(tx.Hash)) { yield return tx; } From 71b31b13cdbc843b21d847829208134483134549 Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Wed, 2 Feb 2022 15:30:40 +0100 Subject: [PATCH 3/6] reverse dependency in ITxPoolPeer --- .../P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs | 4 ++++ .../Nethermind.Synchronization.Test/BlockDownloaderTests.cs | 4 ++-- .../FastSync/StateSyncFeedTests.cs | 2 +- .../Nethermind.Synchronization.Test/LatencySyncPeerMock.cs | 2 +- .../Nethermind.Synchronization.Test/SyncPeerMock.cs | 2 +- .../Nethermind.Synchronization.Test/SyncPeerPoolTests.cs | 2 +- .../Nethermind.Synchronization.Test/SynchronizerTests.cs | 2 +- src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs | 4 ++-- src/Nethermind/Nethermind.TxPool/PeerInfo.cs | 4 +--- 9 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs index c45c0708080..5860585f5ab 100644 --- a/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs +++ b/src/Nethermind/Nethermind.Network/P2P/ProtocolHandlers/SyncPeerProtocolHandlerBase.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Threading; using System.Threading.Tasks; using DotNetty.Common.Utilities; @@ -224,6 +225,9 @@ public void SendNewTransaction(Transaction tx) SendMessage(new[]{tx}); } + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) => + SendNewTransactions(txs.Select(t => t.Tx)); + public virtual void SendNewTransactions(IEnumerable txs) { const int maxCapacity = 256; diff --git a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs index f283d285ff9..1feae95c3f0 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs @@ -567,7 +567,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { throw new NotImplementedException(); } @@ -1001,7 +1001,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { throw new NotImplementedException(); } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTests.cs index 4eac2ed4d65..f41aa95bffa 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTests.cs @@ -240,7 +240,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { throw new NotImplementedException(); } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/LatencySyncPeerMock.cs b/src/Nethermind/Nethermind.Synchronization.Test/LatencySyncPeerMock.cs index 5896b7f93ef..bb8322ceb8d 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/LatencySyncPeerMock.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/LatencySyncPeerMock.cs @@ -99,7 +99,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { throw new NotImplementedException(); } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerMock.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerMock.cs index 02d69d42ff5..7485cd6aa28 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerMock.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerMock.cs @@ -166,7 +166,7 @@ public void HintNewBlock(Keccak blockHash, long number) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) { } + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { } public Task GetReceipts(IReadOnlyList blockHash, CancellationToken token) { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerPoolTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerPoolTests.cs index f0ebbf875e7..43e688271b3 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerPoolTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SyncPeerPoolTests.cs @@ -117,7 +117,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) { } + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { } public Task GetReceipts(IReadOnlyList blockHash, CancellationToken token) { diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs index 6c029390686..311e55f74b2 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/SynchronizerTests.cs @@ -193,7 +193,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority) public PublicKey Id => Node.Id; - public void SendNewTransactions(IEnumerable txs) { } + public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { } public Task GetReceipts(IReadOnlyList blockHash, CancellationToken token) { diff --git a/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs b/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs index 9befd00f342..828e4ce756e 100644 --- a/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs +++ b/src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs @@ -27,7 +27,7 @@ public interface ITxPoolPeer public PublicKey Id { get; } public string Enode => string.Empty; void SendNewTransaction(Transaction tx) => SendNewTransactions(new[]{tx}); - void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) => SendNewTransactions(txs.Select(t => t.Tx)); - void SendNewTransactions(IEnumerable txs); + void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs); + void SendNewTransactions(IEnumerable txs) => SendNewTransactions(txs.Select(t => (t, false))); } } diff --git a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs index c06003d84ca..a08dc13112b 100644 --- a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs +++ b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs @@ -45,9 +45,7 @@ public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> { Peer.SendNewTransactions(GetTxsToSendAndMarkAsNotified(txs)); } - - public void SendNewTransactions(IEnumerable txs) => SendNewTransactions(txs.Select(t => (t, false))); - + private IEnumerable GetTxsToSendAndMarkAsNotified(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { foreach ((Transaction tx, bool isPersistent) in txs) From b26e2ee50212bd43037089f0cc5fc65d0387ac00 Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Wed, 2 Feb 2022 16:08:15 +0100 Subject: [PATCH 4/6] cosmetic --- src/Nethermind/Nethermind.TxPool/PeerInfo.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs index a08dc13112b..3a7d39cc098 100644 --- a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs +++ b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs @@ -16,7 +16,6 @@ // using System.Collections.Generic; -using System.Linq; using Nethermind.Core; using Nethermind.Core.Caching; using Nethermind.Core.Crypto; From 55d000a0da5de116e6998b48e142917575ab0c82 Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Tue, 15 Feb 2022 13:05:33 +0100 Subject: [PATCH 5/6] ArraySegment.Empty -> Array.Empty --- src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs index a914d6a2848..6a40fa2d7d4 100644 --- a/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs +++ b/src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs @@ -91,7 +91,7 @@ public void should_pick_best_persistent_txs_to_broadcast(int threshold) _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); ITxPoolPeer txPoolPeer = Substitute.For(); - List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty()).Select(t => t.Tx).ToList(); int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount); pickedTxs.Count.Should().Be(expectedCount); @@ -143,7 +143,7 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); ITxPoolPeer txPoolPeer = Substitute.For(); - List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty()).Select(t => t.Tx).ToList(); int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei); pickedTxs.Count.Should().Be(expectedCount); @@ -196,7 +196,7 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee _broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount); ITxPoolPeer txPoolPeer = Substitute.For(); - List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment.Empty).Select(t => t.Tx).ToList(); + List pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty()).Select(t => t.Tx).ToList(); int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei); pickedTxs.Count.Should().Be(expectedCount); From 8fe076ec286fa813b797469a486a5171594bd363 Mon Sep 17 00:00:00 2001 From: Marcin Sobczak Date: Tue, 15 Feb 2022 13:19:18 +0100 Subject: [PATCH 6/6] decrease notified transactions cache from 2x to 1x MemPoolSize --- src/Nethermind/Nethermind.TxPool/PeerInfo.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs index 3a7d39cc098..bfbc2a02cfd 100644 --- a/src/Nethermind/Nethermind.TxPool/PeerInfo.cs +++ b/src/Nethermind/Nethermind.TxPool/PeerInfo.cs @@ -26,7 +26,7 @@ internal class PeerInfo : ITxPoolPeer { private ITxPoolPeer Peer { get; } - private LruKeyCache NotifiedTransactions { get; } = new(2 * MemoryAllowance.MemPoolSize, "notifiedTransactions"); + private LruKeyCache NotifiedTransactions { get; } = new(MemoryAllowance.MemPoolSize, "notifiedTransactions"); public PeerInfo(ITxPoolPeer peer) {