Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't resend tx hashes that were sent to us #5449

Merged
merged 7 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public void Bundle(Block head)
SenderAddress = tx.SenderAddress,
Signature = tx.Signature,
Hash = tx.Hash,
DeliveredBy = tx.DeliveredBy,
Timestamp = tx.Timestamp,
AccessList = tx.AccessList,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,11 @@ private void FireProcessingQueueEmpty()

public bool IsProcessingBlocks(ulong? maxProcessingInterval)
{
if (_processorTask is null || _recoveryTask is null || _processorTask.IsCompleted ||
_recoveryTask.IsCompleted)
if (_processorTask is null || _recoveryTask is null || _processorTask.IsCompleted || _recoveryTask.IsCompleted)
return false;

if (maxProcessingInterval is not null)
return _lastProcessedBlock.AddSeconds(maxProcessingInterval.Value) > DateTime.UtcNow;
else // user does not setup interval and we cannot set interval time based on chainspec
return true;
// user does not setup interval and we cannot set interval time based on chainspec
return maxProcessingInterval is null || _lastProcessedBlock.AddSeconds(maxProcessingInterval.Value) > DateTime.UtcNow;
}

private void TraceFailingBranch(ProcessingBranch processingBranch, ProcessingOptions options, IBlockTracer blockTracer, DumpOptions dumpType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,6 @@ public TransactionBuilder<T> SignedAndResolved(PrivateKey? privateKey = null)
return this;
}

public TransactionBuilder<T> DeliveredBy(PublicKey publicKey)
{
TestObjectInternal.DeliveredBy = publicKey;
return this;
}

protected override void BeforeReturn()
{
base.BeforeReturn();
Expand Down
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.Core/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public void ClearPreHash()
_preHash = default;
}

public PublicKey? DeliveredBy { get; set; } // tks: this is added so we do not send the pending tx back to original sources, not used yet
public UInt256 Timestamp { get; set; }

public int DataLength => Data?.Length ?? 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
using Nethermind.Core.Crypto;
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Crypto;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Network.P2P;
using Nethermind.Network.P2P.EventArg;
Expand Down Expand Up @@ -459,7 +461,7 @@ public void should_send_txs_with_size_up_to_MaxPacketSize_in_one_TransactionsMes

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.SignedAndResolved(Build.A.PrivateKey.TestObject).TestObject;
}

_handler.SendNewTransactions(txs);
Expand Down Expand Up @@ -491,7 +493,7 @@ public void should_send_txs_with_size_exceeding_MaxPacketSize_in_more_than_one_T

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.SignedAndResolved(Build.A.PrivateKey.TestObject).TestObject;
}

