Fix/header sync still throw errors (#5119)
* Fix header sync still throwing errors

* Optimize header sync feed

* Revert "Optimize header sync feed"

This reverts commit dd2cdea.

* Add a unit test

* Remove synchronized
asdacap authored Jan 10, 2023
1 parent a6c442a commit 9ca61f9
Showing 4 changed files with 169 additions and 52 deletions.
@@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain;
@@ -75,7 +76,7 @@ public BeaconHeadersSyncFeed(

private Keccak ExpectedPivotHash => _pivot.PivotParentHash ?? _pivot.PivotHash ?? Keccak.Zero;

public override void InitializeFeed()
protected override void ResetPivot()
{
_chainMerged = false;

@@ -125,7 +126,6 @@ protected override void PostFinishCleanUp()
if (_pivotNumber != ExpectedPivotNumber)
{
// Pivot changed during the sync. Need to reset the states
PostFinishCleanUp();
InitializeFeed();
}

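For context, a minimal sketch of how the override split is intended to work after this change, assuming the base-class shape shown later in this commit. Member names are taken from the diff above; the bodies are illustrative stubs, not the full classes:

// BeaconHeadersSyncFeed now only resets its own pivot-related state; the base class
// decides when ResetPivot() runs and guards it with the new reset lock.
protected override void ResetPivot()
{
    _chainMerged = false;
    // ... re-read the pivot number/hash from the beacon pivot (omitted here) ...
}

protected override void PostFinishCleanUp()
{
    // ... cleanup omitted ...
    if (_pivotNumber != ExpectedPivotNumber)
    {
        // The explicit PostFinishCleanUp() call was dropped because the base
        // InitializeFeed() now performs the cleanup itself before calling ResetPivot().
        InitializeFeed();
    }
}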
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
@@ -85,6 +86,66 @@ void FulfillBatch(HeadersSyncBatch batch)
feed.HandleResponse(batch1);
}

[Test]
public async Task Can_reset_and_not_hang_when_a_batch_is_processing()
{
IDbProvider memDbProvider = await TestMemDbProvider.InitAsync();
BlockTree remoteBlockTree = Build.A.BlockTree().OfHeadersOnly.OfChainLength(501).TestObject;

BlockTree blockTree = new(memDbProvider.BlocksDb, memDbProvider.HeadersDb, memDbProvider.BlockInfosDb, new ChainLevelInfoRepository(memDbProvider.BlockInfosDb), MainnetSpecProvider.Instance, NullBloomStorage.Instance, LimboLogs.Instance);

ISyncReport syncReport = Substitute.For<ISyncReport>();
syncReport.FastBlocksHeaders.Returns(new MeasuredProgress());
syncReport.HeadersInQueue.Returns(new MeasuredProgress());

ManualResetEventSlim hangLatch = new(false);

BlockHeader pivot = remoteBlockTree.FindHeader(500, BlockTreeLookupOptions.None)!;
ResettableHeaderSyncFeed feed = new(
Substitute.For<ISyncModeSelector>(),
blockTree,
Substitute.For<ISyncPeerPool>(),
new SyncConfig { FastSync = true, FastBlocks = true, PivotNumber = "500", PivotHash = pivot.Hash.Bytes.ToHexString(), PivotTotalDifficulty = pivot.TotalDifficulty!.ToString() },
syncReport,
LimboLogs.Instance,
hangOnBlockNumber: 400,
hangLatch: hangLatch
);

feed.InitializeFeed();

void FulfillBatch(HeadersSyncBatch batch)
{
batch.Response = remoteBlockTree.FindHeaders(
remoteBlockTree.FindHeader(batch.StartNumber, BlockTreeLookupOptions.None)!.Hash, batch.RequestSize, 0,
false);
}

HeadersSyncBatch? batch1 = await feed.PrepareRequest();
FulfillBatch(batch1);

// Initiate batch processing, which should hang in the middle
Task responseTask = Task.Factory.StartNew(() => feed.HandleResponse(batch1), TaskCreationOptions.RunContinuationsAsynchronously);
await Task.Delay(TimeSpan.FromMilliseconds(50));

// Initiate a reset at the same time. Without protection, the _nextHeaderHash would be updated here, but also by `InsertHeader` via `HandleResponse`.
Task resetTask = Task.Factory.StartNew(() => feed.Reset(), TaskCreationOptions.RunContinuationsAsynchronously);
await Task.Delay(TimeSpan.FromMilliseconds(50));

hangLatch.Set();
await responseTask;
await resetTask;

// A new batch is created, starting at the hang block
HeadersSyncBatch? batch2 = await feed.PrepareRequest();

FulfillBatch(batch2);
feed.HandleResponse(batch2);

// The whole new batch should get processed instead of being skipped due to a concurrently modified _nextHeaderHash.
blockTree.LowestInsertedHeader.Number.Should().Be(batch2.StartNumber);
}

[Test]
public async Task Can_keep_returning_nulls_after_all_batches_were_prepared()
{
@@ -139,15 +200,39 @@ public async Task Can_resume_downloading_from_parent_of_lowest_inserted_header()

private class ResettableHeaderSyncFeed : HeadersSyncFeed
{
public ResettableHeaderSyncFeed(ISyncModeSelector syncModeSelector, IBlockTree? blockTree, ISyncPeerPool? syncPeerPool, ISyncConfig? syncConfig, ISyncReport? syncReport, ILogManager? logManager, bool alwaysStartHeaderSync = false) : base(syncModeSelector, blockTree, syncPeerPool, syncConfig, syncReport, logManager, alwaysStartHeaderSync)
private ManualResetEventSlim? _hangLatch;
private long? _hangOnBlockNumber;

public ResettableHeaderSyncFeed(
ISyncModeSelector syncModeSelector,
IBlockTree? blockTree,
ISyncPeerPool? syncPeerPool,
ISyncConfig? syncConfig,
ISyncReport? syncReport,
ILogManager? logManager,
long? hangOnBlockNumber = null,
ManualResetEventSlim? hangLatch = null,
bool alwaysStartHeaderSync = false
) : base(syncModeSelector, blockTree, syncPeerPool, syncConfig, syncReport, logManager, alwaysStartHeaderSync)
{
_hangOnBlockNumber = hangOnBlockNumber;
_hangLatch = hangLatch;
}

public void Reset()
{
base.PostFinishCleanUp();
InitializeFeed();
}

protected override AddBlockResult InsertToBlockTree(BlockHeader header)
{
if (header.Number == _hangOnBlockNumber)
{
_hangLatch.Wait();
}
return base.InsertToBlockTree(header);
}
}

}
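The test above relies on freezing header insertion with a latch so that a reset can race against an in-flight HandleResponse. A minimal, self-contained sketch of that latch technique; LatchedWorker and Demo are hypothetical types invented purely for illustration:

using System;
using System.Threading;
using System.Threading.Tasks;

// A hook inside the worker blocks on a ManualResetEventSlim so a test can freeze one
// thread mid-operation, start a competing operation, then release both, which is the
// same idea as ResettableHeaderSyncFeed blocking in InsertToBlockTree at hangOnBlockNumber.
class LatchedWorker
{
    private readonly ManualResetEventSlim _latch;
    public LatchedWorker(ManualResetEventSlim latch) => _latch = latch;

    public void Process(int item)
    {
        if (item == 3) _latch.Wait(); // freeze here until the latch is released
        Console.WriteLine($"processed {item}");
    }
}

static class Demo
{
    static async Task Main()
    {
        ManualResetEventSlim latch = new(false);
        LatchedWorker worker = new(latch);

        Task processing = Task.Run(() => { for (int i = 0; i < 5; i++) worker.Process(i); });
        await Task.Delay(50);                                   // let processing reach the latch
        Console.WriteLine("run the competing operation here");  // e.g. feed.Reset() in the test
        latch.Set();                                            // release the frozen thread
        await processing;
    }
}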
@@ -5,6 +5,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -64,6 +65,12 @@ public class HeadersSyncFeed : ActivatedSyncFeed<HeadersSyncBatch?>
private readonly ConcurrentDictionary<long, HeadersSyncBatch> _dependencies = new();
// Stop gap method to reduce allocations from non-struct enumerator
// https://github.com/dotnet/runtime/pull/38296

/// <summary>
/// A lock used to block all processing when needed so that the whole state can be reset.
/// </summary>
private readonly ReaderWriterLockSlim _resetLock = new();

private IEnumerator<KeyValuePair<long, HeadersSyncBatch>>? _enumerator;
private ulong _memoryEstimate;
private long _headersEstimate;
@@ -183,9 +190,23 @@ public HeadersSyncFeed(

public override void InitializeFeed()
{
_logger.Info("Feed reinitialized");
_pivotNumber = _syncConfig.PivotNumberParsed;
_resetLock.EnterWriteLock();
try
{
PostFinishCleanUp();
ResetPivot();
}
finally
{
_resetLock.ExitWriteLock();
}

base.InitializeFeed();
}

protected virtual void ResetPivot()
{
_pivotNumber = _syncConfig.PivotNumberParsed;
_lowestRequestedHeaderNumber = _pivotNumber + 1; // Because we want the pivot to be requested
_nextHeaderHash = _syncConfig.PivotHashParsed;
_nextHeaderDiff = _syncConfig.PivotTotalDifficultyParsed;
@@ -197,12 +218,8 @@ public override void InitializeFeed()
SetExpectedNextHeaderToParent(lowestInserted);
_lowestRequestedHeaderNumber = lowestInserted.Number;
}

base.InitializeFeed();
}

protected virtual bool StartingFeedCondition() => _syncConfig.FastBlocks;

protected override SyncMode ActivationSyncModes { get; }
= SyncMode.FastHeaders & ~SyncMode.FastBlocks;

@@ -270,34 +287,42 @@ private void HandleDependentBatches(CancellationToken cancellationToken)

public override Task<HeadersSyncBatch?> PrepareRequest(CancellationToken cancellationToken = default)
{
HandleDependentBatches(cancellationToken);

if (_pending.TryDequeue(out HeadersSyncBatch? batch))
{
if (_logger.IsTrace) _logger.Trace($"Dequeue batch {batch}");
batch!.MarkRetry();
}
else if (ShouldBuildANewBatch())
_resetLock.EnterReadLock();
try
{
batch = BuildNewBatch();
if (_logger.IsTrace) _logger.Trace($"New batch {batch}");
}
HandleDependentBatches(cancellationToken);

if (batch is not null)
{
_sent.TryAdd(batch, _dummyObject);
if (batch.StartNumber >= (LowestInsertedBlockHeader?.Number ?? 0) - FastBlocksPriorities.ForHeaders)
if (_pending.TryDequeue(out HeadersSyncBatch? batch))
{
batch.Prioritized = true;
if (_logger.IsTrace) _logger.Trace($"Dequeue batch {batch}");
batch!.MarkRetry();
}
else if (ShouldBuildANewBatch())
{
batch = BuildNewBatch();
if (_logger.IsTrace) _logger.Trace($"New batch {batch}");
}

LogStateOnPrepare();
}
if (batch is not null)
{
_sent.TryAdd(batch, _dummyObject);
if (batch.StartNumber >= (LowestInsertedBlockHeader?.Number ?? 0) - FastBlocksPriorities.ForHeaders)
{
batch.Prioritized = true;
}

return Task.FromResult(batch);
LogStateOnPrepare();
}

return Task.FromResult(batch);
}
finally
{
_resetLock.ExitReadLock();
}
}

protected virtual HeadersSyncBatch BuildNewBatch()
private HeadersSyncBatch BuildNewBatch()
{
HeadersSyncBatch batch = new();
batch.MinNumber = _lowestRequestedHeaderNumber - 1;
@@ -351,39 +376,47 @@ public override SyncResponseHandlingResult HandleResponse(HeadersSyncBatch? batch
return SyncResponseHandlingResult.InternalError;
}

if (!_sent.ContainsKey(batch))
{
if (_logger.IsDebug) _logger.Debug("Ignoring batch not in sent record");
return SyncResponseHandlingResult.Ignored;
}

if ((batch.Response?.Length ?? 0) == 0)
{
batch.MarkHandlingStart();
if (_logger.IsTrace) _logger.Trace($"{batch} - came back EMPTY");
_pending.Enqueue(batch);
batch.MarkHandlingEnd();
return batch.ResponseSourcePeer is null ? SyncResponseHandlingResult.NotAssigned : SyncResponseHandlingResult.NoProgress;
}

_resetLock.EnterReadLock();
try
{
if (batch.RequestSize == 0)
if (!_sent.ContainsKey(batch))
{
return SyncResponseHandlingResult.OK; // 1
if (_logger.IsDebug) _logger.Debug("Ignoring batch not in sent record");
return SyncResponseHandlingResult.Ignored;
}

lock (_handlerLock)
if ((batch.Response?.Length ?? 0) == 0)
{
batch.MarkHandlingStart();
int added = InsertHeaders(batch);
return added == 0 ? SyncResponseHandlingResult.NoProgress : SyncResponseHandlingResult.OK;
if (_logger.IsTrace) _logger.Trace($"{batch} - came back EMPTY");
_pending.Enqueue(batch);
batch.MarkHandlingEnd();
return batch.ResponseSourcePeer is null ? SyncResponseHandlingResult.NotAssigned : SyncResponseHandlingResult.NoProgress;
}

try
{
if (batch.RequestSize == 0)
{
return SyncResponseHandlingResult.OK; // 1
}

lock (_handlerLock)
{
batch.MarkHandlingStart();
int added = InsertHeaders(batch);
return added == 0 ? SyncResponseHandlingResult.NoProgress : SyncResponseHandlingResult.OK;
}
}
finally
{
batch.MarkHandlingEnd();
_sent.TryRemove(batch, out _);
}
}
finally
{
batch.MarkHandlingEnd();
_sent.TryRemove(batch, out _);
_resetLock.ExitReadLock();
}
}

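The core of the fix is the new _resetLock: PrepareRequest and HandleResponse enter it as readers, so they can overlap each other but never a reset, while InitializeFeed enters it as a writer. A minimal sketch of that pattern, assuming a single long field standing in for the real state (_nextHeaderHash, _lowestRequestedHeaderNumber and friends):

using System.Threading;

// Handlers take the read lock, so several may run at once; Reset takes the write lock,
// so it waits for in-flight handlers and blocks new ones while shared state is rewound.
class GuardedFeedState
{
    private readonly ReaderWriterLockSlim _resetLock = new();
    private readonly object _handlerLock = new();
    private long _next = 100; // stand-in for the feed's pivot/next-header state

    public void Handle(long incoming)
    {
        _resetLock.EnterReadLock();
        try
        {
            lock (_handlerLock) // handlers still serialize among themselves, as in HandleResponse
            {
                _next = incoming - 1; // advances like InsertHeaders updating the expected parent
            }
        }
        finally
        {
            _resetLock.ExitReadLock();
        }
    }

    public void Reset()
    {
        _resetLock.EnterWriteLock();
        try
        {
            _next = 100; // rewind to the pivot, like InitializeFeed calling ResetPivot()
        }
        finally
        {
            _resetLock.ExitWriteLock();
        }
    }
}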
@@ -188,7 +188,6 @@ private void ReactToHandlingResult(T request, SyncResponseHandlingResult result,
case SyncResponseHandlingResult.Emptish:
break;
case SyncResponseHandlingResult.Ignored:
Logger.Error($"Feed response was ignored.");
break;
case SyncResponseHandlingResult.LesserQuality:
SyncPeerPool.ReportWeakPeer(peer, Feed.Contexts);
