Skip to content

Commit

Permalink
Move prewarming as early as possible (#8001)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Jan 6, 2025
1 parent ddc7a95 commit b9ed309
Showing 1 changed file with 48 additions and 22 deletions.
70 changes: 48 additions & 22 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Nethermind.Consensus.Withdrawals;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
Expand Down Expand Up @@ -85,28 +86,38 @@ public Block[] Process(Hash256 newBranchStateRoot, List<Block> suggestedBlocks,
{
if (suggestedBlocks.Count == 0) return [];

TxHashCalculator.CalculateInBackground(suggestedBlocks);
BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));

/* We need to save the snapshot state root before reorganization in case the new branch has invalid blocks.
In case of invalid blocks on the new branch we will discard the entire branch and come back to
the previous head state.*/
Hash256 previousBranchStateRoot = CreateCheckpoint();
InitBranch(newBranchStateRoot);

Block suggestedBlock = suggestedBlocks[0];
// Start prewarming as early as possible
WaitForCacheClear();
(CancellationTokenSource? prewarmCancellation, Task? preWarmTask)
= PreWarmTransactions(suggestedBlock, newBranchStateRoot);

BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));

Hash256 preBlockStateRoot = newBranchStateRoot;

bool notReadOnly = !options.ContainsFlag(ProcessingOptions.ReadOnlyChain);
int blocksCount = suggestedBlocks.Count;
Block[] processedBlocks = new Block[blocksCount];

Task? preWarmTask = null;
try
{
for (int i = 0; i < blocksCount; i++)
{
preWarmTask = null;
WaitForCacheClear();
Block suggestedBlock = suggestedBlocks[i];
suggestedBlock = suggestedBlocks[i];
// If prewarmCancellation is not null it means we are in first iteration of loop
// and started prewarming at method entry, so don't start it again
if (prewarmCancellation is null)
{
(prewarmCancellation, preWarmTask) = PreWarmTransactions(suggestedBlock, preBlockStateRoot);
}

if (blocksCount > 64 && i % 8 == 0)
{
if (_logger.IsInfo) _logger.Info($"Processing part of a long blocks branch {i}/{blocksCount}. Block: {suggestedBlock}");
Expand All @@ -120,23 +131,20 @@ the previous head state.*/
Block processedBlock;
TxReceipt[] receipts;

bool skipPrewarming = preWarmer is null || suggestedBlock.Transactions.Length < 3;
if (!skipPrewarming)
if (prewarmCancellation is not null)
{
using CancellationTokenSource cancellationTokenSource = new();
preWarmTask = preWarmer.PreWarmCaches(suggestedBlock, preBlockStateRoot, _specProvider.GetSpec(suggestedBlock.Header), cancellationTokenSource.Token, _beaconBlockRootHandler);
(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
cancellationTokenSource.Cancel();
CancellationTokenExtensions.CancelDisposeAndClear(ref prewarmCancellation);
}
else
{
// Even though we skip prewarming we still need to ensure the caches are cleared
CacheType result = preWarmer?.ClearCaches() ?? default;
if (result != default)
{
if (_logger.IsWarn) _logger.Warn($"Low txs, caches {result} are not empty. Clearing them.");
}
// Even though we skip prewarming we still need to ensure the caches are cleared
(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
}

Expand Down Expand Up @@ -168,7 +176,13 @@ the previous head state.*/
preBlockStateRoot = processedBlock.StateRoot;
// Make sure the prewarm task is finished before we reset the state
preWarmTask?.GetAwaiter().GetResult();
preWarmTask = null;
_stateProvider.Reset(resizeCollections: true);

// Calculate the transaction hashes in the background and release tx sequence memory
// Hashes will be required for PersistentReceiptStorage in ForkchoiceUpdatedHandler
// Though we still want to release the memory even if syncing rather than processing live
TxHashCalculator.CalculateInBackground(suggestedBlock);
}

if (options.ContainsFlag(ProcessingOptions.DoNotUpdateHead))
Expand All @@ -181,13 +195,28 @@ the previous head state.*/
catch (Exception ex) // try to restore at all cost
{
if (_logger.IsWarn) _logger.Warn($"Encountered exception {ex} while processing blocks.");
CancellationTokenExtensions.CancelDisposeAndClear(ref prewarmCancellation);
QueueClearCaches(preWarmTask);
preWarmTask?.GetAwaiter().GetResult();
RestoreBranch(previousBranchStateRoot);
throw;
}
}

private (CancellationTokenSource prewarmCancellation, Task preWarmTask) PreWarmTransactions(Block suggestedBlock, Hash256 preBlockStateRoot)
{
if (preWarmer is null || suggestedBlock.Transactions.Length < 3) return (null, null);

CancellationTokenSource prewarmCancellation = new();
Task preWarmTask = preWarmer.PreWarmCaches(suggestedBlock,
preBlockStateRoot,
_specProvider.GetSpec(suggestedBlock.Header),
prewarmCancellation.Token,
_beaconBlockRootHandler);

return (prewarmCancellation, preWarmTask);
}

private void WaitForCacheClear() => _clearTask.GetAwaiter().GetResult();

private void QueueClearCaches(Task? preWarmTask)
Expand Down Expand Up @@ -472,27 +501,24 @@ void ApplyTransition()
}
}

private class TxHashCalculator(List<Block> suggestedBlocks) : IThreadPoolWorkItem
private class TxHashCalculator(Block suggestedBlock) : IThreadPoolWorkItem
{
public static void CalculateInBackground(List<Block> suggestedBlocks)
public static void CalculateInBackground(Block suggestedBlock)
{
// Memory has been reserved on the transactions to delay calculate the hashes
// We calculate the hashes in the background to release that memory
ThreadPool.UnsafeQueueUserWorkItem(new TxHashCalculator(suggestedBlocks), preferLocal: false);
ThreadPool.UnsafeQueueUserWorkItem(new TxHashCalculator(suggestedBlock), preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
{
// Hashes will be required for PersistentReceiptStorage in UpdateMainChain ForkchoiceUpdatedHandler
// Which occurs after the block has been processed; however the block is stored in cache and picked up
// from there so we can calculate the hashes now for that later use.
foreach (Block block in CollectionsMarshal.AsSpan(suggestedBlocks))
foreach (Transaction tx in suggestedBlock.Transactions)
{
foreach (Transaction tx in block.Transactions)
{
// Calculate the hashes to release the memory from the transactionSequence
tx.CalculateHashInternal();
}
// Calculate the hashes to release the memory from the transactionSequence
tx.CalculateHashInternal();
}
}
}
Expand Down

0 comments on commit b9ed309

Please sign in to comment.