Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move import queue from ChainSync to SyncingEngine #1736

Merged
merged 3 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions substrate/client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ impl fmt::Display for BadPeer {

impl std::error::Error for BadPeer {}

/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportBlocksAction<B: BlockT> {
pub origin: BlockOrigin,
pub blocks: Vec<IncomingBlock<B>>,
}

/// Result of [`ChainSync::on_block_data`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OnBlockData<Block: BlockT> {
/// The block should be imported.
Import(BlockOrigin, Vec<IncomingBlock<Block>>),
Import(ImportBlocksAction<Block>),
/// A new block request needs to be made to the given peer.
Request(PeerId, BlockRequest<Block>),
/// Continue processing events.
Expand All @@ -134,7 +141,7 @@ pub enum OnBlockJustification<Block: BlockT> {
Nothing,
/// The justification should be imported.
Import {
peer: PeerId,
peer_id: PeerId,
hash: Block::Hash,
number: NumberFor<Block>,
justifications: Justifications,
Expand Down Expand Up @@ -379,7 +386,8 @@ pub trait ChainSync<Block: BlockT>: Send {
/// Call when a peer has disconnected.
/// Canceled obsolete block request may result in some blocks being ready for
/// import, so this functions checks for such blocks and returns them.
fn peer_disconnected(&mut self, who: &PeerId);
#[must_use]
dmitry-markin marked this conversation as resolved.
Show resolved Hide resolved
fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<Block>>;

/// Return some key metrics.
fn metrics(&self) -> Metrics;
Expand Down
77 changes: 64 additions & 13 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{
schema::v1::{StateRequest, StateResponse},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
BlockRequestEvent, ChainSync, ClientError, SyncingService,
BlockRequestAction, ChainSync, ClientError, ImportBlocksAction, ImportJustificationsAction,
OnBlockResponse, SyncingService,
};

use codec::{Decode, Encode};
Expand All @@ -41,7 +42,8 @@ use futures_timer::Delay;
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, trace};
use prometheus_endpoint::{
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
register, Counter, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry,
SourcedGauge, U64,
};
use prost::Message;
use schnellru::{ByLength, LruMap};
Expand Down Expand Up @@ -135,6 +137,8 @@ struct Metrics {
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
justifications: GaugeVec<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
}

impl Metrics {
Expand Down Expand Up @@ -164,6 +168,20 @@ impl Metrics {
)?;
register(g, r)?
},
import_queue_blocks_submitted: {
let c = Counter::new(
"substrate_sync_import_queue_blocks_submitted",
"Number of blocks submitted to the import queue.",
)?;
register(c, r)?
},
import_queue_justifications_submitted: {
let c = Counter::new(
"substrate_sync_import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?;
register(c, r)?
},
})
}
}
Expand Down Expand Up @@ -311,6 +329,9 @@ pub struct SyncingEngine<B: BlockT, Client> {

/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
}

impl<B: BlockT, Client> SyncingEngine<B, Client>
Expand Down Expand Up @@ -436,9 +457,7 @@ where
max_parallel_downloads,
max_blocks_per_request,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
)?;

let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
Expand Down Expand Up @@ -501,6 +520,7 @@ where
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
},
SyncingService::new(tx, num_connected, is_major_syncing),
block_announce_config,
Expand Down Expand Up @@ -728,13 +748,13 @@ where
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
match result {
Ok(event) => match event {
BlockRequestEvent::SendRequest { peer_id, request } => {
Ok(action) => match action {
BlockRequestAction::SendRequest { peer_id, request } => {
// drop obsolete pending response first
self.pending_responses.remove(&peer_id);
self.send_block_request(peer_id, request);
},
BlockRequestEvent::RemoveStale { peer_id } => {
BlockRequestAction::RemoveStale { peer_id } => {
self.pending_responses.remove(&peer_id);
},
},
Expand Down Expand Up @@ -922,7 +942,10 @@ where
}
}

self.chain_sync.peer_disconnected(&peer_id);
if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) {
self.import_blocks(import_blocks_action)
}

self.pending_responses.remove(&peer_id);
self.event_streams.retain(|stream| {
stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()
Expand Down Expand Up @@ -1181,10 +1204,14 @@ where
PeerRequest::Block(req) => {
match self.block_downloader.block_response_into_blocks(&req, resp) {
Ok(blocks) => {
if let Some((peer_id, new_req)) =
self.chain_sync.on_block_response(peer_id, req, blocks)
{
self.send_block_request(peer_id, new_req);
match self.chain_sync.on_block_response(peer_id, req, blocks) {
OnBlockResponse::SendBlockRequest { peer_id, request } =>
self.send_block_request(peer_id, request),
OnBlockResponse::ImportBlocks(import_blocks_action) =>
self.import_blocks(import_blocks_action),
OnBlockResponse::ImportJustifications(action) =>
self.import_justifications(action),
OnBlockResponse::Nothing => {},
}
},
Err(BlockResponseError::DecodeFailed(e)) => {
Expand Down Expand Up @@ -1230,7 +1257,11 @@ where
},
};

self.chain_sync.on_state_response(peer_id, response);
if let Some(import_blocks_action) =
self.chain_sync.on_state_response(peer_id, response)
{
self.import_blocks(import_blocks_action);
}
},
PeerRequest::WarpProof => {
self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp));
Expand Down Expand Up @@ -1337,4 +1368,24 @@ where
},
}
}

/// Import blocks.
fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction<B>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_blocks_submitted.inc();
}

self.import_queue.import_blocks(origin, blocks);
}

/// Import justifications.
fn import_justifications(&mut self, action: ImportJustificationsAction<B>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_justifications_submitted.inc();
}

let ImportJustificationsAction { peer_id, hash, number, justifications } = action;

self.import_queue.import_justifications(peer_id, hash, number, justifications);
}
}
Loading