Skip to content

Commit

Permalink
fix: Document HeaderSync (#10280)
Browse files Browse the repository at this point in the history
Made no functional changes except:
* Replaced a tuple with a struct
* Introduced a function `made_enough_progress()` to make the code much
simpler to understand.
* Renamed a ban reason
  • Loading branch information
nikurt authored Dec 4, 2023
1 parent 81b7082 commit b9ae299
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 63 deletions.
6 changes: 5 additions & 1 deletion chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ pub enum SyncStatus {
EpochSync { epoch_ord: u64 },
/// Downloading block headers for fast sync.
HeaderSync {
/// Header head height at the beginning.
/// Used only for reporting the progress of the sync.
start_height: BlockHeight,
/// Current header head height.
current_height: BlockHeight,
/// Highest height of our peers.
highest_height: BlockHeight,
},
/// State sync, with different states of state sync for different shards.
Expand Down Expand Up @@ -327,7 +331,7 @@ impl SyncStatus {
let _span =
debug_span!(target: "sync", "update_sync_status", old_value = ?self, ?new_value)
.entered();
*self = new_value
*self = new_value;
}
}

Expand Down
13 changes: 3 additions & 10 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1643,16 +1643,9 @@ impl ClientActor {
.chain
.reset_data_pre_state_sync(sync_hash));
}
let s = StateSyncStatus { sync_hash, sync_status: HashMap::default() };
// An artificial span to easily detect a node getting into the sync state in Grafana.
{
let _span =
debug_span!(target: "sync", "set_sync", sync = "StateSync")
.entered();
debug!(target: "sync", prev_sync_status = ?self.client.sync_status);
self.client.sync_status = SyncStatus::StateSync(s);
debug!(target: "sync", sync_status = ?self.client.sync_status);
}
self.client.sync_status.update(SyncStatus::StateSync(
StateSyncStatus { sync_hash, sync_status: HashMap::default() },
));
// This is the first time we run state sync.
notify_start_sync = true;
}
Expand Down
197 changes: 146 additions & 51 deletions chain/client/src/sync/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use near_primitives::block::Tip;
use near_primitives::hash::CryptoHash;
use near_primitives::static_clock::StaticClock;
use near_primitives::types::BlockHeight;
use near_primitives::utils::to_timestamp;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::cmp::min;
Expand All @@ -23,17 +22,42 @@ pub const MAX_BLOCK_HEADER_HASHES: usize = 20;

pub const NS_PER_SECOND: u128 = 1_000_000_000;

/// Progress of downloading the currently requested batch of headers.
struct BatchProgress {
/// An intermediate timeout by which a certain number of headers is expected.
timeout: DateTime<Utc>,
/// Height expected at the moment of `timeout`.
expected_height: BlockHeight,
/// Header head height at the moment this batch was requested.
header_head_height: BlockHeight,
/// Highest height of peers at the moment this batch was requested.
/// Together with `header_head_height`, it bounds the height at which the batch counts as fully received.
highest_height_of_peers: BlockHeight,
}

/// Helper to keep track of sync headers.
/// Handles major re-orgs by finding closest header that matches and re-downloading headers from that point.
pub struct HeaderSync {
network_adapter: PeerManagerAdapter,
prev_header_sync: (DateTime<Utc>, BlockHeight, BlockHeight, BlockHeight),

/// Progress of downloading the currently requested batch of headers.
// TODO: Change type to Option<BatchProgress>.
batch_progress: BatchProgress,

/// Peer from which the next batch of headers was requested.
syncing_peer: Option<HighestHeightPeerInfo>,

/// When the stalling was first detected.
stalling_ts: Option<DateTime<Utc>>,

/// How much time to wait after initial header sync.
initial_timeout: Duration,

/// How much time to wait after some progress is made in header sync.
progress_timeout: Duration,

/// How much time to wait before banning a peer in header sync if sync is too slow.
stall_ban_timeout: Duration,

/// Expected increase of header head height per second during header sync
expected_height_per_second: u64,
}

