This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Merge remote-tracking branch 'origin/master' into master_wasmtime_faster_instantiation
parity-processbot committed May 19, 2022
2 parents 0834c52 + 4596a21 commit 067d069
Showing 11 changed files with 230 additions and 127 deletions.
5 changes: 5 additions & 0 deletions .github/dependabot.yml
@@ -19,3 +19,8 @@ updates:
- dependency-name: "sub-tokens"
schedule:
interval: "daily"
- package-ecosystem: github-actions
directory: '/'
labels: ["A2-insubstantial", "B0-silent", "C1-low 📌", "E3-dependencies"]
schedule:
interval: daily
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

52 changes: 22 additions & 30 deletions node/core/runtime-api/src/lib.rs
@@ -39,7 +39,7 @@ use sp_core::traits::SpawnNamed;

use cache::{RequestResult, RequestResultCache};
use futures::{channel::oneshot, prelude::*, select, stream::FuturesUnordered};
use std::{collections::VecDeque, pin::Pin, sync::Arc};
use std::sync::Arc;

mod cache;

@@ -51,7 +51,8 @@ mod tests;

const LOG_TARGET: &str = "parachain::runtime-api";

/// The number of maximum runtime API requests can be executed in parallel. Further requests will be buffered.
/// The number of maximum runtime API requests can be executed in parallel.
/// Further requests will backpressure the bounded channel.
const MAX_PARALLEL_REQUESTS: usize = 4;

/// The name of the blocking task that executes a runtime API request.
@@ -62,11 +63,6 @@ pub struct RuntimeApiSubsystem<Client> {
client: Arc<Client>,
metrics: Metrics,
spawn_handle: Box<dyn SpawnNamed>,
/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
waiting_requests: VecDeque<(
Pin<Box<dyn Future<Output = ()> + Send>>,
oneshot::Receiver<Option<RequestResult>>,
)>,
/// All the active runtime API requests that are currently being executed.
active_requests: FuturesUnordered<oneshot::Receiver<Option<RequestResult>>>,
/// Requests results cache
@@ -84,7 +80,6 @@ impl<Client> RuntimeApiSubsystem<Client> {
client,
metrics,
spawn_handle: Box::new(spawn_handle),
waiting_requests: Default::default(),
active_requests: Default::default(),
requests_cache: RequestResultCache::default(),
}
@@ -276,13 +271,12 @@ where
}

/// Spawn a runtime API request.
///
/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
let client = self.client.clone();
let metrics = self.metrics.clone();
let (sender, receiver) = oneshot::channel();

// TODO: make the cache great again https://github.com/paritytech/polkadot/issues/5546
let request = match self.query_cache(relay_parent.clone(), request) {
Some(request) => request,
None => return,
@@ -294,21 +288,9 @@
}
.boxed();

if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
self.waiting_requests.push_back((request, receiver));

if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
gum::warn!(
target: LOG_TARGET,
"{} runtime API requests waiting to be executed.",
self.waiting_requests.len(),
)
}
} else {
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
self.active_requests.push(receiver);
}
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
self.active_requests.push(receiver);
}

/// Poll the active runtime API requests.
@@ -322,12 +304,11 @@ where
if let Some(Ok(Some(result))) = self.active_requests.next().await {
self.store_cache(result);
}
}

if let Some((req, recv)) = self.waiting_requests.pop_front() {
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
self.active_requests.push(recv);
}
/// Returns true if our `active_requests` queue is full.
fn is_busy(&self) -> bool {
self.active_requests.len() >= MAX_PARALLEL_REQUESTS
}
}

@@ -341,6 +322,17 @@ where
Client::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
{
loop {
// Let's add some back pressure when the subsystem is running at `MAX_PARALLEL_REQUESTS`.
// This can never block forever, because `active_requests` is owned by this task and any mutations
// happen either in `poll_requests` or `spawn_request` - so if `is_busy` returns true, then
// even if all of the requests finish before us calling `poll_requests` the `active_requests` length
// remains invariant.
if subsystem.is_busy() {
// Since we are not using any internal waiting queues, we need to wait for exactly
// one request to complete before we can read the next one from the overseer channel.
let _ = subsystem.poll_requests().await;
}

select! {
req = ctx.recv().fuse() => match req? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
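The change above removes the internal `waiting_requests` queue: once `MAX_PARALLEL_REQUESTS` receivers are held in the `FuturesUnordered`, the main loop awaits one completion before pulling the next message off the overseer channel, so backpressure propagates to the bounded channel itself. The following self-contained sketch illustrates that pattern with plain `futures` primitives; the names, the fake workload, and the thread standing in for `spawn_blocking` are illustrative only, not the subsystem's actual API.

use futures::{channel::oneshot, executor::block_on, stream::FuturesUnordered, StreamExt};

const MAX_PARALLEL_REQUESTS: usize = 4;

fn main() {
    block_on(async {
        let mut active: FuturesUnordered<oneshot::Receiver<u32>> = FuturesUnordered::new();

        // Pretend 16 requests arrive from an incoming message stream.
        for job in 0..16u32 {
            // Backpressure: once the limit is reached, wait for exactly one
            // in-flight request to finish before accepting the next one.
            if active.len() >= MAX_PARALLEL_REQUESTS {
                let _ = active.next().await;
            }

            let (tx, rx) = oneshot::channel();
            // Stand-in for `spawn_blocking`: run the "runtime API call" on a
            // separate thread and report the result back over the oneshot.
            std::thread::spawn(move || {
                let _ = tx.send(job * 2);
            });
            active.push(rx);
        }

        // Drain whatever is still in flight.
        while let Some(result) = active.next().await {
            println!("result: {:?}", result);
        }
    });
}

Because the loop never reads a new request while `active` is full, pending requests stay in the channel and the producer side slows down, instead of an unbounded queue growing inside the subsystem.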
19 changes: 14 additions & 5 deletions node/core/runtime-api/src/tests.rs
@@ -17,7 +17,6 @@
use super::*;

use ::test_helpers::{dummy_committed_candidate_receipt, dummy_validation_code};
use futures::channel::oneshot;
use polkadot_node_primitives::{BabeAllowedSlots, BabeEpoch, BabeEpochConfiguration};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_primitives::v2::{
@@ -847,23 +846,33 @@ fn multiple_requests_in_parallel_are_working() {
let lock = mutex.lock().unwrap();

let mut receivers = Vec::new();

for _ in 0..MAX_PARALLEL_REQUESTS * 10 {
for _ in 0..MAX_PARALLEL_REQUESTS {
let (tx, rx) = oneshot::channel();

ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)),
})
.await;
receivers.push(rx);
}

// The backpressure from reaching `MAX_PARALLEL_REQUESTS` will make the test block, we need to drop the lock.
drop(lock);

for _ in 0..MAX_PARALLEL_REQUESTS * 100 {
let (tx, rx) = oneshot::channel();

ctx_handle
.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)),
})
.await;
receivers.push(rx);
}

let join = future::join_all(receivers);

drop(lock);

join.await
.into_iter()
.for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));
4 changes: 2 additions & 2 deletions node/network/approval-distribution/src/lib.rs
@@ -511,7 +511,7 @@ impl State {
|block_entry| block_entry.session == session,
|required_routing, local, validator_index| {
if *required_routing == RequiredRouting::PendingTopology {
*required_routing = topology.required_routing_for(*validator_index, local);
*required_routing = topology.required_routing_by_index(*validator_index, local);
}
},
)
@@ -861,7 +861,7 @@ impl State {
let local = source == MessageSource::Local;

let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
t.required_routing_for(validator_index, local)
t.required_routing_by_index(validator_index, local)
});

let message_state = match entry.candidates.get_mut(claimed_candidate_index as usize) {
60 changes: 13 additions & 47 deletions node/network/bitfield-distribution/src/lib.rs
@@ -26,17 +26,18 @@ use futures::{channel::oneshot, FutureExt};

use polkadot_node_network_protocol::{
self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopology},
grid_topology::{
RandomRouting, RequiredRouting, SessionBoundGridTopologyStorage, SessionGridTopology,
},
v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, Versioned, View,
};
use polkadot_node_subsystem::{
jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemError, SubsystemResult,
};
use polkadot_node_subsystem_util::{self as util};
use polkadot_primitives::v2::{
Hash, SessionIndex, SignedAvailabilityBitfield, SigningContext, ValidatorId,
};

use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use rand::{CryptoRng, Rng, SeedableRng};
use std::collections::{HashMap, HashSet};

@@ -80,44 +81,6 @@ impl BitfieldGossipMessage {
}
}

/// A simple storage for a topology and the corresponding session index
#[derive(Default, Debug)]
struct GridTopologySessionBound(SessionGridTopology, SessionIndex);

/// A storage for the current and maybe previous topology
#[derive(Default, Debug)]
struct BitfieldGridTopologyStorage {
current_topology: GridTopologySessionBound,
prev_topology: Option<GridTopologySessionBound>,
}

impl BitfieldGridTopologyStorage {
/// Return a grid topology based on the session index:
/// If we need a previous session and it is registered in the storage, then return that session.
/// Otherwise, return a current session to have some grid topology in any case
fn get_topology(&self, idx: SessionIndex) -> &SessionGridTopology {
if let Some(prev_topology) = &self.prev_topology {
if idx == prev_topology.1 {
return &prev_topology.0
}
}
// Return the current topology by default
&self.current_topology.0
}

/// Update the current topology preserving the previous one
fn update_topology(&mut self, idx: SessionIndex, topology: SessionGridTopology) {
let old_current =
std::mem::replace(&mut self.current_topology, GridTopologySessionBound(topology, idx));
self.prev_topology.replace(old_current);
}

/// Returns a current grid topology
fn get_current_topology(&self) -> &SessionGridTopology {
&self.current_topology.0
}
}

/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Debug)]
@@ -127,7 +90,7 @@ struct ProtocolState {
peer_views: HashMap<PeerId, View>,

/// The current and previous gossip topologies
topologies: BitfieldGridTopologyStorage,
topologies: SessionBoundGridTopologyStorage,

/// Our current view.
view: OurView,
@@ -364,8 +327,9 @@ async fn handle_bitfield_distribution<Context>(
};

let msg = BitfieldGossipMessage { relay_parent, signed_availability };
let topology = state.topologies.get_topology(session_idx);
let required_routing = topology.required_routing_for(validator_index, true);
let topology = state.topologies.get_topology_or_fallback(session_idx);
let required_routing = topology.required_routing_by_index(validator_index, true);

relay_message(
ctx,
job_data,
@@ -567,8 +531,10 @@ async fn process_incoming_peer_message<Context>(

let message = BitfieldGossipMessage { relay_parent, signed_availability };

let topology = state.topologies.get_topology(job_data.signing_context.session_index);
let required_routing = topology.required_routing_for(validator_index, false);
let topology = state
.topologies
.get_topology_or_fallback(job_data.signing_context.session_index);
let required_routing = topology.required_routing_by_index(validator_index, false);

metrics.on_bitfield_received();
one_per_validator.insert(validator.clone(), message.clone());
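The per-crate `BitfieldGridTopologyStorage` removed above is replaced by a shared `SessionBoundGridTopologyStorage` from `polkadot-node-network-protocol`'s `grid_topology` module. The standalone sketch below captures the same idea in generic form: keep the current topology plus the previous session's, and fall back to the current one for any other session index. The method names mirror those used in this diff (`update_topology`, `get_topology_or_fallback`), but the struct layout and signatures are illustrative, not the real type's.

type SessionIndex = u32;

// A topology for one session, tagged with that session's index.
#[derive(Default, Debug)]
struct SessionBound<T>(T, SessionIndex);

// Storage for the current topology and, optionally, the previous session's.
#[derive(Default, Debug)]
struct SessionBoundStorage<T> {
    current: SessionBound<T>,
    prev: Option<SessionBound<T>>,
}

impl<T> SessionBoundStorage<T> {
    // Return the topology for `idx` if it is the previous session's,
    // otherwise fall back to the current one so callers always get a topology.
    fn get_topology_or_fallback(&self, idx: SessionIndex) -> &T {
        if let Some(prev) = &self.prev {
            if prev.1 == idx {
                return &prev.0
            }
        }
        &self.current.0
    }

    // Install a new current topology, keeping the old one as the previous session's.
    fn update_topology(&mut self, idx: SessionIndex, topology: T) {
        let old = std::mem::replace(&mut self.current, SessionBound(topology, idx));
        self.prev = Some(old);
    }
}

fn main() {
    let mut storage: SessionBoundStorage<Vec<u8>> = Default::default();
    storage.update_topology(1, vec![1, 2, 3]);
    storage.update_topology(2, vec![4, 5, 6]);

    assert_eq!(storage.get_topology_or_fallback(1), &vec![1u8, 2, 3]); // previous session
    assert_eq!(storage.get_topology_or_fallback(2), &vec![4u8, 5, 6]); // current session
    assert_eq!(storage.get_topology_or_fallback(9), &vec![4u8, 5, 6]); // unknown session: fall back to current
}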
6 changes: 4 additions & 2 deletions node/network/bitfield-distribution/src/tests.rs
@@ -19,7 +19,9 @@ use assert_matches::assert_matches;
use bitvec::bitvec;
use futures::executor;
use maplit::hashmap;
use polkadot_node_network_protocol::{our_view, view, ObservedRole};
use polkadot_node_network_protocol::{
grid_topology::SessionBoundGridTopologyStorage, our_view, view, ObservedRole,
};
use polkadot_node_subsystem::{
jaeger,
jaeger::{PerLeafSpan, Span},
@@ -60,7 +62,7 @@ fn prewarmed_state(
let relay_parent = known_message.relay_parent.clone();
let mut topology: SessionGridTopology = Default::default();
topology.peers_x = peers.iter().cloned().collect();
let mut topologies: BitfieldGridTopologyStorage = Default::default();
let mut topologies = SessionBoundGridTopologyStorage::default();
topologies.update_topology(0_u32, topology);
ProtocolState {
per_relay_parent: hashmap! {
1 change: 1 addition & 0 deletions node/network/protocol/Cargo.toml
@@ -19,6 +19,7 @@ thiserror = "1.0.31"
fatality = "0.0.6"
rand = "0.8"
derive_more = "0.99"
gum = { package = "tracing-gum", path = "../../gum" }

[dev-dependencies]
rand_chacha = "0.3.1"
