Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Introduce async runtime calling trait for runtime-api subsystem (#5782)
Browse files Browse the repository at this point in the history
* Implement OverseerRuntimeClient

* blockchainevents

* Update patches

* Finish merging rntime-api subsystem

* First version that is able to produce blocks

* Make OverseerRuntimeClient async

* Move overseer notification stream forwarding to cumulus

* Remove unused imports

* Add more logging to collator-protocol

* Lockfile

* Use hashes in OverseerRuntimeClient

* Move OverseerRuntimeClient into extra module

* Fix old session info call and make HeadSupportsParachain async

* Improve naming of trait

* Cleanup

* Remove unused From trait implementation

* Remove unwanted debug print

* Move trait to polkadot-node-subsystem-types

* Add sections to runtime client

Co-authored-by: Davide Galassi <davxy@datawok.net>

* Reorder methods

* Fix spelling

* Fix spacing in Cargo.toml

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Remove unused babe methods

Co-authored-by: Davide Galassi <davxy@datawok.net>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 20, 2022
1 parent f6831e9 commit bb1bf69
Show file tree
Hide file tree
Showing 15 changed files with 467 additions and 74 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/core/approval-voting/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }

[dev-dependencies]
async-trait = "0.1.56"
parking_lot = "0.12.0"
rand_core = "0.5.1" # should match schnorrkel
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
4 changes: 3 additions & 1 deletion node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use polkadot_primitives::v2::{
use std::time::Duration;

use assert_matches::assert_matches;
use async_trait::async_trait;
use parking_lot::Mutex;
use sp_keyring::sr25519::Keyring as Sr25519Keyring;
use sp_keystore::CryptoStore;
Expand Down Expand Up @@ -117,8 +118,9 @@ pub mod test_constants {

struct MockSupportsParachains;

#[async_trait]
impl HeadSupportsParachains for MockSupportsParachains {
fn head_supports_parachains(&self, _head: &Hash) -> bool {
async fn head_supports_parachains(&self, _head: &Hash) -> bool {
true
}
}
Expand Down
7 changes: 4 additions & 3 deletions node/core/runtime-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ gum = { package = "tracing-gum", path = "../../gum" }
memory-lru = "0.1.0"
parity-util-mem = { version = "0.11.0", default-features = false }

sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }

polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = {path = "../../subsystem" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-types = { path = "../../subsystem-types" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }

[dev-dependencies]
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.21", features = ["thread-pool"] }
Expand Down
57 changes: 19 additions & 38 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,8 @@ use polkadot_node_subsystem::{
messages::{RuntimeApiMessage, RuntimeApiRequest as Request},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
};
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{Block, BlockId, Hash},
};

use sp_api::ProvideRuntimeApi;
use sp_authority_discovery::AuthorityDiscoveryApi;
use sp_consensus_babe::BabeApi;
use polkadot_node_subsystem_types::RuntimeApiSubsystemClient;
use polkadot_primitives::v2::Hash;

use cache::{RequestResult, RequestResultCache};
use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered};
Expand Down Expand Up @@ -88,8 +82,7 @@ impl<Client> RuntimeApiSubsystem<Client> {
#[overseer::subsystem(RuntimeApi, error = SubsystemError, prefix = self::overseer)]
impl<Client, Context> RuntimeApiSubsystem<Client>
where
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + Sync + 'static,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem { future: run(ctx, self).boxed(), name: "runtime-api-subsystem" }
Expand All @@ -98,8 +91,7 @@ where

impl<Client> RuntimeApiSubsystem<Client>
where
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + 'static + Sync,
{
fn store_cache(&mut self, result: RequestResult) {
use RequestResult::*;
Expand Down Expand Up @@ -282,7 +274,7 @@ where
};

let request = async move {
let result = make_runtime_api_request(client, metrics, relay_parent, request);
let result = make_runtime_api_request(client, metrics, relay_parent, request).await;
let _ = sender.send(result);
}
.boxed();
Expand Down Expand Up @@ -317,8 +309,7 @@ async fn run<Client, Context>(
mut subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()>
where
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + Send + Sync + 'static,
{
loop {
// Let's add some back pressure when the subsystem is running at `MAX_PARALLEL_REQUESTS`.
Expand Down Expand Up @@ -348,26 +339,21 @@ where
}
}

fn make_runtime_api_request<Client>(
async fn make_runtime_api_request<Client>(
client: Arc<Client>,
metrics: Metrics,
relay_parent: Hash,
request: Request,
) -> Option<RequestResult>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
Client: RuntimeApiSubsystemClient + 'static,
{
use sp_api::ApiExt;

let _timer = metrics.time_make_runtime_api_request();

macro_rules! query {
($req_variant:ident, $api_name:ident ($($param:expr),*), ver = $version:literal, $sender:expr) => {{
let sender = $sender;
let api = client.runtime_api();

let runtime_version = api.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
let runtime_version = client.api_version_parachain_host(relay_parent).await
.unwrap_or_else(|e| {
gum::warn!(
target: LOG_TARGET,
Expand All @@ -385,7 +371,7 @@ where
});

let res = if runtime_version >= $version {
api.$api_name(&BlockId::Hash(relay_parent) $(, $param.clone() )*)
client.$api_name(relay_parent $(, $param.clone() )*).await
.map_err(|e| RuntimeApiError::Execution {
runtime_api_name: stringify!($api_name),
source: std::sync::Arc::new(e),
Expand All @@ -404,11 +390,7 @@ where

match request {
Request::Version(sender) => {
let api = client.runtime_api();

let runtime_version = match api
.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
{
let runtime_version = match client.api_version_parachain_host(relay_parent).await {
Ok(Some(v)) => Ok(v),
Ok(None) => Err(RuntimeApiError::NotSupported { runtime_api_name: "api_version" }),
Err(e) => Err(RuntimeApiError::Execution {
Expand Down Expand Up @@ -465,25 +447,24 @@ where
Request::CandidateEvents(sender) =>
query!(CandidateEvents, candidate_events(), ver = 1, sender),
Request::SessionInfo(index, sender) => {
let api = client.runtime_api();
let block_id = BlockId::Hash(relay_parent);

let api_version = api
.api_version::<dyn ParachainHost<Block>>(&BlockId::Hash(relay_parent))
let api_version = client
.api_version_parachain_host(relay_parent)
.await
.unwrap_or_default()
.unwrap_or_default();

let res = if api_version >= 2 {
let res =
api.session_info(&block_id, index).map_err(|e| RuntimeApiError::Execution {
let res = client.session_info(relay_parent, index).await.map_err(|e| {
RuntimeApiError::Execution {
runtime_api_name: "SessionInfo",
source: std::sync::Arc::new(e),
});
}
});
metrics.on_request(res.is_ok());
res
} else {
#[allow(deprecated)]
let res = api.session_info_before_version_2(&block_id, index).map_err(|e| {
let res = client.session_info_before_version_2(relay_parent, index).await.map_err(|e| {
RuntimeApiError::Execution {
runtime_api_name: "SessionInfo",
source: std::sync::Arc::new(e),
Expand Down
18 changes: 12 additions & 6 deletions node/core/runtime-api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ use ::test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code};
use polkadot_node_primitives::{BabeAllowedSlots, BabeEpoch, BabeEpochConfiguration};
use polkadot_node_subsystem::SpawnGlue;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_primitives::v2::{
AuthorityDiscoveryId, BlockNumber, CandidateEvent, CandidateHash, CommittedCandidateReceipt,
CoreState, DisputeState, GroupRotationInfo, Id as ParaId, InboundDownwardMessage,
InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement,
ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex, ValidatorSignature,
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{
AuthorityDiscoveryId, Block, BlockNumber, CandidateEvent, CandidateHash,
CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Id as ParaId,
InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption,
PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo,
ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
},
};
use sp_api::ProvideRuntimeApi;
use sp_authority_discovery::AuthorityDiscoveryApi;
use sp_consensus_babe::BabeApi;
use sp_core::testing::TaskExecutor;
use std::{
collections::{BTreeMap, HashMap},
Expand Down
1 change: 1 addition & 0 deletions node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../gum" }
lru = "0.7"
parity-util-mem = { version = "0.11.0", default-features = false }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1.56"

[dev-dependencies]
metered = { package = "prioritized-metered-channel", path = "../metered-channel" }
Expand Down
5 changes: 4 additions & 1 deletion node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use futures::{channel::oneshot, pending, pin_mut, select, stream, FutureExt, StreamExt};
use futures_timer::Delay;
use orchestra::async_trait;
use std::time::Duration;

use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
Expand All @@ -34,8 +35,10 @@ use polkadot_overseer::{
use polkadot_primitives::v2::{CandidateReceipt, Hash};

struct AlwaysSupportsParachains;

#[async_trait]
impl HeadSupportsParachains for AlwaysSupportsParachains {
fn head_supports_parachains(&self, _head: &Hash) -> bool {
async fn head_supports_parachains(&self, _head: &Hash) -> bool {
true
}
}
Expand Down
37 changes: 16 additions & 21 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, St
use lru::LruCache;

use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::{
runtime_api::ParachainHost,
v2::{Block, BlockId, BlockNumber, Hash},
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use polkadot_primitives::v2::{Block, BlockNumber, Hash};

use polkadot_node_subsystem_types::messages::{
ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
Expand All @@ -89,6 +85,7 @@ use polkadot_node_subsystem_types::messages::{
pub use polkadot_node_subsystem_types::{
errors::{SubsystemError, SubsystemResult},
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
RuntimeApiSubsystemClient,
};

pub mod metrics;
Expand Down Expand Up @@ -157,25 +154,20 @@ impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
}

/// Whether a header supports parachain consensus or not.
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
/// Return true if the given header supports parachain consensus. Otherwise, false.
fn head_supports_parachains(&self, head: &Hash) -> bool;
async fn head_supports_parachains(&self, head: &Hash) -> bool;
}

#[async_trait::async_trait]
impl<Client> HeadSupportsParachains for Arc<Client>
where
Client: ProvideRuntimeApi<Block>,
Client::Api: ParachainHost<Block>,
Client: RuntimeApiSubsystemClient + Sync + Send,
{
fn head_supports_parachains(&self, head: &Hash) -> bool {
let id = BlockId::Hash(*head);
async fn head_supports_parachains(&self, head: &Hash) -> bool {
// Check that the `ParachainHost` runtime api is at least with version 1 present on chain.
self.runtime_api()
.api_version::<dyn ParachainHost<Block>>(&id)
.ok()
.flatten()
.unwrap_or(0) >=
1
self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
}
}

Expand Down Expand Up @@ -421,9 +413,12 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut hand
/// # fn main() { executor::block_on(async move {
///
/// struct AlwaysSupportsParachains;
///
/// #[async_trait::async_trait]
/// impl HeadSupportsParachains for AlwaysSupportsParachains {
/// fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// async fn head_supports_parachains(&self, _head: &Hash) -> bool { true }
/// }
///
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let (overseer, _handle) = dummy_overseer_builder(spawner, AlwaysSupportsParachains, None)
/// .unwrap()
Expand Down Expand Up @@ -718,7 +713,7 @@ where
// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
if let Some((span, status)) = self.on_head_activated(&hash, None) {
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
let update =
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Expand Down Expand Up @@ -780,7 +775,7 @@ where
},
};

let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)) {
let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
Some((span, status)) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: block.hash,
number: block.number,
Expand Down Expand Up @@ -837,12 +832,12 @@ where

/// Handles a header activation. If the header's state doesn't support the parachains API,
/// this returns `None`.
fn on_head_activated(
async fn on_head_activated(
&mut self,
hash: &Hash,
parent_hash: Option<Hash>,
) -> Option<(Arc<jaeger::Span>, LeafStatus)> {
if !self.supports_parachains.head_supports_parachains(hash) {
if !self.supports_parachains.head_supports_parachains(hash).await {
return None
}

Expand Down
Loading

0 comments on commit bb1bf69

Please sign in to comment.