Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Fix some problems with prove_warp_sync #8037

Merged
4 commits merged into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/finality-grandpa-warp-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ struct Request<B: BlockT> {
const WARP_SYNC_FRAGMENTS_LIMIT: usize = 100;

/// Number of item with justification in warp sync cache.
/// This should be customizable, setting a low number
/// until then.
const WARP_SYNC_CACHE_SIZE: usize = 20;
/// This should be customizable, but setting it to the max number of fragments
/// we return seems like a good idea until then.
const WARP_SYNC_CACHE_SIZE: usize = WARP_SYNC_FRAGMENTS_LIMIT;

/// Handler for incoming grandpa warp sync requests from a remote peer.
pub struct GrandpaWarpSyncRequestHandler<TBackend, TBlock: BlockT> {
Expand Down
49 changes: 26 additions & 23 deletions client/finality-grandpa/src/finality_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
// This operation is a costy and only for the delay corner case.
while index > Zero::zero() {
index = index - One::one();
if let Some((fragement, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if let Some((fragment, apply_block)) = get_warp_sync_proof_fragment(blockchain, index, &mut cache)? {
if last_apply.map(|next| &next > header.number()).unwrap_or(false) {
result.push(fragement);
result.push(fragment);
last_apply = Some(apply_block);
} else {
break;
Expand All @@ -289,7 +289,7 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(

let mut index = *header.number();
while index <= end_number {
if max_fragment_limit.map(|limit| result.len() <= limit).unwrap_or(false) {
if max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 , sorry.

break;
}

Expand All @@ -305,7 +305,9 @@ pub fn prove_warp_sync<Block: BlockT, B: BlockchainBackend<Block>>(
index = index + One::one();
}

if result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
let at_limit = max_fragment_limit.map(|limit| result.len() >= limit).unwrap_or(false);

if !at_limit && result.last().as_ref().map(|head| head.header.number()) != Some(&end_number) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 when at_limit we probably had twice the fragment
Actually realize this condition was not really simple to read, the point was just add the last finalized block if not already added, so I added a code suggestion bellow.

let header = blockchain.expect_header(end)?;
if let Some(justification) = blockchain.justification(BlockId::Number(end_number.clone()))? {
result.push(AuthoritySetProofFragment {
Expand All @@ -328,7 +330,7 @@ fn get_warp_sync_proof_fragment<Block: BlockT, B: BlockchainBackend<Block>>(
) -> sp_blockchain::Result<Option<(AuthoritySetProofFragment<Block::Header>, NumberFor<Block>)>> {
if let Some(cache) = cache.as_mut() {
if let Some(result) = cache.get_item(index) {
return Ok(result.clone());
return Ok(result);
}
}

Expand Down Expand Up @@ -541,20 +543,20 @@ impl<Block: BlockT> BlockJustification<Block::Header> for GrandpaJustification<B

/// Simple cache for warp sync queries.
pub struct WarpSyncFragmentCache<Header: HeaderT> {
header_has_proof_fragment: std::collections::HashMap<Header::Number, bool>,
cache: linked_hash_map::LinkedHashMap<
Header::Number,
Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
(AuthoritySetProofFragment<Header>, Header::Number),
>,
headers_with_justification: usize,
limit: usize,
}

impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
/// Instantiate a new cache for the warp sync prover.
pub fn new(size: usize) -> Self {
WarpSyncFragmentCache {
header_has_proof_fragment: Default::default(),
cache: Default::default(),
headers_with_justification: 0,
limit: size,
}
}
Expand All @@ -564,31 +566,32 @@ impl<Header: HeaderT> WarpSyncFragmentCache<Header> {
at: Header::Number,
item: Option<(AuthoritySetProofFragment<Header>, Header::Number)>,
) {
if self.cache.len() == self.limit {
self.pop_one();
}
if item.is_some() {
// we do not check previous value as cached value is always supposed to
// be queried before calling 'new_item'.
self.headers_with_justification += 1;
self.header_has_proof_fragment.insert(at, item.is_some());

if let Some(item) = item {
if self.cache.len() == self.limit {
self.pop_one();
}

self.cache.insert(at, item);
}
self.cache.insert(at, item);
}

fn pop_one(&mut self) {
while let Some(v) = self.cache.pop_front() {
if v.1.is_some() {
self.headers_with_justification -= 1;
break;
}
if let Some((header_number, _)) = self.cache.pop_front() {
self.header_has_proof_fragment.remove(&header_number);
}
}

fn get_item(
&mut self,
block: Header::Number,
) -> Option<&mut Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
self.cache.get_refresh(&block)
) -> Option<Option<(AuthoritySetProofFragment<Header>, Header::Number)>> {
match self.header_has_proof_fragment.get(&block) {
Some(true) => Some(self.cache.get_refresh(&block).cloned()),
Some(false) => Some(None),
None => None
}
}
}

Expand Down