Skip to content

Commit

Permalink
Import target block body during warp sync (paritytech#12300)
Browse files Browse the repository at this point in the history
* Receive and import target block body

* Request target block

* minor: wording

* Check for block body in the test

* Import target block justifications

* Fix: do not fail block validation if no justifications received

* Fix: import target blocks without justifications

Co-authored-by: arkpar <arkady.paronyan@gmail.com>
  • Loading branch information
2 people authored and ark0f committed Feb 27, 2023
1 parent 37020b1 commit 4d990ca
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 31 deletions.
2 changes: 2 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub enum OnBlockData<Block: BlockT> {
Import(BlockOrigin, Vec<IncomingBlock<Block>>),
/// A new block request needs to be made to the given peer.
Request(PeerId, BlockRequest<Block>),
/// Continue processing events.
Continue,
}

/// Result of [`ChainSync::on_block_justification`].
Expand Down
3 changes: 3 additions & 0 deletions client/network/common/src/sync/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum WarpSyncPhase<Block: BlockT> {
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
DownloadingWarpProofs,
/// Downloading target block.
DownloadingTargetBlock,
/// Downloading state data.
DownloadingState,
/// Importing state.
Expand All @@ -77,6 +79,7 @@ impl<Block: BlockT> fmt::Display for WarpSyncPhase<Block> {
match self {
Self::AwaitingPeers => write!(f, "Waiting for peers"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingTargetBlock => write!(f, "Downloading target block"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ where
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down Expand Up @@ -974,6 +975,7 @@ where
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Ok(OnBlockData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
Expand Down
90 changes: 80 additions & 10 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ use std::{
pin::Pin,
sync::Arc,
};
use warp::TargetBlockImportResult;

mod extra_requests;

Expand Down Expand Up @@ -315,6 +316,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingState,
/// Downloading warp proof.
DownloadingWarpProof,
/// Downloading warp sync target block.
DownloadingWarpTargetBlock,
/// Actively downloading block history after warp sync.
DownloadingGap(NumberFor<B>),
}
Expand Down Expand Up @@ -659,10 +662,11 @@ where
}

fn block_requests(&mut self) -> Box<dyn Iterator<Item = (&PeerId, BlockRequest<B>)> + '_> {
if self.allowed_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
if self.mode == SyncMode::Warp {
return Box::new(std::iter::once(self.warp_target_block_request()).flatten())
}

if self.allowed_requests.is_empty() || self.state_sync.is_some() {
return Box::new(std::iter::empty())
}

Expand Down Expand Up @@ -824,7 +828,7 @@ where
// Only one pending state request is allowed.
return None
}
if let Some(request) = sync.next_warp_poof_request() {
if let Some(request) = sync.next_warp_proof_request() {
let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
if !targets.is_empty() {
targets.sort();
Expand Down Expand Up @@ -1031,6 +1035,40 @@ where
Vec::new()
}
},
PeerSyncState::DownloadingWarpTargetBlock => {
peer.state = PeerSyncState::Available;
if let Some(warp_sync) = &mut self.warp_sync {
if blocks.len() == 1 {
validate_blocks::<B>(&blocks, who, Some(request))?;
match warp_sync.import_target_block(
blocks.pop().expect("`blocks` len checked above."),
) {
TargetBlockImportResult::Success =>
return Ok(OnBlockData::Continue),
TargetBlockImportResult::BadResponse =>
return Err(BadPeer(*who, rep::VERIFICATION_FAIL)),
}
} else if blocks.is_empty() {
debug!(target: "sync", "Empty block response from {}", who);
return Err(BadPeer(*who, rep::NO_BLOCK))
} else {
debug!(
target: "sync",
"Too many blocks ({}) in warp target block response from {}",
blocks.len(),
who,
);
return Err(BadPeer(*who, rep::NOT_REQUESTED))
}
} else {
debug!(
target: "sync",
"Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.",
who,
);
return Ok(OnBlockData::Continue)
}
},
PeerSyncState::Available |
PeerSyncState::DownloadingJustification(..) |
PeerSyncState::DownloadingState |
Expand Down Expand Up @@ -1112,14 +1150,14 @@ where
};

match import_result {
state::ImportResult::Import(hash, header, state) => {
state::ImportResult::Import(hash, header, state, body, justifications) => {
let origin = BlockOrigin::NetworkInitialSync;
let block = IncomingBlock {
hash,
header: Some(header),
body: None,
body,
indexed_body: None,
justifications: None,
justifications,
origin: None,
allow_missing_state: true,
import_existing: true,
Expand Down Expand Up @@ -1399,8 +1437,13 @@ where
number,
hash,
);
self.state_sync =
Some(StateSync::new(self.client.clone(), header, *skip_proofs));
self.state_sync = Some(StateSync::new(
self.client.clone(),
header,
None,
None,
*skip_proofs,
));
self.allowed_requests.set_all();
}
}
Expand Down Expand Up @@ -2163,6 +2206,33 @@ where
})
.collect()
}

/// Generate block request for downloading of the target block body during warp sync.
fn warp_target_block_request(&mut self) -> Option<(&PeerId, BlockRequest<B>)> {
if let Some(sync) = &self.warp_sync {
if self.allowed_requests.is_empty() ||
sync.is_complete() ||
self.peers
.iter()
.any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpTargetBlock)
{
// Only one pending warp target block request is allowed.
return None
}
if let Some((target_number, request)) = sync.next_target_block_request() {
// Find a random peer that has a block with the target number.
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target_number {
trace!(target: "sync", "New warp target block request for {}", id);
peer.state = PeerSyncState::DownloadingWarpTargetBlock;
self.allowed_requests.clear();
return Some((id, request))
}
}
}
}
None
}
}

