Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not cache local txs in PeerInfo #3813

Merged
merged 7 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetty.Common.Utilities;
Expand Down Expand Up @@ -223,6 +224,9 @@ public void SendNewTransaction(Transaction tx)
SendMessage(new[]{tx});
}

public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) =>
SendNewTransactions(txs.Select(t => t.Tx));

public virtual void SendNewTransactions(IEnumerable<Transaction> txs)
{
const int maxCapacity = 256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs)
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
throw new NotImplementedException();
}
Expand Down Expand Up @@ -1001,7 +1001,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs)
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs)
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs)
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
throw new NotImplementedException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void HintNewBlock(Keccak blockHash, long number)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs) { }
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs) { }
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)

public PublicKey Id => Node.Id;

public void SendNewTransactions(IEnumerable<Transaction> txs) { }
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs) { }

public Task<TxReceipt[][]> GetReceipts(IReadOnlyList<Keccak> blockHash, CancellationToken token)
{
Expand Down
12 changes: 6 additions & 6 deletions src/Nethermind/Nethermind.TxPool.Test/TxBroadcasterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ public void should_pick_best_persistent_txs_to_broadcast(int threshold)
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
_broadcaster.Broadcast(transactions[i], true);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount);
pickedTxs.Count.Should().Be(expectedCount);
Expand Down Expand Up @@ -137,13 +137,13 @@ public void should_not_pick_txs_with_GasPrice_lower_than_CurrentBaseFee(int thre
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
_broadcaster.Broadcast(transactions[i], true);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);
Expand Down Expand Up @@ -190,13 +190,13 @@ public void should_not_pick_1559_txs_with_MaxFeePerGas_lower_than_CurrentBaseFee
.SignedAndResolved(_ethereumEcdsa, TestItem.PrivateKeys[i])
.TestObject;

_broadcaster.StartBroadcast(transactions[i]);
_broadcaster.Broadcast(transactions[i], true);
}

_broadcaster.GetSnapshot().Length.Should().Be(addedTxsCount);

ITxPoolPeer txPoolPeer = Substitute.For<ITxPoolPeer>();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, ArraySegment<Transaction>.Empty).ToList();
List<Transaction> pickedTxs = _broadcaster.GetTxsToSend(txPoolPeer, Array.Empty<Transaction>()).Select(t => t.Tx).ToList();

int expectedCount = threshold <= 0 ? 0 : Math.Min(addedTxsCount * threshold / 100 + 1, addedTxsCount - currentBaseFeeInGwei);
pickedTxs.Count.Should().Be(expectedCount);
Expand Down
4 changes: 3 additions & 1 deletion src/Nethermind/Nethermind.TxPool/ITxPoolPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//

using System.Collections.Generic;
using System.Linq;
using Nethermind.Core;
using Nethermind.Core.Crypto;

Expand All @@ -26,6 +27,7 @@ public interface ITxPoolPeer
public PublicKey Id { get; }
public string Enode => string.Empty;
void SendNewTransaction(Transaction tx) => SendNewTransactions(new[]{tx});
void SendNewTransactions(IEnumerable<Transaction> txs);
void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs);
void SendNewTransactions(IEnumerable<Transaction> txs) => SendNewTransactions(txs.Select(t => (t, false)));
}
}
17 changes: 7 additions & 10 deletions src/Nethermind/Nethermind.TxPool/PeerInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class PeerInfo : ITxPoolPeer
{
private ITxPoolPeer Peer { get; }

private LruKeyCache<Keccak> NotifiedTransactions { get; } = new(2 * MemoryAllowance.MemPoolSize, "notifiedTransactions");
private LruKeyCache<Keccak> NotifiedTransactions { get; } = new(MemoryAllowance.MemPoolSize, "notifiedTransactions");

public PeerInfo(ITxPoolPeer peer)
{
Expand All @@ -37,22 +37,19 @@ public PeerInfo(ITxPoolPeer peer)

public void SendNewTransaction(Transaction tx)
{
if (NotifiedTransactions.Set(tx.Hash))
{
Peer.SendNewTransaction(tx);
}
Peer.SendNewTransaction(tx);
}

public void SendNewTransactions(IEnumerable<Transaction> txs)
public void SendNewTransactions(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
Peer.SendNewTransactions(GetTxsToSendAndMarkAsNotified(txs));
}

private IEnumerable<Transaction> GetTxsToSendAndMarkAsNotified(IEnumerable<Transaction> txs)
private IEnumerable<Transaction> GetTxsToSendAndMarkAsNotified(IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
foreach (Transaction tx in txs)
foreach ((Transaction tx, bool isPersistent) in txs)
{
if (NotifiedTransactions.Set(tx.Hash))
if (isPersistent || NotifiedTransactions.Set(tx.Hash))
marcindsobczak marked this conversation as resolved.
Show resolved Hide resolved
{
yield return tx;
}
Expand Down
30 changes: 23 additions & 7 deletions src/Nethermind/Nethermind.TxPool/TxBroadcaster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Nethermind.Core;
using Nethermind.Core.Crypto;
Expand Down Expand Up @@ -95,13 +96,25 @@ public TxBroadcaster(IComparer<Transaction> comparer,

internal Transaction[] GetSnapshot() => _persistentTxs.GetSnapshot();

public void StartBroadcast(Transaction tx)
public void Broadcast(Transaction tx, bool isPersistent)
{
if (isPersistent)
{
StartBroadcast(tx);
}
else
{
BroadcastOnce(tx);
}
}

private void StartBroadcast(Transaction tx)
{
NotifyPeersAboutLocalTx(tx);
_persistentTxs.TryInsert(tx.Hash, tx);
}
public void BroadcastOnce(Transaction tx)

private void BroadcastOnce(Transaction tx)
{
_accumulatedTemporaryTxs.Add(tx);
}
Expand Down Expand Up @@ -155,7 +168,7 @@ void NotifyPeers()
_timer.Enabled = true;
}

internal IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Transaction> txsToSend)
internal IEnumerable<(Transaction Tx, bool IsPersistent)> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Transaction> txsToSend)
{
if (_txPoolConfig.PeerNotificationThreshold > 0)
{
Expand All @@ -173,7 +186,7 @@ internal IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Tra
if (tx.MaxFeePerGas >= _headInfo.CurrentBaseFee)
{
numberOfPersistentTxsToBroadcast--;
yield return tx;
yield return (tx, true);
}
}
else
Expand All @@ -187,12 +200,15 @@ internal IEnumerable<Transaction> GetTxsToSend(ITxPoolPeer peer, IEnumerable<Tra
{
if (tx.DeliveredBy is null || !tx.DeliveredBy.Equals(peer.Id))
{
yield return tx;
yield return (tx, false);
}
}
}

private void Notify(ITxPoolPeer peer, IEnumerable<Transaction> txs)
private void Notify(ITxPoolPeer peer, IEnumerable<Transaction> txs) =>
Notify(peer, txs.Select(t => (t, false)));

private void Notify(ITxPoolPeer peer, IEnumerable<(Transaction Tx, bool IsPersistent)> txs)
{
try
{
Expand Down
9 changes: 1 addition & 8 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
}
}

if (isPersistentBroadcast)
{
_broadcaster.StartBroadcast(tx);
}
else
{
_broadcaster.BroadcastOnce(tx);
}
_broadcaster.Broadcast(tx, isPersistentBroadcast);

_hashCache.SetLongTerm(tx.Hash!);
NewPending?.Invoke(this, new TxEventArgs(tx));
Expand Down