-
Notifications
You must be signed in to change notification settings - Fork 766
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make candidate validation bounded again #2125
Make candidate validation bounded again #2125
Conversation
Maybe I don't have the full picture, but how does this PR introduce boundedness to the number of spawned tasks? AFAICT, the number is still not bounded. The tasks are just added to a FuturesUnordered collection. Maybe we want to check the size before pushing another one? |
It's totally possible that I'm doing something wrong as I'm not a Rust async code expert yet (but trying to become one 🙂) My idea was like this: when |
Ah, you probably mean that the number of added tasks is not bounded by itself, so no real backpressure is channeled up from the candidate validation subsys... Yes, looks like a problem I've overseen. I'll think about how to fix it (and ideas are appreciated). |
Yes, that's what I was referring to. I guess we need to add check the size of the FuturesUnordered collection before pushing another element there |
Well, it looked easy at first, but after a lot of research, reading, and chatting with people, I gave birth to this monster. I'd definitely appreciate any help in improving it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, left you some other ideas if you want to experiment with them.
.zip(stream::repeat(sender)) | ||
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a | ||
// common constant, or another appropriate value should be chosen | ||
.for_each_concurrent(30, |message_and_sender| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we add the common constant now rather than the todo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can, but I'm not totally sure it makes sense. If the candidate validation messages were just retransmitted to the PVF host's queue, it would be fine. But some invariants (like ValidateFromChainState
) require candidate validation subsys to do additional work involving runtime requests on which it await
s. So, if 30 ValidateFromChainState
messages arrive, they are processed concurrently, and the PVF host queue is empty. If some ValidateFromExhaustive
arrives at the moment, it is waiting until a free slot appears. But it could indeed fall through into the queue as it doesn't require any additional processing. That's why I think the limit here could be higher than the queue limit.
On the other hand, 30 concurrent ValidateFromChainState
s is an unlikely situation. The real message stream is mixed Validate*
requests with a handful of pre-checking requests. So I don't know, I just cannot decide which value is appropriate here.
.zip(stream::repeat(sender)) | ||
// FIXME: The backlog size here is the same as in PVF host queues. It should either use a | ||
// common constant, or another appropriate value should be chosen | ||
.for_each_concurrent(30, |message_and_sender| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing is that for_each_concurrent
is not equivalent with calling ctx.spawn
several times, because
the closure are run concurrently (but not in parallel– this combinator does not introduce any threads).
ctx.spawn gives us the opportunity if this being possibly run in parallel, not sure if it really matters, it might not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Speculatively, it has to be okay. What it actually does under the hood is to send a payload to the PVF host and then wait for the answer. All the actual processing is done by the PVF host within its own threading model. But I'm not 100% sure I'm not missing something not obvious here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, it also does code decompression, which can take a couple of ms: #599.
So that means that one future will block all other from making progress, since they are not being spawned on new tasks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, fair enough. And now we have a dilemma. Go back to the FuturesUnordered
and add a semaphore, or move the decompression to the validation host (something we want to do anyway) and leave this PR as-is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your call :D.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Benchmarked the decompression with the Moonbeam runtime (a large one), it takes 4.33 msec, not the end of the world, but still significant.
As far as I understand the problem, moving the decompression to the PVF host side (which requires some effort by itself) is only meaningful if we run the PVF host on the blocking pool? And if we ever switch to non-blocking (which we want to do), we'll still have the same problem with some executor threads being blocked in decompression?
If that's true, it's better not to move it but spawn it as a blocking task on the subsystem side. I benchmarked that just in case; it doesn't introduce any overhead and is a future-proof solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good to me. or we can just use regular spawn like before and see if #599 is worth taking a look at separately.
maybe it's too much of a micro-optimisation that doesn't make any difference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And if we ever switch to non-blocking (which we want to do), we'll still have the same problem with some executor threads being blocked in decompression?
If that's true, it's better not to move it but spawn it as a blocking task on the subsystem side.
Sounds good to me too. I do think the PVF host should be non-blocking as it does no blocking work itself. It waits on the child processes but that shouldn't block.
let mut res = Ok(()); | ||
let sender = ctx.sender().to_owned(); | ||
|
||
let read_stream = stream::poll_fn(|c| loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another possible way for implementing this would be to use a async_semaphore.
let s = Semaphore::new(MAX_CONCURENT_REQUESTS))
s.acquire() before all the spawn and you move the resulting guard in the future.
This reverts commit 7f1698c.
…didate-validation
Okay, so here's take three: semaphores. I didn't do anything with decompression yet (as it's a micro-optimization anyway and may be left for a follow-up). The problem is, IIUC, we cannot spawn tasks from other tasks. The subsystem context exposing I could've spawned my own thread pool from a task to do parallel decompressions, but it sounds like firing a cannon at sparrows. I can also spawn a separate task on the blocking pool that will do decompressions, but in that case, it will do it FIFO; that is, tasks requesting decompression will not be blocked but will wait until all the decompression requests submitted by other tasks are processed. All in all, it seems to be a more complex problem than it sounds, or I'm just not enlightened enough yet to invent a simple solution. |
.await; | ||
|
||
let _ = response_sender.send(precheck_result); | ||
futures::select! { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we gave up on spawn I imagined we can do something like this, what am I missing ?
--- a/polkadot/node/core/candidate-validation/src/lib.rs
+++ b/polkadot/node/core/candidate-validation/src/lib.rs
@@ -152,6 +152,8 @@ async fn run<Context>(
)
.await;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
+ use async_semaphore::Semaphore;
+ let semaphore = Arc::new(Semaphore::new(10));
loop {
match ctx.recv().await? {
@@ -171,9 +173,12 @@ async fn run<Context>(
let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();
+ let guard = semaphore.acquire_arc().await;
async move {
let _timer = metrics.time_validate_from_chain_state();
+ let _guard = guard;
+
let res = validate_from_chain_state(
&mut sender,
validation_host,
@@ -205,9 +210,12 @@ async fn run<Context>(
let bg = {
let metrics = metrics.clone();
let validation_host = validation_host.clone();
+ let guard = semaphore.acquire_arc().await;
async move {
let _timer = metrics.time_validate_from_exhaustive();
+ let _guard = guard;
+
let res = validate_candidate_exhaustive(
validation_host,
validation_data,
@@ -236,8 +244,11 @@ async fn run<Context>(
let bg = {
let mut sender = ctx.sender().clone();
let validation_host = validation_host.clone();
+ let guard = semaphore.acquire_arc().await;
async move {
+ let _guard = guard;
+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind :D I was dumb we need the select if don't want to block signals processing while the queue is full.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was dumb we need the select if don't want to block signals processing while the queue is full.
AFAICT, in its current form, this PR will still not process signals while the queue is full. The select
only helps with the other tasks being processed concurrently with the overseer messages (if we were using spawn, we wouldn't need the select)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, and as far as I understand, that's exactly what we want. When the queue is full, signal processing stops, thus channeling the backpressure upstream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we'd like to not block signal processing when the queue is full (which is really only for Conclude) we should add a separate task that handles CandidateValidationMessage
s (and the main task sends work to it via a channel).
Not having this would prevent the subsystem from shutting down if the queue is full I guess. Not sure if it's a real problem with a queue size of 10.. idk
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a dumb question but how exactly do signals and messages differ? Signals only notify of some change in state (i.e. shutdown, leaf update, etc)? Do they only originate from the overseer or from somewhere else as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly signals originate from the overseer and are prioritized.
The suggestion is to have an additional call recv_signal(), which only receives signals, but no messages ever. Indeed, we could make it three functions in total:
- recv ... the existing call, receiving everything.
- recv_signal ... only receiving signals.
- recv_msg .... only receiving messages.
Then you would either use recv or the recv_signal/recv_msg combination, giving you more control
Prioritizing signals even more than what is done by default should not do any harm. The overseer already ensures that you are never receiving messages that have been sent after a leaf activation, before you have seen that signal as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly signals originate from the overseer and are prioritized.
Is it possible to document this @s0me0ne-unkn0wn? Also, the docs for OverseerSignal
are not very helpful: "Signals sent by an overseer to a subsystem".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signals are sent as a form of broadcast to all subsystems by integration code, here it's (from memory) from the substrate block import pipeline (with a few intermediate steps).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the better solution is to change orchestra to allow implementation of subsystems that don't really care about signals, maybe except Conclude
. What I mean is that we either have a per subsystem configurable SIGNAL_TIMEOUT
, or annotate these subsystems such that broadcast_signal
only forwards them Conclude
. Then we'd have to be careful to ensure nodes get stuck at shutdown because of this.
…didate-validation
The CI pipeline was cancelled due to failure one of the required jobs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently message_channel_capacity
is 2048, if we really want to backpressure here, we'd have to make the channel size much smaller.
@sandreim thanks for the review! I'm not going to merge before the new |
And I wonder if we should push further to achieve the real backpressure. @rphmeier what's the point of spawning background tasks for the validation in the backing subsystem? If we wanted a real backpressure, we'd need to make them foreground so that backing could be blocked if the candidate validation queue is overflowed. Leaf activations could still be processed separately in that case thanks to the new separate signal processing mechanism, but other messages like That all sounds like a correct backpressure chain to me, but I'm not sure if some time-critical work the backing subsystem does should be prioritized over the backpressure correctness. |
I gave it a local zombienet burn-in, 10 hours, 20 validators, 6 parachains. Seems to be stable. |
This PR aims to channel the backpressure of the PVF host's preparation and execution queues to the candidate validation subsystem consumers.
Related: #708