Optimize fork sync targets handling #1199

Closed · prybalko wants to merge 6 commits

121 changes: 74 additions & 47 deletions substrate/client/network/sync/src/lib.rs
@@ -79,7 +79,7 @@ use sp_runtime::{
};

use std::{
- collections::{HashMap, HashSet},
+ collections::{BTreeMap, HashMap, HashSet},
iter,
ops::Range,
pin::Pin,
@@ -292,7 +292,7 @@ pub struct ChainSync<B: BlockT, Client> {
/// downloaded and are queued for import.
queue_blocks: HashSet<B::Hash>,
/// Fork sync targets.
- fork_targets: HashMap<B::Hash, ForkTarget<B>>,
+ fork_targets: BTreeMap<(NumberFor<B>, B::Hash), ForkTarget<B>>,
/// A set of peers for which there might be potential block requests
allowed_requests: AllowedRequests,
/// Maximum number of peers to ask the same blocks in parallel.
@@ -368,7 +368,6 @@ impl<B: BlockT> PeerSync<B> {
}

struct ForkTarget<B: BlockT> {
- number: NumberFor<B>,
parent_hash: Option<B::Hash>,
peers: HashSet<PeerId>,
}
@@ -476,8 +475,7 @@ where

fn num_sync_requests(&self) -> usize {
self.fork_targets
- .values()
- .filter(|f| f.number <= self.best_queued_number)
+ .range(..(self.best_queued_number.saturating_add(One::one()), Default::default()))
.count()
}
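Keying the map by (number, hash) is what makes this count cheap: every target at or below the best queued number forms a contiguous prefix of the BTreeMap, so the old linear filter becomes a range scan. A minimal standalone sketch of the trick, with plain u64 and [u8; 32] standing in for NumberFor<B> and B::Hash (the Default::default() zero hash is the smallest possible second key component):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut targets: BTreeMap<(u64, [u8; 32]), ()> = BTreeMap::new();
    targets.insert((5, [1; 32]), ());
    targets.insert((7, [2; 32]), ());
    targets.insert((9, [3; 32]), ());

    let best_queued_number: u64 = 7;
    // (best + 1, [0; 32]) is the smallest key with number best + 1, so the
    // half-open range below it covers exactly the targets with
    // number <= best_queued_number.
    let ready = targets.range(..(best_queued_number + 1, [0u8; 32])).count();
    assert_eq!(ready, 2); // numbers 5 and 7 count; 9 does not
}
```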

@@ -680,8 +678,8 @@
}

self.fork_targets
- .entry(*hash)
- .or_insert_with(|| ForkTarget { number, peers: Default::default(), parent_hash: None })
+ .entry((number, *hash))
+ .or_insert_with(|| ForkTarget { peers: Default::default(), parent_hash: None })
.peers
.extend(peers);
}
@@ -866,9 +864,8 @@
who,
);
self.fork_targets
- .entry(peer.best_hash)
+ .entry((peer.best_number, peer.best_hash))
.or_insert_with(|| ForkTarget {
- number: peer.best_number,
parent_hash: None,
peers: Default::default(),
})
@@ -1100,7 +1097,7 @@ where
// known block case
if known || self.is_already_downloading(&hash) {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
- if let Some(target) = self.fork_targets.get_mut(&hash) {
+ if let Some(target) = self.fork_targets.get_mut(&(number, hash)) {
target.peers.insert(who);
}
return
@@ -1126,9 +1123,8 @@
announce.summary(),
);
self.fork_targets
- .entry(hash)
+ .entry((number, hash))
.or_insert_with(|| ForkTarget {
- number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
@@ -1452,7 +1448,7 @@ where
/// Updates our internal state for best queued block and then goes
/// through all peers to update our view of their state as well.
fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
- if self.fork_targets.remove(hash).is_some() {
+ if self.fork_targets.remove(&(number, *hash)).is_some() {
trace!(target: LOG_TARGET, "Completed fork sync {hash:?}");
}
if let Some(gap_sync) = &mut self.gap_sync {
@@ -2721,20 +2717,20 @@ fn peer_gap_block_request<B: BlockT>(
/// Get pending fork sync targets for a peer.
fn fork_sync_request<B: BlockT>(
id: &PeerId,
- targets: &mut HashMap<B::Hash, ForkTarget<B>>,
+ targets: &mut BTreeMap<(NumberFor<B>, B::Hash), ForkTarget<B>>,
best_num: NumberFor<B>,
finalized: NumberFor<B>,
attributes: BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
max_blocks_per_request: u32,
) -> Option<(B::Hash, BlockRequest<B>)> {
- targets.retain(|hash, r| {
- if r.number <= finalized {
+ targets.retain(|(number, hash), _| {
+ if *number <= finalized {
trace!(
target: LOG_TARGET,
"Removed expired fork sync request {:?} (#{})",
hash,
- r.number,
+ number,
);
return false
}
Expand All @@ -2743,45 +2739,42 @@ fn fork_sync_request<B: BlockT>(
target: LOG_TARGET,
"Removed obsolete fork sync request {:?} (#{})",
hash,
- r.number,
+ number,
);
return false
}
true
});
- for (hash, r) in targets {
+ // Download the fork only if it is behind or not too far ahead our tip of the chain
+ // Otherwise it should be downloaded in full sync mode.
+ let range = ..(best_num.saturating_add(max_blocks_per_request.into()), Default::default());
+ // Iterate in reverse order to download target with the highest number first.
+ // Other targets might belong to the same fork, so we don't need to download them separately.
+ for ((number, hash), r) in targets.range(range).rev() {
Comment on lines +2751 to +2753

Contributor:

Could you explain this logic in more detail? It checks the fork targets in reverse order by (number, hash), so the last block of the fork is checked first; if it's not available, it proceeds to the parent of the latest block. Am I understanding this correctly?

prybalko (Author) commented on Aug 31, 2023:

Not quite. The logic goes like this:

If there are multiple fork_targets, in what order should they be checked?
I believe the optimal approach is to begin with the fork_target with the highest number (aka the latest). If it's available, download (number - finalized) blocks; if not, go to the next fork_target (which may be unrelated to the first one).
The optimization lies in the fact that other fork_targets with lower numbers might be part of the same fork that has already been downloaded.

Note: at this point, fork_targets have already been filtered by number < best_num + max_blocks_per_request.
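To make that ordering concrete, a simplified sketch of the same range-and-reverse pattern (u64 numbers and [u8; 32] hashes are stand-ins for the runtime types, not the PR's code):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut targets: BTreeMap<(u64, [u8; 32]), ()> = BTreeMap::new();
    for (number, seed) in [(95u64, 1u8), (105, 2), (109, 3), (250, 4)] {
        targets.insert((number, [seed; 32]), ());
    }

    let best_num: u64 = 100;
    let max_blocks_per_request: u64 = 10;
    // Exclude targets too far ahead of our tip, then walk downwards so the
    // highest remaining fork target is tried first.
    let range = ..(best_num + max_blocks_per_request, [0u8; 32]);
    let order: Vec<u64> = targets.range(range).rev().map(|(k, _)| k.0).collect();
    assert_eq!(order, vec![109, 105, 95]); // 250 is filtered out entirely
}
```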

Contributor:

I haven't run the original test with this optimization applied yet, but how much better did it get, do you remember?

It looks like it won't even traverse the fork targets that are too far in the future, which is what we want, but I think this could be optimized even further. Right now, if a peer is not able to provide any fork targets, we still have to go through all of them, which is not optimal. Which fork targets each peer is able to provide should probably be stored, so the list of targets wouldn't have to be looped through on each call to fork_sync_request(). This will admittedly complicate the design, and as @dmitry-markin is already working on the syncing code, it may be preferable not to make significant refactors until we've decided how ChainSync should be structured.
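For illustration only, a hedged sketch of what such per-peer bookkeeping might look like; PeerTargets, best_below, and the u64 stand-in types are hypothetical, and nothing like this exists in the PR:

```rust
use std::collections::{BTreeSet, HashMap};

type Number = u64; // stand-in for NumberFor<B>
type Hash = [u8; 32]; // stand-in for B::Hash
type PeerId = u64; // stand-in for the real PeerId type

/// Hypothetical reverse index from peer to the fork targets it can serve,
/// so fork_sync_request() would scan only that peer's own targets.
#[derive(Default)]
struct PeerTargets {
    by_peer: HashMap<PeerId, BTreeSet<(Number, Hash)>>,
}

impl PeerTargets {
    fn add(&mut self, peer: PeerId, number: Number, hash: Hash) {
        self.by_peer.entry(peer).or_default().insert((number, hash));
    }

    /// Highest-numbered target this peer can serve below `bound`, if any.
    fn best_below(&self, peer: &PeerId, bound: Number) -> Option<(Number, Hash)> {
        self.by_peer.get(peer)?.range(..(bound, [0u8; 32])).next_back().copied()
    }
}
```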

prybalko (Author) commented on Aug 31, 2023:

> how much better did it get, do you remember?

Although fork_targets were clearly not stored in an optimal way, I didn't see any significant change in performance. In my tests, the difference was negligible, as the time spent in fork_sync_request is in microseconds even for 10k blocks.

Contributor:

The logic itself looks good, but I'm not sure this improvement makes a significant difference. The thing is, due to the bug paritytech/substrate#12830, fork_sync_request is only capable of downloading forks shorter than max_blocks_per_request (128 blocks), and the performance difference at that scale may not be that big. For longer forks, maybe we somehow invert (or at least should invert) the notion of fork vs. canonical chain and download the fork as the canonical chain if we are lucky (or get stuck, as in the issue above).

Also, as a downside: if we have multiple fork targets on the same fork, this change increases the chance that we hit the 128-block limit (as we are starting from the higher fork targets) and never import the fork.

The problems described are due to the bug paritytech/substrate#12830 and not due to this PR per se.

if !r.peers.contains(&id) {
continue
}
- // Download the fork only if it is behind or not too far ahead our tip of the chain
- // Otherwise it should be downloaded in full sync mode.
- if r.number <= best_num ||
-     (r.number - best_num).saturated_into::<u32>() < max_blocks_per_request as u32
- {
-     let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
-     let count = if parent_status == BlockStatus::Unknown {
-         (r.number - finalized).saturated_into::<u32>() // up to the last finalized block
-     } else {
-         // request only single block
-         1
-     };
-     trace!(
-         target: LOG_TARGET,
-         "Downloading requested fork {hash:?} from {id}, {count} blocks",
-     );
-     return Some((
-         *hash,
-         BlockRequest::<B> {
-             id: 0,
-             fields: attributes,
-             from: FromBlock::Hash(*hash),
-             direction: Direction::Descending,
-             max: Some(count),
-         },
-     ))
- } else {
-     trace!(target: LOG_TARGET, "Fork too far in the future: {:?} (#{})", hash, r.number);
- }
+ let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
+ let count = if parent_status == BlockStatus::Unknown {
+     (*number - finalized).saturated_into::<u32>() // up to the last finalized block
+ } else {
+     // request only single block
+     1
+ };
+ trace!(
+     target: LOG_TARGET,
+     "Downloading requested fork {hash:?} from {id}, {count} blocks",
+ );
+ return Some((
+     *hash,
+     BlockRequest::<B> {
+         id: 0,
+         fields: attributes,
+         from: FromBlock::Hash(*hash),
+         direction: Direction::Descending,
+         max: Some(count),
+     },
+ ))
}
None
}
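For reference on the emitted request: it starts at the fork head and walks backwards (FromBlock::Hash with Direction::Descending), and with an unknown parent, count is sized to reach back exactly to (but not including) the last finalized block. A small worked example of that arithmetic, using the same numbers as the test below:

```rust
fn main() {
    let fork_head: u64 = 109; // number of the chosen fork target
    let finalized: u64 = 50;

    // Parent unknown: request everything down to the finalized block.
    let count = fork_head - finalized;
    // A descending request of `count` blocks starting at the head covers
    // numbers 109, 108, ..., 51; the finalized block 50 itself is excluded.
    let lowest = fork_head - count + 1;
    assert_eq!((count, lowest), (59, 51));
}
```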
@@ -3917,4 +3910,38 @@ mod test {
sync.peer_disconnected(&peers[1]);
assert_eq!(sync.pending_responses.len(), 0);
}

+ #[test]
+ fn fork_sync_request_prefers_latest_fork() {
+     let peer_id = PeerId::random();
+     let mut targets = (0..200u8).fold(BTreeMap::new(), |mut acc, i| {
+         acc.insert(
+             (i.into(), Hash::random()),
+             ForkTarget::<Block> {
+                 parent_hash: None,
+                 peers: vec![peer_id].into_iter().collect(),
+             },
+         );
+         acc
+     });
+     let (_, block_request) = fork_sync_request(
+         &peer_id,
+         &mut targets,
+         100,
+         50,
+         BlockAttributes::BODY,
+         |_| BlockStatus::Unknown,
+         10,
+     )
+     .expect("should have block request");
+
+     // Find the expected chosen target.
+     let expected_number = 109; // 100 + 10 - 1
+     let ((_, expected_hash), _) = targets
+         .range((expected_number, Default::default())..(expected_number + 1, Default::default()))
+         .next()
+         .unwrap();
+     assert_eq!(block_request.from, FromBlock::Hash(*expected_hash));
+     assert_eq!(block_request.max, Some(59)); // 109 - 50
+ }
}