This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-1273] Start of fast sync downloader #613

Merged 27 commits on Jan 23, 2019
Changes from 25 commits
Commits
f2d5f68
Add support for initiating fast sync to DefaultSynchronizer, starting…
ajsutton Jan 19, 2019
cb8cfb2
Wait for a minimum number of peers to be available before starting fa…
ajsutton Jan 20, 2019
8b10aed
Add tests for fast sync waiting for peers and the overall coordinatio…
ajsutton Jan 21, 2019
6f3abb0
Select pivot block.
ajsutton Jan 21, 2019
27b044c
Fetch the pivot block header.
ajsutton Jan 21, 2019
d813ffc
Switch to throwing an exception to abort the fast sync pipeline inste…
ajsutton Jan 21, 2019
6ecea9c
waitForSuitablePeers doesn't need to return a FastSyncState.
ajsutton Jan 21, 2019
89b134d
Add a basic test for downloadPivotBlockHeader.
ajsutton Jan 21, 2019
bf5188e
Create a task specifically for getting the pivot block header so that…
ajsutton Jan 21, 2019
f67c9f9
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
815d385
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
c1e1c2f
Add basic tests for GetPivotBlockHeader.
ajsutton Jan 21, 2019
b7266d1
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 21, 2019
32bdd13
Add check to ensure that a majority of peers (which have the pivot bl…
ajsutton Jan 22, 2019
ae0a318
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
7f42f9a
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
c43524a
Merge branch 'master' into NC-2136
ajsutton Jan 22, 2019
5329db7
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 22, 2019
1ac4f4f
Throw exceptions all the way back out the top instead of mapping to a…
ajsutton Jan 22, 2019
3598197
Move PivotBlockRetriever to the fastsync package.
ajsutton Jan 22, 2019
6a7fac0
Call wait for peers directly instead of sending to the worker pool.
ajsutton Jan 22, 2019
1bdc38d
Simplify check for any available peers.
ajsutton Jan 22, 2019
1ba9c7f
Pull isRetryingError and assignPeer up to AbstractRetryingPeerTask so…
ajsutton Jan 23, 2019
599af66
Merge branch 'NC-2136' of github.com:ajsutton/pantheon into NC-2136
ajsutton Jan 23, 2019
d0b6fac
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 23, 2019
0f2ec34
Merge branch 'master' of github.com:PegaSysEng/pantheon into NC-2136
ajsutton Jan 23, 2019
e733789
Ensure we repeatedly print messages to indicate we're waiting for a p…
ajsutton Jan 23, 2019
Original file line number Diff line number Diff line change
@@ -14,22 +14,26 @@

import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.PeerDisconnectedException;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* A task that will retry a fixed number of times before completing the associated CompletableFuture
* exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
* #executePeerTask()} is complete with a non-empty list the retry counter is reset.
* #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset.
*
* @param <T> The type as a typed list that the peer task can get partial or full results in.
*/
@@ -40,6 +44,7 @@ public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends
private final int maxRetries;
private int retryCount = 0;
private final LabelledMetric<OperationTimer> ethTasksTimer;
private Optional<EthPeer> assignedPeer = Optional.empty();

/**
* @param ethContext The context of the current Eth network we are attached to.
@@ -56,6 +61,10 @@ public AbstractRetryingPeerTask(
this.maxRetries = maxRetries;
}

public void assignPeer(final EthPeer peer) {
assignedPeer = Optional.of(peer);
}

@Override
protected void executeTask() {
if (result.get().isDone()) {
@@ -68,7 +77,7 @@ protected void executeTask() {
}

retryCount += 1;
executePeerTask()
executePeerTask(assignedPeer)
.whenComplete(
(peerResult, error) -> {
if (error != null) {
@@ -83,7 +92,7 @@ protected void executeTask() {
});
}

protected abstract CompletableFuture<T> executePeerTask();
protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer);

private void handleTaskError(final Throwable error) {
final Throwable cause = ExceptionUtils.rootCause(error);
@@ -118,5 +127,12 @@ private void handleTaskError(final Throwable error) {
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}

protected abstract boolean isRetryableError(Throwable error);
private boolean isRetryableError(final Throwable error) {
final boolean isPeerError =
error instanceof PeerBreachedProtocolException
|| error instanceof PeerDisconnectedException
|| error instanceof NoAvailablePeersException;

return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
}
}
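The retry rule in `isRetryableError` can be read as: a timeout is always worth retrying, while a peer-related failure is only worth retrying when the task is free to pick a different peer. The following is a minimal sketch of that rule in isolation. `PeerError` is a hypothetical stand-in for the three peer exceptions named in the diff, and the peer is represented by a plain `String` rather than `EthPeer`.

```java
import java.util.Optional;
import java.util.concurrent.TimeoutException;

class RetryPolicySketch {
  // Hypothetical stand-in for PeerBreachedProtocolException,
  // PeerDisconnectedException and NoAvailablePeersException.
  static class PeerError extends RuntimeException {}

  // Timeouts are always retryable. Peer errors are retryable only when no
  // specific peer was assigned, because a retry can then use another peer.
  static boolean isRetryable(final Throwable error, final Optional<String> assignedPeer) {
    final boolean isPeerError = error instanceof PeerError;
    return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError);
  }

  public static void main(final String[] args) {
    System.out.println(isRetryable(new TimeoutException(), Optional.of("peer-1")));
    System.out.println(isRetryable(new PeerError(), Optional.of("peer-1")));
    System.out.println(isRetryable(new PeerError(), Optional.empty()));
  }
}
```

With a peer assigned, a peer error is treated as final: retrying against the same failed peer would not be expected to help.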
Original file line number Diff line number Diff line change
@@ -16,11 +16,16 @@
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncActions;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,7 +40,8 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final SyncState syncState;
private final AtomicBoolean started = new AtomicBoolean(false);
private final BlockPropagationManager<C> blockPropagationManager;
private final Downloader<C> downloader;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
@@ -54,28 +60,60 @@ public DefaultSynchronizer(
syncState,
new PendingBlocks(),
ethTasksTimer);
this.downloader =
new Downloader<>(
this.fullSyncDownloader =
new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode().equals(SyncMode.FAST)) {
if (syncConfig.syncMode() == SyncMode.FAST) {
Contributor: Curious - why prefer == here?

Contributor Author: Purely for ease of reading, and because the .equals confused me for a bit into thinking it wasn't actually an enum.
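A minimal sketch of the trade-off discussed here, using a hypothetical stand-in enum rather than the project's `SyncMode`. For enum constants the two forms give the same answer, since `Enum.equals` is final and compares by identity. The practical differences are that `==` tolerates a null left-hand side and that the compiler rejects `==` between unrelated types.

```java
class EnumComparisonSketch {
  // Hypothetical stand-in for the SyncMode enum referenced in the diff.
  enum SyncMode {
    FULL,
    FAST
  }

  // Identity comparison: simply false when mode is null.
  static boolean isFastByIdentity(final SyncMode mode) {
    return mode == SyncMode.FAST;
  }

  // equals comparison: same result for non-null values, but throws
  // NullPointerException when mode is null.
  static boolean isFastByEquals(final SyncMode mode) {
    return mode.equals(SyncMode.FAST);
  }

  public static void main(final String[] args) {
    System.out.println(isFastByIdentity(SyncMode.FAST));
    System.out.println(isFastByIdentity(null));
    try {
      isFastByEquals(null);
    } catch (final NullPointerException e) {
      System.out.println("equals on a null reference throws NullPointerException");
    }
  }
}
```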

LOG.info("Fast sync enabled.");
this.fastSyncDownloader =
Optional.of(
new FastSyncDownloader<>(
new FastSyncActions<>(
syncConfig, protocolSchedule, protocolContext, ethContext, ethTasksTimer)));
} else {
this.fastSyncDownloader = Optional.empty();
}
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
downloader.start();
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
} else {
startFullSync();
}
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {

final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
"Fast sync failed ({}), switching to full sync.",
((FastSyncException) rootCause).getError());
} else if (error != null) {
LOG.error("Fast sync failed, switching to full sync.", error);
} else {
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
startFullSync();
}
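`handleFastSyncResult` inspects `ExceptionUtils.rootCause(error)` rather than `error` itself. The implementation of `ExceptionUtils` is not shown in this diff, but one likely reason is that a `CompletableFuture` stage that fails because an earlier stage failed reports a `CompletionException` wrapping the original exception. The sketch below demonstrates that wrapping with plain JDK classes. `FastSyncFailure` and this `rootCause` helper are hypothetical stand-ins, not the project's classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class RootCauseSketch {
  // Hypothetical stand-in for FastSyncException.
  static class FastSyncFailure extends RuntimeException {
    FastSyncFailure(final String message) {
      super(message);
    }
  }

  // Assumed behaviour of a root-cause helper: walk the cause chain to its
  // end, returning null when given null.
  static Throwable rootCause(final Throwable error) {
    Throwable cause = error;
    while (cause != null && cause.getCause() != null) {
      cause = cause.getCause();
    }
    return cause;
  }

  // Returns the error as seen by whenComplete on a dependent stage.
  static Throwable observedError() {
    final CompletableFuture<Long> source = new CompletableFuture<>();
    final CompletableFuture<Long> dependent = source.thenApply(n -> n + 1);
    source.completeExceptionally(new FastSyncFailure("chain too short"));

    final AtomicReference<Throwable> seen = new AtomicReference<>();
    dependent.whenComplete((result, error) -> seen.set(error));
    return seen.get();
  }

  public static void main(final String[] args) {
    final Throwable seen = observedError();
    System.out.println(seen.getClass().getSimpleName());
    System.out.println(rootCause(seen).getClass().getSimpleName());
  }
}
```

Checking `instanceof` on the unwrapped cause keeps the handler correct regardless of how many stages sit between the failure and the callback.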

private void startFullSync() {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
fullSyncDownloader.start();
}

@Override
public Optional<SyncStatus> getSyncStatus() {
if (!started.get()) {
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Downloader<C> {
public class FullSyncDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
@@ -73,7 +73,7 @@ public class Downloader<C> {
private long syncTargetDisconnectListenerId;
protected CompletableFuture<?> currentTask;

Downloader(
FullSyncDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Duration;
import java.util.Optional;

import com.google.common.collect.Range;
@@ -30,10 +31,14 @@ public class SynchronizerConfiguration {
// TODO: Determine reasonable defaults here
public static int DEFAULT_PIVOT_DISTANCE_FROM_HEAD = 500;
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(3);

// Fast sync config
private final int fastSyncPivotDistance;
private final float fastSyncFullValidationRate;
private final int fastSyncMinimumPeerCount;
private final Duration fastSyncMaximumPeerWaitTime;

// Block propagation config
private final Range<Long> blockPropagationRange;
@@ -58,6 +63,8 @@ private SynchronizerConfiguration(
final SyncMode requestedSyncMode,
final int fastSyncPivotDistance,
final float fastSyncFullValidationRate,
final int fastSyncMinimumPeerCount,
final Duration fastSyncMaximumPeerWaitTime,
final Range<Long> blockPropagationRange,
final Optional<SyncMode> syncMode,
final long downloaderChangeTargetThresholdByHeight,
@@ -73,6 +80,8 @@ private SynchronizerConfiguration(
this.requestedSyncMode = requestedSyncMode;
this.fastSyncPivotDistance = fastSyncPivotDistance;
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
@@ -115,6 +124,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
requestedSyncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime,
blockPropagationRange,
Optional.of(actualSyncMode),
downloaderChangeTargetThresholdByHeight,
@@ -222,6 +233,14 @@ public float fastSyncFullValidationRate() {
return fastSyncFullValidationRate;
}

public int getFastSyncMinimumPeerCount() {
return fastSyncMinimumPeerCount;
}

public Duration getFastSyncMaximumPeerWaitTime() {
return fastSyncMaximumPeerWaitTime;
}

public static class Builder {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
@@ -318,6 +337,8 @@ public SynchronizerConfiguration build() {
syncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
DEFAULT_FAST_SYNC_MINIMUM_PEERS,
DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME,
blockPropagationRange,
Optional.empty(),
downloaderChangeTargetThresholdByHeight,
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.NO_PEERS_AVAILABLE;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FastSyncActions<C> {

private static final Logger LOG = LogManager.getLogger();
private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final LabelledMetric<OperationTimer> ethTasksTimer;

public FastSyncActions(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final LabelledMetric<OperationTimer> ethTasksTimer) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
this.ethTasksTimer = ethTasksTimer;
}

public CompletableFuture<Void> waitForSuitablePeers() {
final WaitForPeersTask waitForPeersTask =
WaitForPeersTask.create(
ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer);

final EthScheduler scheduler = ethContext.getScheduler();
final CompletableFuture<Void> result = new CompletableFuture<>();
scheduler
.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime())
.handle(
(waitResult, error) -> {
if (ExceptionUtils.rootCause(error) instanceof TimeoutException) {
if (ethContext.getEthPeers().availablePeerCount() > 0) {
LOG.warn(
"Fast sync timed out before minimum peer count was reached. Continuing with reduced peers.");
result.complete(null);
} else {
waitForAnyPeer()
Contributor: I think I would just have the action fail at this point and let it be handled at a higher level.

Contributor Author: The problem with that is you wind up having to duplicate most of this handle lambda at the higher level, so you'd basically have to have this in FastSyncDownloader:

private CompletableFuture<Void> ensurePeersAvailable() {
    final CompletableFuture<Void> result = new CompletableFuture<>();

    fastSyncActions
        .waitForSuitablePeers()
        .handle(
            (waitResult, error) -> {
              if (ExceptionUtils.rootCause(error) instanceof TimeoutException) {
                fastSyncActions
                    .waitForAnyPeer()
                    .thenAccept(result::complete)
                    .exceptionally(
                        taskError -> {
                          result.completeExceptionally(error);
                          return null;
                        });
              } else if (error != null) {
                result.completeExceptionally(error);
              } else {
                result.complete(null);
              }
              return null;
            });
    return result;
  }

Contributor: I was thinking we'd just cut the waitForAnyPeer() part - it seems a little iffy to continue with fast sync if we don't have enough peers. Then the calling code could decide whether to run another round of waiting, or to abort fast sync. Anyway - just thinking out loud.

Contributor Author: Yeah, it's a bit of a toss-up. The penalty for not waiting is a month-long full sync, so it's unlikely the user would want that, but maybe they'd want to be strict about waiting for a minimum number of peers.

We probably should also allow the user to specifically set a pivot block by hash, and then wait indefinitely until that block is available. That would be the most secure option, since users could then guarantee they sync onto a chain they actually trust.

.thenAccept(result::complete)
.exceptionally(
taskError -> {
result.completeExceptionally(error);
return null;
});
}
} else if (error != null) {
LOG.error("Failed to find peers for fast sync", error);
result.completeExceptionally(error);
} else {
result.complete(null);
}
return null;
});

return result;
}
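The control flow of `waitForSuitablePeers` has three outcomes on timeout or failure: continue with fewer peers than requested, fall back to waiting for any single peer, or fail. A minimal sketch of that pattern using only JDK types follows. The scheduler, peer tasks and peer count are replaced by hypothetical parameters (`minimumPeersWait`, `availablePeerCount`, `waitForAnyPeer`). Unlike the diff, this sketch propagates the fallback's own error rather than the original timeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

class PeerWaitSketch {
  // Assumed behaviour of a root-cause helper; returns null for null input.
  static Throwable rootCause(final Throwable error) {
    Throwable cause = error;
    while (cause != null && cause.getCause() != null) {
      cause = cause.getCause();
    }
    return cause;
  }

  static CompletableFuture<Void> waitWithFallback(
      final CompletableFuture<Void> minimumPeersWait,
      final IntSupplier availablePeerCount,
      final Supplier<CompletableFuture<Void>> waitForAnyPeer) {
    final CompletableFuture<Void> result = new CompletableFuture<>();
    minimumPeersWait.handle(
        (ignored, error) -> {
          if (rootCause(error) instanceof TimeoutException) {
            if (availablePeerCount.getAsInt() > 0) {
              // Timed out, but some peers exist: continue with reduced peers.
              result.complete(null);
            } else {
              // Timed out with no peers at all: wait for any single peer.
              waitForAnyPeer
                  .get()
                  .whenComplete(
                      (done, fallbackError) -> {
                        if (fallbackError != null) {
                          result.completeExceptionally(fallbackError);
                        } else {
                          result.complete(null);
                        }
                      });
            }
          } else if (error != null) {
            // Any other failure is passed straight to the caller.
            result.completeExceptionally(error);
          } else {
            result.complete(null);
          }
          return null;
        });
    return result;
  }

  public static void main(final String[] args) {
    final CompletableFuture<Void> timedOut = new CompletableFuture<>();
    timedOut.completeExceptionally(new TimeoutException("minimum peer wait expired"));
    final CompletableFuture<Void> outcome =
        waitWithFallback(timedOut, () -> 2, () -> new CompletableFuture<Void>());
    System.out.println(outcome.isDone() && !outcome.isCompletedExceptionally());
  }
}
```

The stricter policy raised in the review thread would amount to replacing the inner `else` branch with `result.completeExceptionally(error)` and leaving the retry decision to the caller.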

private CompletableFuture<Void> waitForAnyPeer() {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
return WaitForPeerTask.create(ethContext, ethTasksTimer).run();
}

public FastSyncState selectPivotBlock() {
return ethContext
.getEthPeers()
.bestPeer()
.map(
peer -> {
final long pivotBlockNumber =
peer.chainState().getEstimatedHeight() - syncConfig.fastSyncPivotDistance();
if (pivotBlockNumber <= BlockHeader.GENESIS_BLOCK_NUMBER) {
throw new FastSyncException(CHAIN_TOO_SHORT);
} else {
LOG.info("Selecting block number {} as fast sync pivot block.", pivotBlockNumber);
return new FastSyncState(OptionalLong.of(pivotBlockNumber));
}
})
.orElseThrow(() -> new FastSyncException(NO_PEERS_AVAILABLE));
}

public CompletableFuture<FastSyncState> downloadPivotBlockHeader(
final FastSyncState currentState) {
return new PivotBlockRetriever<>(
protocolSchedule,
ethContext,
ethTasksTimer,
currentState.getPivotBlockNumber().getAsLong())
.downloadPivotBlockHeader();
}
}
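The pivot arithmetic in `selectPivotBlock` is a subtraction followed by a lower-bound check. A minimal sketch of just that calculation follows. It assumes the genesis block number is 0 (the value of `BlockHeader.GENESIS_BLOCK_NUMBER` is not shown in this diff) and signals a too-short chain with an empty `OptionalLong` instead of throwing `FastSyncException`.

```java
import java.util.OptionalLong;

class PivotSelectionSketch {
  // Assumption: genesis is block 0.
  static final long GENESIS_BLOCK_NUMBER = 0L;

  // Pivot = best peer's estimated height minus the configured distance.
  // A pivot at or below genesis means the chain is too short to fast sync.
  static OptionalLong selectPivot(final long estimatedHeight, final long pivotDistance) {
    final long pivotBlockNumber = estimatedHeight - pivotDistance;
    return pivotBlockNumber <= GENESIS_BLOCK_NUMBER
        ? OptionalLong.empty()
        : OptionalLong.of(pivotBlockNumber);
  }

  public static void main(final String[] args) {
    // 500 matches DEFAULT_PIVOT_DISTANCE_FROM_HEAD in the diff.
    System.out.println(selectPivot(7_000_000L, 500L));
    System.out.println(selectPivot(500L, 500L));
  }
}
```

With the default distance of 500, a peer whose estimated height is exactly 500 yields a pivot of 0, which the `<=` comparison rejects as too short.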