Expand All @@ -47,7 +71,12 @@ impl HeaderSync {
) -> Self {
HeaderSync {
network_adapter,
prev_header_sync: (StaticClock::utc(), 0, 0, 0),
batch_progress: BatchProgress {
timeout: StaticClock::utc(),
expected_height: 0,
header_head_height: 0,
highest_height_of_peers: 0,
},
syncing_peer: None,
stalling_ts: None,
initial_timeout: Duration::from_std(initial_timeout).unwrap(),
Expand All @@ -57,6 +86,9 @@ impl HeaderSync {
}
}

/// Can update `sync_status` to `HeaderSync`.
/// Can request a new batch of headers from a peer.
/// This function won't tell you that header sync is complete.
pub fn run(
&mut self,
sync_status: &mut SyncStatus,
Expand All @@ -66,15 +98,25 @@ impl HeaderSync {
) -> Result<(), near_chain::Error> {
let _span = tracing::debug_span!(target: "sync", "run", sync = "HeaderSync").entered();
let header_head = chain.header_head()?;

// Check if we need to start a new request for a batch of headers.
if !self.header_sync_due(sync_status, &header_head, highest_height) {
// Either
// * header sync is not needed, or
// * a request is already in-flight and more progress is expected.
return Ok(());
}

// TODO: Why call `header_sync_due()` if that decision can be overridden here?
let enable_header_sync = match sync_status {
SyncStatus::HeaderSync { .. }
| SyncStatus::BodySync { .. }
| SyncStatus::StateSyncDone => true,
| SyncStatus::StateSyncDone => {
// TODO: Transitioning from BodySync to HeaderSync is fine if the highest height of peers gets too far from our header_head_height. However it's currently unconditional.
true
}
SyncStatus::NoSync | SyncStatus::AwaitingPeers | SyncStatus::EpochSync { .. } => {
// TODO: How can it get to EpochSync if it's hardcoded to go from NoSync to HeaderSync?
debug!(target: "sync", "Sync: initial transition to Header sync. Header head {} at {}",
header_head.last_block_hash, header_head.height,
);
Expand All @@ -83,28 +125,37 @@ impl HeaderSync {
SyncStatus::StateSync { .. } => false,
};

if enable_header_sync {
let start_height = match sync_status.start_height() {
Some(height) => height,
None => chain.head()?.height,
};
if !enable_header_sync {
// Header sync is blocked for whatever reason.
return Ok(());
}

sync_status.update(SyncStatus::HeaderSync {
start_height,
current_height: header_head.height,
highest_height,
});
self.syncing_peer = None;
if let Some(peer) = highest_height_peers.choose(&mut thread_rng()).cloned() {
if peer.highest_block_height > header_head.height {
self.syncing_peer = self.request_headers(chain, peer);
}
// start_height is used to report the progress of header sync, e.g. to say that it's 50% complete.
// This number has no other functional value.
let start_height = match &sync_status {
SyncStatus::HeaderSync { start_height, .. } => *start_height,
SyncStatus::BodySync { start_height, .. } => *start_height,
_ => chain.head()?.height,
};

sync_status.update(SyncStatus::HeaderSync {
start_height,
current_height: header_head.height,
highest_height,
});

self.syncing_peer = None;
// Pick a new random peer to request the next batch of headers.
if let Some(peer) = highest_height_peers.choose(&mut thread_rng()).cloned() {
// TODO: This condition should always be true, otherwise we can already complete header sync.
if peer.highest_block_height > header_head.height {
self.syncing_peer = self.request_headers(chain, peer);
}
}

Ok(())
}

/// Returns the height that we expect to reach starting from `old_height` after `time_delta`.
fn compute_expected_height(
&self,
old_height: BlockHeight,
Expand All @@ -116,37 +167,57 @@ impl HeaderSync {
/ NS_PER_SECOND)) as u64
}

/// Returns whether a new batch of headers needs to be requested.
// Checks whether the batch of headers is completely downloaded, or if the peer failed to satisfy our expectations for long enough.
// If yes, then returns true to request a new batch of headers. Maybe bans a peer.
// Otherwise, returns false to indicate that we're expecting more headers from the same requested batch.
// TODO: This function should check the difference between the current header_head height and the highest height of the peers.
// TODO: Triggering header sync to get 1 header (or even 0 headers) makes little sense.
pub(crate) fn header_sync_due(
&mut self,
sync_status: &SyncStatus,
header_head: &Tip,
highest_height: BlockHeight,
) -> bool {
let now = StaticClock::utc();
let (timeout, old_expected_height, prev_height, prev_highest_height) =
self.prev_header_sync;

// Received all necessary header, can request more.
let BatchProgress {
timeout,
expected_height: old_expected_height,
header_head_height: prev_height,
highest_height_of_peers: prev_highest_height,
} = self.batch_progress;
// Received all headers from a batch requested on the previous iteration.
// Can proceed to the next iteration.
let all_headers_received =
header_head.height >= min(prev_height + MAX_BLOCK_HEADERS - 4, prev_highest_height);

// Did we receive as many headers as we expected from the peer? Request more or ban peer.
// Did we receive as many headers as we expected from the peer?
// If not, consider the peer stalling.
// This can be either the initial timeout, or any of the progress timeouts after the initial timeout.
let stalling = header_head.height <= old_expected_height && now > timeout;

// Always enable header sync on initial state transition from NoSync / NoSyncFewBlocksBehind / AwaitingPeers.
// Always enable header sync on initial state transition from
// * NoSync
// * AwaitingPeers.
// TODO: Will this remain correct with the introduction of EpochSync?
// TODO: Shouldn't a node transition to EpochSync from these states?
let force_sync = match sync_status {
SyncStatus::NoSync | SyncStatus::AwaitingPeers => true,
_ => false,
};

if force_sync || all_headers_received || stalling {
self.prev_header_sync = (
now + self.initial_timeout,
self.compute_expected_height(header_head.height, self.initial_timeout),
header_head.height,
highest_height,
);
// Request a new batch of headers.

self.batch_progress = BatchProgress {
timeout: now + self.initial_timeout,
expected_height: self
.compute_expected_height(header_head.height, self.initial_timeout),
header_head_height: header_head.height,
highest_height_of_peers: highest_height,
};

// Record the timestamp when the stalling was first noticed.
if stalling {
if self.stalling_ts.is_none() {
self.stalling_ts = Some(now);
Expand All @@ -156,60 +227,84 @@ impl HeaderSync {
}

if all_headers_received {
// As the batch of headers is received completely, reset the stalling timestamp.
self.stalling_ts = None;
} else {
if let Some(ref stalling_ts) = self.stalling_ts {
// syncing_peer is expected to be present.
if let Some(ref peer) = self.syncing_peer {
match sync_status {
SyncStatus::HeaderSync { highest_height, .. } => {
if now > *stalling_ts + self.stall_ban_timeout
&& *highest_height == peer.highest_block_height
{
warn!(target: "sync", "Sync: ban a fraudulent peer: {}, claimed height: {}",
peer.peer_info, peer.highest_block_height);
// The peer is one of the peers with the highest height, but we consider the peer stalling.
warn!(target: "sync", "Sync: ban a peer: {}, for not providing enough headers. Peer's height: {}", peer.peer_info, peer.highest_block_height);
// Ban the peer, which blocks all interactions with the peer for some time.
// TODO: Consider not banning straightaway, but give a node a few attempts before banning it.
// TODO: Prefer not to request the next batch of headers from the same peer.
self.network_adapter.send(
PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::BanPeer {
peer_id: peer.peer_info.id.clone(),
ban_reason:
near_network::types::ReasonForBan::HeightFraud,
ban_reason: near_network::types::ReasonForBan::ProvidedNotEnoughHeaders,
},
),
);
// This peer is fraudulent, let's skip this beat and wait for
// the next one when this peer is not in the list anymore.
// Will retry without this peer.
self.syncing_peer = None;
return false;
}
}
_ => (),
_ => {
// Unexpected
}
}
}
}
}
self.syncing_peer = None;
// Return true to request a new batch of headers.
true
} else {
// Manage the currently requested batch of headers.
// Note that `now <= timeout` usually holds here. If the timeout has already passed, then the header head must be above `old_expected_height`, because otherwise this would be `stalling`.

// Resetting the timeout as long as we make progress.
let ns_time_till_timeout =
(to_timestamp(timeout).saturating_sub(to_timestamp(now))) as u128;
let remaining_expected_height = (self.expected_height_per_second as u128
* ns_time_till_timeout
/ NS_PER_SECOND) as u64;
if header_head.height >= old_expected_height.saturating_sub(remaining_expected_height) {
if self.made_enough_progress(header_head.height, old_expected_height, now, timeout) {
// Update our expectation.
// `new_expected_height` can be beyond the requested batch of headers, but that is fine.
let new_expected_height =
self.compute_expected_height(header_head.height, self.progress_timeout);
self.prev_header_sync = (
now + self.progress_timeout,
new_expected_height,
prev_height,
prev_highest_height,
);
self.batch_progress = BatchProgress {
timeout: now + self.progress_timeout,
expected_height: new_expected_height,
header_head_height: prev_height,
highest_height_of_peers: prev_highest_height,
};
}
// Keep getting headers from the same batch.
// Don't request a new batch of headers.
false
}
}

/// Tells whether header sync is on track to reach `expected_height` by `timeout`.
///
/// While the timeout has not passed, the height reachable in the remaining time
/// (`timeout - now`) is projected from `current_height` by `compute_expected_height()`.
/// Once the timeout has passed, no further progress is assumed and `current_height`
/// itself is used.
/// Returns true iff that reachable height is at least `expected_height`.
fn made_enough_progress(
    &self,
    current_height: BlockHeight,
    expected_height: BlockHeight,
    now: DateTime<Utc>,
    timeout: DateTime<Utc>,
) -> bool {
    let reachable_height = if now > timeout {
        // Out of time: only the headers already received count.
        current_height
    } else {
        // Still within the deadline: add what is expected to arrive in the remaining time.
        self.compute_expected_height(current_height, timeout - now)
    };
    reachable_height >= expected_height
}

/// Request headers from a given peer to advance the chain.
fn request_headers(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub enum ReasonForBan {
InvalidEdge = 10,
InvalidDistanceVector = 11,
Blacklisted = 14,
ProvidedNotEnoughHeaders = 15,
}

/// Banning signal sent from Peer instance to PeerManager
Expand Down
2 changes: 1 addition & 1 deletion core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub struct ClientConfig {
pub header_sync_progress_timeout: Duration,
/// How much time to wait before banning a peer in header sync if sync is too slow
pub header_sync_stall_ban_timeout: Duration,
/// Expected increase of header head weight per second during header sync
/// Expected increase of header head height per second during header sync
pub header_sync_expected_height_per_second: u64,
/// How long to wait for a response during state sync
pub state_sync_timeout: Duration,
Expand Down

0 comments on commit b9ae299

Please sign in to comment.