Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare syncing for parallel sync strategies #3224

Merged
merged 11 commits into from
Feb 13, 2024
Merged
143 changes: 76 additions & 67 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncParams},
SyncingAction, SyncingConfig, SyncingStrategy,
StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand All @@ -48,7 +48,7 @@ use futures::{
FutureExt, StreamExt,
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, trace};
use log::{debug, error, trace, warn};
use prometheus_endpoint::{
register, Counter, Gauge, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
};
Expand Down Expand Up @@ -214,9 +214,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: SyncingStrategy<B, Client>,

/// Syncing configuration for startegies.
syncing_config: SyncingConfig,

/// Blockchain client.
client: Arc<Client>,

Expand Down Expand Up @@ -441,8 +438,7 @@ where
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

// Initialize syncing strategy.
let strategy =
SyncingStrategy::new(syncing_config.clone(), client.clone(), warp_sync_config)?;
let strategy = SyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -471,7 +467,6 @@ where
roles,
client,
strategy,
syncing_config,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -661,8 +656,15 @@ where
Some(event) => self.process_notification_event(event),
None => return,
},
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused =>
self.pass_warp_sync_target_block_header(warp_target_block_header),
warp_target_block_header = &mut self.warp_sync_target_block_header_rx_fused => {
if let Err(_) = self.pass_warp_sync_target_block_header(warp_target_block_header) {
error!(
target: LOG_TARGET,
"Failed to set warp sync target block header, terminating `SyncingEngine`.",
);
return
}
},
response_event = self.pending_responses.select_next_some() =>
self.process_response_event(response_event),
validation_result = self.block_announce_validator.select_next_some() =>
Expand All @@ -675,48 +677,61 @@ where

// Process actions requested by a syncing strategy.
if let Err(e) = self.process_strategy_actions() {
error!("Terminating `SyncingEngine` due to fatal error: {e:?}");
error!(
target: LOG_TARGET,
"Terminating `SyncingEngine` due to fatal error: {e:?}.",
);
return
}
}
}

fn process_strategy_actions(&mut self) -> Result<(), ClientError> {
for action in self.strategy.actions() {
for action in self.strategy.actions()? {
match action {
SyncingAction::SendBlockRequest { peer_id, request } => {
SyncingAction::SendBlockRequest { peer_id, key, request } => {
// Sending block request implies dropping obsolete pending response as we are
// not interested in it anymore (see [`SyncingAction::SendBlockRequest`]).
// Furthermore, only one request at a time is allowed to any peer.
let removed = self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request.clone());

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.",
peer_id,
request,
removed,
)
let removed = self.pending_responses.remove(peer_id, key);
self.send_block_request(peer_id, key, request.clone());

if removed {
warn!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}. \
Stale response removed!",
peer_id,
key,
request,
)
} else {
trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {} from {:?} with {:?}.",
peer_id,
key,
request,
)
}
},
SyncingAction::CancelBlockRequest { peer_id } => {
let removed = self.pending_responses.remove(&peer_id);
SyncingAction::CancelRequest { peer_id, key } => {
let removed = self.pending_responses.remove(peer_id, key);
altonen marked this conversation as resolved.
Show resolved Hide resolved

trace!(
target: LOG_TARGET,
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, request } => {
self.send_state_request(peer_id, request);
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.",
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, request } => {
self.send_warp_proof_request(peer_id, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());

trace!(
target: LOG_TARGET,
Expand All @@ -726,7 +741,7 @@ where
);
},
SyncingAction::DropPeer(BadPeer(peer_id, rep)) => {
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(peer_id, rep);
Expand All @@ -753,20 +768,8 @@ where
number,
)
},
SyncingAction::Finished => {
let connected_peers = self.peers.iter().filter_map(|(peer_id, peer)| {
peer.info.roles.is_full().then_some((
*peer_id,
peer.info.best_hash,
peer.info.best_number,
))
});
self.strategy.switch_to_next(
self.syncing_config.clone(),
self.client.clone(),
connected_peers,
)?;
},
// Nothing to do, this is handled internally by `SyncingStrategy`.
SyncingAction::Finished => {},
}
}

Expand Down Expand Up @@ -948,23 +951,18 @@ where
}
}

fn pass_warp_sync_target_block_header(&mut self, header: Result<B::Header, oneshot::Canceled>) {
fn pass_warp_sync_target_block_header(
&mut self,
header: Result<B::Header, oneshot::Canceled>,
) -> Result<(), ()> {
match header {
Ok(header) =>
if let SyncingStrategy::WarpSyncStrategy(warp_sync) = &mut self.strategy {
warp_sync.set_target_block(header);
} else {
error!(
target: LOG_TARGET,
"Cannot set warp sync target block: no warp sync strategy is active."
);
debug_assert!(false);
},
Ok(header) => self.strategy.set_warp_sync_target_block_header(header),
Err(err) => {
error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
Err(())
},
}
}
Expand Down Expand Up @@ -1002,7 +1000,7 @@ where
}

self.strategy.remove_peer(&peer_id);
self.pending_responses.remove(&peer_id);
self.pending_responses.remove_all(&peer_id);
self.event_streams
.retain(|stream| stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok());
}
Expand Down Expand Up @@ -1167,7 +1165,7 @@ where
Ok(())
}

fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest<B>) {
fn send_block_request(&mut self, peer_id: PeerId, key: StrategyKey, request: BlockRequest<B>) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1178,12 +1176,18 @@ where

self.pending_responses.insert(
peer_id,
key,
PeerRequest::Block(request.clone()),
async move { downloader.download_blocks(peer_id, request).await }.boxed(),
);
}

fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) {
fn send_state_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1192,7 +1196,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::State, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::State, rx.boxed());

match Self::encode_state_request(&request) {
Ok(data) => {
Expand All @@ -1213,7 +1217,12 @@ where
}
}

fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest<B>) {
fn send_warp_proof_request(
&mut self,
peer_id: PeerId,
key: StrategyKey,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}");
debug_assert!(false);
Expand All @@ -1222,7 +1231,7 @@ where

let (tx, rx) = oneshot::channel();

self.pending_responses.insert(peer_id, PeerRequest::WarpProof, rx.boxed());
self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
Expand Down Expand Up @@ -1259,14 +1268,14 @@ where
}

fn process_response_event(&mut self, response_event: ResponseEvent<B>) {
let ResponseEvent { peer_id, request, response } = response_event;
let ResponseEvent { peer_id, key, request, response } = response_event;

match response {
Ok(Ok((resp, _))) => match request {
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
self.strategy.on_block_response(peer_id, req, blocks);
self.strategy.on_block_response(peer_id, key, req, blocks);
},
Err(BlockResponseError::DecodeFailed(e)) => {
debug!(
Expand Down Expand Up @@ -1311,10 +1320,10 @@ where
},
};

self.strategy.on_state_response(peer_id, response);
self.strategy.on_state_response(peer_id, key, response);
},
PeerRequest::WarpProof => {
self.strategy.on_warp_proof_response(&peer_id, EncodedProof(resp));
self.strategy.on_warp_proof_response(&peer_id, key, EncodedProof(resp));
},
},
Ok(Err(e)) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Justification requests scheduling. [`ExtraRequests`] manages requesting justifications
//! from peers taking into account forks and their finalization (dropping pending requests
//! that don't make sense after one of the forks is finalized).

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub use strategy::warp::{WarpSyncParams, WarpSyncPhase, WarpSyncProgress};
pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider};

mod block_announce_validator;
mod extra_requests;
mod futures_stream;
mod justification_requests;
mod pending_responses;
mod request_metrics;
mod schema;
Expand Down
Loading
Loading