Skip to content

Commit

Permalink
Fix unbounded memory usage with era imported node during old heders (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap authored Dec 20, 2024
1 parent 695420e commit d7d62a7
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,7 @@ public interface ISyncConfig : IConfig

[ConfigItem(Description = "_Technical._ Max distance between best suggested header and available state to assume state is synced.", DefaultValue = "0", HiddenFromDocs = true)]
int HeaderStateDistance { get; set; }

[ConfigItem(Description = "_Technical._ Memory budget for in memory dependencies of fast headers.", DefaultValue = "0", HiddenFromDocs = true)]
ulong FastHeadersMemoryBudget { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only
using Nethermind.Config;
using Nethermind.Core.Extensions;
using Nethermind.Db;

namespace Nethermind.Blockchain.Synchronization
Expand Down Expand Up @@ -80,6 +81,8 @@ public string? PivotHash
/// </summary>
public int HeaderStateDistance { get; set; } = 0;

public ulong FastHeadersMemoryBudget { get; set; } = (ulong)128.MB();

public override string ToString()
{
return
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Init/MemoryHintMan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void AssignFastBlocksMemory(ISyncConfig syncConfig)
FastBlocksMemory = Math.Min(1.GB(), (long)(0.1 * _remainingMemory));
}

Synchronization.MemoryAllowance.FastBlocksMemory = (ulong)FastBlocksMemory;
syncConfig.FastHeadersMemoryBudget = (ulong)FastBlocksMemory;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,39 @@ void FillBatch(HeadersSyncBatch batch)
localBlockTree.LowestInsertedHeader?.Number.Should().Be(0);
}

[Test]
public async Task Limits_persisted_headers_dependency()
{
var peerChain = CachedBlockTreeBuilder.OfLength(1000);
var pivotHeader = peerChain.FindHeader(700)!;
var syncConfig = new TestSyncConfig
{
FastSync = true,
PivotNumber = pivotHeader.Number.ToString(),
PivotHash = pivotHeader.Hash!.ToString(),
PivotTotalDifficulty = pivotHeader.TotalDifficulty.ToString()!,
FastHeadersMemoryBudget = (ulong)100.KB(),
};

IBlockTree localBlockTree = Build.A.BlockTree(peerChain.FindBlock(0, BlockTreeLookupOptions.None)!, null).WithSyncConfig(syncConfig).TestObject;

// Insert some chain
for (int i = 300; i < 600; i++)
{
localBlockTree.Insert(peerChain.FindHeader(i)!).Should().Be(AddBlockResult.Added);
}

ISyncPeerPool syncPeerPool = Substitute.For<ISyncPeerPool>();
ISyncReport report = Substitute.For<ISyncReport>();
report.HeadersInQueue.Returns(new MeasuredProgress());
report.FastBlocksHeaders.Returns(new MeasuredProgress());
using HeadersSyncFeed feed = new(localBlockTree, syncPeerPool, syncConfig, report, new TestLogManager(LogLevel.Trace));
feed.InitializeFeed();

(await feed.PrepareRequest()).Should().NotBe(null);
(await feed.PrepareRequest()).Should().Be(null);
}

[Test]
public async Task Will_never_lose_batch_on_invalid_batch()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HeadersSyncFeed : ActivatedSyncFeed<HeadersSyncBatch?>
private readonly Lock _handlerLock = new();

private readonly int _headersRequestSize = GethSyncLimits.MaxHeaderFetch;
private readonly ulong _fastHeadersMemoryBudget;
protected long _lowestRequestedHeaderNumber;

protected Hash256 _nextHeaderHash;
Expand Down Expand Up @@ -67,7 +68,6 @@ public class HeadersSyncFeed : ActivatedSyncFeed<HeadersSyncBatch?>
/// </summary>
private readonly ReaderWriterLockSlim _resetLock = new();

private IEnumerator<KeyValuePair<long, HeadersSyncBatch>>? _enumerator;
private ulong _memoryEstimate;
private long _headersEstimate;

Expand Down Expand Up @@ -104,19 +104,14 @@ private long HeadersInQueue
private long CalculateHeadersInQueue()
{
// Reuse the enumerator
var enumerator = Interlocked.Exchange(ref _enumerator, null) ?? _dependencies.GetEnumerator();
using var enumerator = _dependencies.GetEnumerator();

long count = 0;
while (enumerator.MoveNext())
{
count += enumerator.Current.Value.Response?.Count ?? 0;
}

// Stop gap method to reduce allocations from non-struct enumerator
// https://github.com/dotnet/runtime/pull/38296
enumerator.Reset();
_enumerator = enumerator;

return count;
}

Expand All @@ -138,19 +133,14 @@ private ulong MemoryInQueue
private ulong CalculateMemoryInQueue()
{
// Reuse the enumerator
var enumerator = Interlocked.Exchange(ref _enumerator, null) ?? _dependencies.GetEnumerator();
using var enumerator = _dependencies.GetEnumerator();

ulong amount = 0;
while (enumerator.MoveNext())
{
amount += (ulong)enumerator.Current.Value?.ResponseSizeEstimate;
}

// Stop gap method to reduce allocations from non-struct enumerator
// https://github.com/dotnet/runtime/pull/38296
enumerator.Reset();
_enumerator = enumerator;

return amount;
}

Expand All @@ -169,6 +159,7 @@ public HeadersSyncFeed(
_syncConfig = syncConfig ?? throw new ArgumentNullException(nameof(syncConfig));
_logger = logManager?.GetClassLogger<HeadersSyncFeed>() ?? throw new ArgumentNullException(nameof(HeadersSyncFeed));
_totalDifficultyStrategy = totalDifficultyStrategy ?? new CumulativeTotalDifficultyStrategy();
_fastHeadersMemoryBudget = syncConfig.FastHeadersMemoryBudget;

if (!_syncConfig.UseGethLimitsInFastBlocks)
{
Expand Down Expand Up @@ -229,7 +220,7 @@ private bool ShouldBuildANewBatch()

bool noBatchesLeft = AllHeadersDownloaded
|| destinationHeaderRequested
|| MemoryInQueue >= MemoryAllowance.FastBlocksMemory
|| MemoryInQueue >= _fastHeadersMemoryBudget
|| isImmediateSync && AnyHeaderDownloaded;

if (noBatchesLeft)
Expand Down Expand Up @@ -512,15 +503,18 @@ private void EnqueueBatch(HeadersSyncBatch batch, bool skipPersisted = false)
}

headers.AsSpan().Reverse();

using HeadersSyncBatch newBatchToProcess = new HeadersSyncBatch();
newBatchToProcess.StartNumber = lastHeader.Number;
newBatchToProcess.RequestSize = headers.Count;
newBatchToProcess.Response = headers;
if (_logger.IsDebug) _logger.Debug($"Handling header portion {newBatchToProcess.StartNumber} to {newBatchToProcess.EndNumber} with persisted headers.");
InsertHeaders(newBatchToProcess);

int newRequestSize = batch.RequestSize - headers.Count;
if (headers.Count > 0)
{
using HeadersSyncBatch newBatchToProcess = new HeadersSyncBatch();
newBatchToProcess.StartNumber = lastHeader.Number;
newBatchToProcess.RequestSize = headers.Count;
newBatchToProcess.Response = headers;
if (_logger.IsDebug) _logger.Debug($"Handling header portion {newBatchToProcess.StartNumber} to {newBatchToProcess.EndNumber} with persisted headers.");
InsertHeaders(newBatchToProcess);
MarkDirty();
HeadersSyncQueueReport.Update(HeadersInQueue);
}

if (newRequestSize == 0) return null;

Expand Down
12 changes: 0 additions & 12 deletions src/Nethermind/Nethermind.Synchronization/MemoryAllowance.cs

This file was deleted.

0 comments on commit d7d62a7

Please sign in to comment.