_handler.SendNewTransactions(txs);
Expand All @@ -508,19 +510,20 @@ public void should_send_txs_with_size_exceeding_MaxPacketSize_in_more_than_one_T
public void should_send_single_transaction_even_if_exceed_MaxPacketSize(int dataSize)
{
int txCount = 512; //we will try to send 512 txs
Transaction tx = Build.A.Transaction.WithData(new byte[dataSize]).SignedAndResolved().TestObject;
int sizeOfOneTx = tx.GetLength(new TxDecoder());
int numberOfTxsInOneMsg = Math.Max(TransactionsMessage.MaxPacketSize / sizeOfOneTx, 1);
int nonFullMsgTxsCount = txCount % numberOfTxsInOneMsg;
int messagesCount = txCount / numberOfTxsInOneMsg + (nonFullMsgTxsCount > 0 ? 1 : 0);

Transaction[] txs = new Transaction[txCount];

for (int i = 0; i < txCount; i++)
{
txs[i] = tx;
txs[i] = Build.A.Transaction.WithData(new byte[dataSize]).SignedAndResolved(Build.A.PrivateKey.TestObject).TestObject;
}

Transaction tx = txs[0];
int sizeOfOneTx = tx.GetLength(new TxDecoder());
int numberOfTxsInOneMsg = Math.Max(TransactionsMessage.MaxPacketSize / sizeOfOneTx, 1);
int nonFullMsgTxsCount = txCount % numberOfTxsInOneMsg;
int messagesCount = txCount / numberOfTxsInOneMsg + (nonFullMsgTxsCount > 0 ? 1 : 0);

_handler.SendNewTransactions(txs);

_session.Received(messagesCount).DeliverMessage(Arg.Is<TransactionsMessage>(m => m.Transactions.Count == numberOfTxsInOneMsg || m.Transactions.Count == nonFullMsgTxsCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Network.P2P;
using Nethermind.Network.P2P.Subprotocols.Eth.V62.Messages;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void should_send_up_to_MaxCount_hashes_in_one_NewPooledTransactionHashesM

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.WithNonce((UInt256)i).SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs, false);
Expand All @@ -119,7 +120,7 @@ public void should_send_more_than_MaxCount_hashes_in_more_than_one_NewPooledTran

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.WithNonce((UInt256)i).SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Core.Timers;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Network.P2P;
using Nethermind.Network.P2P.Subprotocols;
Expand Down Expand Up @@ -161,7 +162,7 @@ public void should_send_up_to_MaxCount_hashes_in_one_NewPooledTransactionHashesM

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.WithNonce((UInt256)i).SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs, false);
Expand All @@ -181,7 +182,7 @@ public void should_send_more_than_MaxCount_hashes_in_more_than_one_NewPooledTran

for (int i = 0; i < txCount; i++)
{
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
txs[i] = Build.A.Transaction.WithNonce((UInt256)i).SignedAndResolved().TestObject;
}

_handler.SendNewTransactions(txs, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Nethermind.Blockchain;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Caching;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
Expand All @@ -23,6 +24,7 @@
using Nethermind.Stats.Model;
using Nethermind.Synchronization;
using Nethermind.TxPool;
using MemoryAllowance = Nethermind.TxPool.MemoryAllowance;

namespace Nethermind.Network.P2P.ProtocolHandlers
{
Expand Down Expand Up @@ -51,6 +53,7 @@ public abstract class SyncPeerProtocolHandlerBase : ZeroProtocolHandlerBase, ISy

protected readonly MessageQueue<GetBlockHeadersMessage, BlockHeader[]> _headersRequests;
protected readonly MessageQueue<GetBlockBodiesMessage, BlockBody[]> _bodiesRequests;
protected LruKeyCache<Keccak> NotifiedTransactions { get; } = new(2 * MemoryAllowance.MemPoolSize, "notifiedTransactions");

protected SyncPeerProtocolHandlerBase(ISession session,
IMessageSerializationService serializer,
Expand Down Expand Up @@ -183,7 +186,23 @@ public void SendNewTransaction(Transaction tx)
SendMessage(new[] { tx });
}

public virtual void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx = false)
public void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx = false)
{
SendNewTransactionsCore(TxsToSendAndMarkAsNotified(txs, sendFullTx), sendFullTx);
}

private IEnumerable<Transaction> TxsToSendAndMarkAsNotified(IEnumerable<Transaction> txs, bool sendFullTx)
{
foreach (Transaction tx in txs)
{
if (sendFullTx || (tx.Hash != null && NotifiedTransactions.Set(tx.Hash)))
{
yield return tx;
}
}
}

protected virtual void SendNewTransactionsCore(IEnumerable<Transaction> txs, bool sendFullTx)
{
int packetSizeLeft = TransactionsMessage.MaxPacketSize;
using ArrayPoolList<Transaction> txsToSend = new(1024);
Expand Down Expand Up @@ -312,13 +331,13 @@ protected BlockBodiesMessage FulfillBlockBodiesRequest(GetBlockBodiesMessage get
return new BlockBodiesMessage(blocks);
}

protected virtual void Handle(BlockHeadersMessage message, long size)
protected void Handle(BlockHeadersMessage message, long size)
{
Metrics.Eth62BlockHeadersReceived++;
_headersRequests.Handle(message.BlockHeaders, size);
}

protected virtual void HandleBodies(BlockBodiesMessage blockBodiesMessage, long size)
protected void HandleBodies(BlockBodiesMessage blockBodiesMessage, long size)
{
Metrics.Eth62BlockBodiesReceived++;
_bodiesRequests.Handle(blockBodiesMessage.Bodies, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,15 @@ public override void Init()
}

BlockHeader head = SyncServer.Head;
StatusMessage statusMessage = new();
statusMessage.NetworkId = SyncServer.NetworkId;
statusMessage.ProtocolVersion = ProtocolVersion;
statusMessage.TotalDifficulty = head.TotalDifficulty ?? head.Difficulty;
statusMessage.BestHash = head.Hash!;
statusMessage.GenesisHash = SyncServer.Genesis.Hash!;
StatusMessage statusMessage = new()
{
NetworkId = SyncServer.NetworkId,
ProtocolVersion = ProtocolVersion,
TotalDifficulty = head.TotalDifficulty ?? head.Difficulty,
BestHash = head.Hash!,
GenesisHash = SyncServer.Genesis.Hash!
};

EnrichStatusMessage(statusMessage);

Metrics.StatusesSent++;
Expand Down Expand Up @@ -280,8 +283,12 @@ private void HandleSlow(IList<Transaction> transactions)

private void PrepareAndSubmitTransaction(Transaction tx, bool isTrace)
{
tx.DeliveredBy = Node.Id;
tx.Timestamp = _timestamper.UnixTime.Seconds;
if (tx.Hash is not null)
{
NotifiedTransactions.Set(tx.Hash);
}

AcceptTxResult accepted = _txPool.SubmitTx(tx, TxHandlingOptions.None);
_floodController.Report(accepted);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ GetPooledTransactionsMessage getPooledTxMsg
protected virtual void Handle(NewPooledTransactionHashesMessage msg)
{
Metrics.Eth65NewPooledTransactionHashesReceived++;

AddNotifiedTransactions(msg.Hashes);

Stopwatch stopwatch = Stopwatch.StartNew();

_pooledTxsRequestor.RequestTransactions(Send, msg.Hashes);
Expand All @@ -85,6 +88,14 @@ protected virtual void Handle(NewPooledTransactionHashesMessage msg)
$"in {stopwatch.Elapsed.TotalMilliseconds}ms");
}

protected void AddNotifiedTransactions(IReadOnlyList<Keccak> hashes)
{
foreach (Keccak hash in hashes)
{
NotifiedTransactions.Set(hash);
}
}

private void Handle(GetPooledTransactionsMessage msg)
{
Metrics.Eth65GetPooledTransactionsReceived++;
Expand Down Expand Up @@ -121,11 +132,11 @@ internal PooledTransactionsMessage FulfillPooledTransactionsRequest(
return new PooledTransactionsMessage(txsToSend);
}

public override void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
protected override void SendNewTransactionsCore(IEnumerable<Transaction> txs, bool sendFullTx)
{
if (sendFullTx)
{
base.SendNewTransactions(txs, true);
base.SendNewTransactionsCore(txs, true);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,29 @@ private void Handle(NewPooledTransactionHashesMessage68 message)
$"Hashes count: {message.Hashes.Count} " +
$"Types count: {message.Types.Count} " +
$"Sizes count: {message.Sizes.Count}";
if (isTrace)
Logger.Trace(errorMessage);
if (isTrace) Logger.Trace(errorMessage);

throw new SubprotocolException(errorMessage);
}

Metrics.Eth68NewPooledTransactionHashesReceived++;

AddNotifiedTransactions(message.Hashes);

Stopwatch? stopwatch = isTrace ? Stopwatch.StartNew() : null;

_pooledTxsRequestor.RequestTransactionsEth66(_sendAction, message.Hashes);

stopwatch?.Stop();

if (isTrace)
Logger.Trace($"OUT {Counter:D5} {nameof(NewPooledTransactionHashesMessage68)} to {Node:c} " +
$"in {stopwatch.Elapsed.TotalMilliseconds}ms");
if (isTrace) Logger.Trace($"OUT {Counter:D5} {nameof(NewPooledTransactionHashesMessage68)} to {Node:c} in {stopwatch.Elapsed.TotalMilliseconds}ms");
}

public override void SendNewTransactions(IEnumerable<Transaction> txs, bool sendFullTx)
protected override void SendNewTransactionsCore(IEnumerable<Transaction> txs, bool sendFullTx)
{
if (sendFullTx)
{
base.SendNewTransactions(txs, sendFullTx);
base.SendNewTransactionsCore(txs, sendFullTx);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Synchronization.ParallelSync;
Expand All @@ -31,8 +32,9 @@ protected override async Task Dispatch(PeerInfo peerInfo, BodiesSyncBatch batch,
batch.ResponseSourcePeer = peerInfo;
batch.MarkSent();

Keccak[]? hashes = batch.Infos.Where(i => i is not null).Select(i => i!.BlockHash).ToArray();
if (hashes.Length == 0)
using ArrayPoolList<Keccak> hashes = new(batch.Infos.Length);
hashes.AddRange(batch.Infos.Where(i => i is not null).Select(i => i!.BlockHash));
if (hashes.Count == 0)
{
if (Logger.IsDebug) Logger.Debug($"{batch} - attempted send a request with no hash.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Logging;
using Nethermind.Synchronization.ParallelSync;
Expand All @@ -31,8 +32,9 @@ protected override async Task Dispatch(PeerInfo peerInfo, ReceiptsSyncBatch batc
batch.ResponseSourcePeer = peerInfo;
batch.MarkSent();

Keccak[]? hashes = batch.Infos.Where(i => i is not null).Select(i => i!.BlockHash).ToArray();
if (hashes.Length == 0)
using ArrayPoolList<Keccak> hashes = new(batch.Infos.Length);
hashes.AddRange(batch.Infos.Where(i => i is not null).Select(i => i!.BlockHash));
if (hashes.Count == 0)
{
if (Logger.IsDebug) Logger.Debug($"{batch} - attempted send a request with no hash.");
return;
Expand Down
1 change: 0 additions & 1 deletion src/Nethermind/Nethermind.TxPool.Test/TxPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,6 @@ private Transaction GetTransaction(UInt256 nonce, long gasLimit, UInt256 gasPric
.WithGasPrice(gasPrice)
.WithData(data)
.To(to)
.DeliveredBy(privateKey.PublicKey)
.SignedAndResolved(_ethereumEcdsa, privateKey)
.TestObject;

Expand Down
Loading