// This is purely during a backwards compatible transitionary period and should be removed
Expand Down
27 changes: 21 additions & 6 deletions client/network/sync/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ use sc_consensus::ImportedState;
use sc_network_common::sync::StateDownloadProgress;
use smallvec::SmallVec;
use sp_core::storage::well_known_keys;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sp_runtime::{
traits::{Block as BlockT, Header, NumberFor},
Justifications,
};
use std::{collections::HashMap, sync::Arc};

/// State sync state machine. Accumulates partial state data until it
Expand All @@ -35,6 +38,8 @@ pub struct StateSync<B: BlockT, Client> {
target_block: B::Hash,
target_header: B::Header,
target_root: B::Hash,
target_body: Option<Vec<B::Extrinsic>>,
target_justifications: Option<Justifications>,
last_key: SmallVec<[Vec<u8>; 2]>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
complete: bool,
Expand All @@ -46,7 +51,7 @@ pub struct StateSync<B: BlockT, Client> {
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
Import(B::Hash, B::Header, ImportedState<B>),
Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
/// Continue downloading.
Continue,
/// Bad state chunk.
Expand All @@ -59,12 +64,20 @@ where
Client: ProofProvider<B> + Send + Sync + 'static,
{
/// Create a new instance.
pub fn new(client: Arc<Client>, target: B::Header, skip_proof: bool) -> Self {
pub fn new(
client: Arc<Client>,
target_header: B::Header,
target_body: Option<Vec<B::Extrinsic>>,
target_justifications: Option<Justifications>,
skip_proof: bool,
) -> Self {
Self {
client,
target_block: target.hash(),
target_root: *target.state_root(),
target_header: target,
target_block: target_header.hash(),
target_root: *target_header.state_root(),
target_header,
target_body,
target_justifications,
last_key: SmallVec::default(),
state: HashMap::default(),
complete: false,
Expand Down Expand Up @@ -213,6 +226,8 @@ where
block: self.target_block,
state: std::mem::take(&mut self.state).into(),
},
self.target_body.clone(),
self.target_justifications.clone(),
)
} else {
ImportResult::Continue
Expand Down
Loading

0 comments on commit 4d990ca

Please sign in to comment.