
Make candidate validation bounded again #2125

Merged · 19 commits · Jan 21, 2024
Changes from 10 commits
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions polkadot/node/core/candidate-validation/Cargo.toml
@@ -10,6 +10,7 @@ license.workspace = true
workspace = true

[dependencies]
async-semaphore = "1.2.0"
async-trait = "0.1.74"
futures = "0.3.21"
futures-timer = "3.0.2"
220 changes: 121 additions & 99 deletions polkadot/node/core/candidate-validation/src/lib.rs
@@ -23,6 +23,7 @@
#![deny(unused_crate_dependencies, unused_results)]
#![warn(missing_docs)]

use async_semaphore::Semaphore;
use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
@@ -55,10 +56,11 @@ use polkadot_primitives::{

use parity_scale_codec::Encode;

use futures::{channel::oneshot, prelude::*};
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};

use std::{
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
@@ -130,6 +132,105 @@ impl<Context> CandidateValidationSubsystem {
}
}

fn handle_validation_message<S>(
mut sender: S,
validation_host: ValidationHost,
metrics: Metrics,
msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
S: SubsystemSender<RuntimeApiMessage>,
{
match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
// let mut sender = ctx.sender().clone();
// let metrics = metrics.clone();
// let validation_host = validation_host.clone();

// let guard = semaphore.acquire_arc().await;
async move {
// let _guard = guard;
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed()
},
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
// let metrics = metrics.clone();
// let validation_host = validation_host.clone();

// let guard = semaphore.acquire_arc().await;
async move {
// let _guard = guard;
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
.boxed()
},
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
// let mut sender = ctx.sender().clone();
// let validation_host = validation_host.clone();

// let guard = semaphore.acquire_arc().await;
async move {
// let _guard = guard;
let precheck_result =
precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
.await;

let _ = response_sender.send(precheck_result);
}
.boxed()
},
}
}

#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
@@ -156,106 +257,27 @@ async fn run<Context>(
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

let mut tasks = FuturesUnordered::new();
// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged with
// `ValidateFromChainState` messages awaiting data from the runtime
let semaphore = Arc::new(Semaphore::new(polkadot_node_core_pvf::HOST_MESSAGE_QUEUE_SIZE * 2));

loop {
match ctx.recv().await? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState {
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_chain_state();
let res = validate_from_chain_state(
&mut sender,
validation_host,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-chain-state", bg.boxed())?;
},
CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
response_sender,
..
} => {
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();

async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind,
&metrics,
)
.await;

metrics.on_validation_event(&res);
let _ = response_sender.send(res);
}
};

ctx.spawn("validate-from-exhaustive", bg.boxed())?;
},
CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender,
..
} => {
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();

async move {
let precheck_result = precheck_pvf(
&mut sender,
validation_host,
relay_parent,
validation_code_hash,
)
.await;

let _ = response_sender.send(precheck_result);
}
};

ctx.spawn("candidate-validation-pre-check", bg.boxed())?;
},
futures::select! {

Contributor:
Why did we give up on spawn? I imagined we could do something like this; what am I missing?

--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -152,6 +152,8 @@ async fn run<Context>(
        )
        .await;
        ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
+       use async_semaphore::Semaphore;
+       let semaphore = Arc::new(Semaphore::new(10));
 
        loop {
                match ctx.recv().await? {
@@ -171,9 +173,12 @@ async fn run<Context>(
                                                let mut sender = ctx.sender().clone();
                                                let metrics = metrics.clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
                                                        let _timer = metrics.time_validate_from_chain_state();
+                                                       let _guard = guard;
+
                                                        let res = validate_from_chain_state(
                                                                &mut sender,
                                                                validation_host,
@@ -205,9 +210,12 @@ async fn run<Context>(
                                        let bg = {
                                                let metrics = metrics.clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
                                                        let _timer = metrics.time_validate_from_exhaustive();
+                                                       let _guard = guard;
+
                                                        let res = validate_candidate_exhaustive(
                                                                validation_host,
                                                                validation_data,
@@ -236,8 +244,11 @@ async fn run<Context>(
                                        let bg = {
                                                let mut sender = ctx.sender().clone();
                                                let validation_host = validation_host.clone();
+                                               let guard = semaphore.acquire_arc().await;
 
                                                async move {
+                                                       let _guard = guard;
+

Contributor:
Never mind :D I was dumb; we need the select if we don't want to block signal processing while the queue is full.

Contributor:
> I was dumb; we need the select if we don't want to block signal processing while the queue is full.

AFAICT, in its current form, this PR will still not process signals while the queue is full. The select only helps with the other tasks being processed concurrently with the overseer messages (if we were using spawn, we wouldn't need the select).

Contributor Author:
True, and as far as I understand, that's exactly what we want. When the queue is full, signal processing stops, thus channeling the backpressure upstream.
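
A minimal standalone sketch of that backpressure effect, assuming nothing beyond the futures crate (toy values, not the subsystem's types): once the consumer stops draining a bounded channel, the producer's `send().await` parks until space frees up, which is how the pressure travels upstream.

```rust
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

fn main() {
    block_on(async {
        // Bounded channel standing in for the host's bounded message queue.
        let (mut tx, mut rx) = mpsc::channel::<u32>(2);

        let producer = async move {
            for i in 0..5u32 {
                // When the buffer is full, this `await` does not complete
                // until the consumer drains an item: that stall is the
                // backpressure reaching the sender.
                tx.send(i).await.expect("receiver alive");
                println!("sent {i}");
            }
        };

        let consumer = async move {
            while let Some(i) = rx.next().await {
                println!("received {i}");
            }
        };

        futures::join!(producer, consumer);
    });
}
```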

Contributor:
If we'd like to not block signal processing when the queue is full (which is really only relevant for Conclude), we should add a separate task that handles CandidateValidationMessages, with the main task sending work to it via a channel.

Not having this would prevent the subsystem from shutting down if the queue is full, I guess. Not sure if it's a real problem with a queue size of 10, though.
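
Roughly what that separate-task pattern could look like, sketched with made-up stand-ins (`Work`, `worker_loop`) rather than the real `CandidateValidationMessage` plumbing:

```rust
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

// Hypothetical stand-in for `CandidateValidationMessage`.
struct Work(&'static str);

// Dedicated worker: it drains the channel and does the heavy lifting, so the
// loop that owns signal handling never has to block on validation work.
async fn worker_loop(mut rx: mpsc::Receiver<Work>) {
    while let Some(Work(item)) = rx.next().await {
        println!("validating {item}");
    }
}

fn main() {
    block_on(async {
        let (mut to_worker, from_main) = mpsc::channel::<Work>(10);

        let main_loop = async move {
            // The "main" side only forwards work; dropping the sender at the
            // end lets the worker loop terminate cleanly.
            for name in ["candidate-a", "candidate-b"] {
                let _ = to_worker.send(Work(name)).await;
            }
        };

        futures::join!(main_loop, worker_loop(from_main));
    });
}
```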

Contributor:
Maybe a dumb question, but how exactly do signals and messages differ? Do signals only notify of some change in state (e.g. shutdown, leaf update, etc.)? Do they only originate from the overseer, or from somewhere else as well?

Member:
Exactly, signals originate from the overseer and are prioritized.

The suggestion is to have an additional call recv_signal(), which only receives signals, but no messages ever. Indeed, we could make it three functions in total:

  • recv ... the existing call, receiving everything.
  • recv_signal ... only receiving signals.
  • recv_msg .... only receiving messages.

Then you would either use recv or the recv_signal/recv_msg combination, giving you more control.

Prioritizing signals even more than what is done by default should not do any harm. The overseer already ensures that you never receive messages that were sent after a leaf activation before you have seen that signal as well.
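
As a sketch of how the subsystem's main loop might look with that split: note that `recv_signal()` and `recv_msg()` are the proposed additions, not methods in the current orchestra API, so this is illustrative rather than compilable.

```rust
// Illustrative only: `recv_signal()` / `recv_msg()` are hypothetical here.
loop {
    futures::select_biased! {
        // Biased select polls signals first, so Conclude and leaf updates
        // are never starved by a clogged message queue.
        signal = ctx.recv_signal().fuse() => match signal? {
            OverseerSignal::Conclude => return Ok(()),
            _ => {},
        },
        msg = ctx.recv_msg().fuse() => {
            let task = handle_validation_message(
                ctx.sender().clone(),
                validation_host.clone(),
                metrics.clone(),
                msg?,
            );
            tasks.push(task);
        },
        _ = tasks.select_next_some() => {},
    }
}
```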

Contributor:
> Exactly, signals originate from the overseer and are prioritized.

Is it possible to document this @s0me0ne-unkn0wn? Also, the docs for OverseerSignal are not very helpful: "Signals sent by an overseer to a subsystem".

Commenter:
Signals are sent as a form of broadcast to all subsystems by integration code; here that is (from memory) the Substrate block import pipeline (with a few intermediate steps).

Contributor:
I think the better solution is to change orchestra to allow implementing subsystems that don't really care about signals, except maybe Conclude. What I mean is that we either have a per-subsystem configurable SIGNAL_TIMEOUT, or annotate these subsystems so that broadcast_signal only forwards them Conclude. Then we'd have to be careful to ensure nodes don't get stuck at shutdown because of this.
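
Purely to illustrate that "forward only Conclude" idea: `SignalInterest` and `should_forward` are invented for this sketch, and orchestra has no such per-subsystem annotation today.

```rust
// Invented for illustration; orchestra has no such per-subsystem setting.
enum SignalInterest {
    All,
    ConcludeOnly,
}

fn should_forward(interest: &SignalInterest, signal: &OverseerSignal) -> bool {
    match interest {
        SignalInterest::All => true,
        // A subsystem that opts out of signals still has to receive
        // Conclude, otherwise the node could hang at shutdown.
        SignalInterest::ConcludeOnly => matches!(signal, OverseerSignal::Conclude),
    }
}
```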

comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
}
Err(e) => return Err(SubsystemError::from(e))
}
},
_ = tasks.select_next_some() => ()
}
}
}
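
A standalone toy combining the same primitives this file pulls in (`async_semaphore::Semaphore` and `FuturesUnordered`), with made-up work items, to show how acquiring a guard inside each queued future bounds how many of them make progress at once:

```rust
use async_semaphore::Semaphore;
use futures::{executor::block_on, stream::FuturesUnordered, StreamExt};
use std::sync::Arc;

fn main() {
    block_on(async {
        // At most two guards can be held at once, so at most two of the
        // queued futures get past `acquire_arc` at any given time.
        let semaphore = Arc::new(Semaphore::new(2));
        let mut tasks = FuturesUnordered::new();

        for i in 0..5u32 {
            let semaphore = semaphore.clone();
            tasks.push(async move {
                let _guard = semaphore.acquire_arc().await;
                // Stand-in for validate_candidate_exhaustive / precheck_pvf.
                println!("task {i} holds a permit");
                i
            });
        }

        // Driving the FuturesUnordered polls all queued futures; the ones
        // without a permit simply stay pending until a guard is dropped.
        while let Some(done) = tasks.next().await {
            println!("task {done} finished");
        }
    });
}
```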
5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/host.rs
@@ -60,6 +60,9 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker";
/// The name of binary spawned to execute a PVF
pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker";

/// The size of the incoming message queue.
pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10;

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

@@ -227,7 +230,7 @@ pub async fn start(
Err(err) => return Err(SubsystemError::Context(err)),
};

let (to_host_tx, to_host_rx) = mpsc::channel(10);
let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE);

let validation_host = ValidationHost { to_host_tx, security_status: security_status.clone() };

5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/src/lib.rs
@@ -104,7 +104,10 @@ mod worker_interface;
pub mod testing;

pub use error::{InvalidCandidate, PossiblyInvalidError, ValidationError};
pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME};
pub use host::{
start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE,
PREPARE_BINARY_NAME,
};
pub use metrics::Metrics;
pub use priority::Priority;
pub use worker_interface::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR};