Implement separate signal and message flows #66
Conversation
@eskimor @sandreim okay, here I wrote... Something. It compiles and probably works in theory, but it's useless. As `recv_signal()` and `recv_msg()` both borrow the context mutably, they cannot be polled concurrently.
TBH, this feels odd, because in practice, subsystems that process signal information wouldn't use these separate implementations. As mentioned in https://github.com/paritytech/polkadot-sdk/pull/2125/files#r1404211885, we shouldn't really send them in the first place, rather than work around having to process them.
Okay, but we still need to process signals.
By polled concurrently, you mean you cannot poll them from different spawned tasks? Yes, I would not have expected that. But why would we need that? You already have a select in the code: you do a select on signals and on the long-running tasks (`FuturesUnordered`) you are waiting for. Does that not work? Also, we need to add some docs here saying that you always have to process signals with priority, as otherwise you might not receive messages (which assume a signal to have been received).
@eskimor pseudocode:

```rust
select! {
    signal = ctx.recv_signal() => process_signal(signal), // borrows ctx as mutable
    msg = ctx.recv_msg() => process_msg(msg), // borrows ctx as mutable for the second time
}
```
The desired usage I would expect is having two states of the subsystem (accepting more work / not accepting more work), represented by separate loops for processing incoming messages, as sketched below. These would, of course, be wrapped in the main outer loop of the subsystem.
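The snippet itself did not survive extraction; what follows is a hedged reconstruction of that idea, not the original code. `active_work`, `start_work`, `handle_signal`, and `CONCURRENT_TASKS_LIMIT` are made-up names, and `ctx` is assumed to expose the combined `recv()` alongside the `recv_signal()` proposed in this PR:

```rust
use futures::{select, FutureExt, StreamExt, stream::FuturesUnordered};

loop {
    if active_work.len() < CONCURRENT_TASKS_LIMIT {
        // State 1: accepting more work. The combined `recv()` yields both
        // signals and messages, so only one mutable borrow of `ctx` is live
        // inside the select.
        select! {
            from_orchestra = ctx.recv().fuse() => match from_orchestra? {
                FromOrchestra::Signal(signal) => handle_signal(signal),
                FromOrchestra::Communication { msg } => active_work.push(start_work(msg)),
            },
            _ = active_work.select_next_some() => {},
        }
    } else {
        // State 2: at capacity. Poll only signals and task completions;
        // messages stay queued in their channel, producing back-pressure.
        select! {
            signal = ctx.recv_signal().fuse() => handle_signal(signal?),
            _ = active_work.select_next_some() => {},
        }
    }
}
```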
Also, we don't need …
The point is that all three of them, `recv`, `recv_signal`, and `recv_msg`, …
Would something like this work? …
That code does not really make sense. If you are awaiting both at the same time, why use separate calls? @sandreim's code seems to do what we want.
Well, yessss, probably I had a short circuit in my brain and somehow convinced myself it cannot work... Now I see it can. But that means this PR makes sense! Does it?
Could you craft a specific example that cannot work with the current API but would with your proposed split? From your initial approach, I understand you'd want back-pressure on messages but not on signals, such that it becomes possible to terminate work that became obsolete?
@drahnr, we already have one: the candidate validation subsystem. The message processing times are long there (up to 60 sec). We want back-pressure on that subsystem; that is, when we find out it's running too many message-processing tasks concurrently, we want it to stop accepting new messages while still processing signals.
What I meant to say was: can we condense it to a minimal example where we can write down the exact requirements?

So far I assumed … Taking a scout: currently we use … In case of the exhausted thread pool (your use case, iiuc), we'd have trouble to not consume, by polling, a … Hence, I don't see a way around splitting it up. The polling would then look like:

```rust
const CONCURRENT_TASKS_LIMIT: usize = 5;

let (mut signals, mut messages) = ctx.channels_unpacked_mut(); // rename,
let mut termination_notifiers = FuturesUnordered::new();

loop {
    if termination_notifiers.len() >= CONCURRENT_TASKS_LIMIT {
        // At capacity: poll only signals and completions of running tasks;
        // messages stay queued in their channel (back-pressure).
        let r = select(signals.recv(), termination_notifiers.select_next_some()).await;
        // ...
    } else {
        // Below capacity: accept new work; a oneshot notifies us when it finishes.
        let (the_end_tx, the_end_rx) = oneshot::channel::<()>();
        termination_notifiers.push(the_end_rx);
        let work = async move { /* do the work */ let _ = the_end_tx.send(()); }; // to be spawned
        let r = select(signals.recv(), messages.recv()).await;
        // regular work handling ...
    }
}
```

Now, that said, I do see the need for the API change (assuming my assumption holds). What I'd like to see as part of this PR: …
Happy to do an in-depth review; please mark it as "Ready for review" once you think the PR is there. Thank you!
Yes, that's correct. If we get …

I might also imagine a situation where a subsystem benefits from processing leaf activation signals while waiting for long-running work to complete and not accepting new messages, but I don't have an in vivo example for that.

Besides that, there are subsystems (e.g. PVF pre-checking) that only process signals, not messages. Providing them with a signals-only endpoint could slightly benefit performance, lifting the need for message-to-signal synchronization. That is speculative, and I'm not sure it holds (looking from another angle: no messages -> no synchronization -> no overhead); just one more thought.
@s0me0ne-unkn0wn To be clear, when I am talking about an example, I mean one independent of the polkadot codebase, able to be implemented in a single succinct …
I don't think that changes anything: if the subsystem doesn't receive any messages, the only overhead is an additional call to …
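For that signals-only case, the loop under the new API would presumably reduce to something like the sketch below (`handle_signal` is a placeholder name, not part of the API):

```rust
// A signals-only subsystem never touches the message channel, so the
// message-to-signal synchronization inside recv_msg is never exercised.
loop {
    let signal = ctx.recv_signal().await?;
    handle_signal(signal);
}
```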
```rust
async fn recv_signal(&mut self) -> ::std::result::Result<#signal, #error_ty> {
    self.signals.next().await.ok_or(#support_crate ::OrchestraError::Context(
        "Signal channel is terminated and empty.".to_owned(),
    ).into())
}

async fn recv_msg(&mut self) -> ::std::result::Result<Self::Message, #error_ty> {
    loop {
        // A message may be parked from a previous iteration; deliver it only
        // once the subsystem has seen all signals that preceded it.
        if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
            self.signals_received.wait_until(|v| v >= needs_signals_received).await;
            return Ok(msg);
        }
        let msg = self.messages.next().await.ok_or(
            #support_crate ::OrchestraError::Context(
                "Message channel is terminated and empty.".to_owned()
            )
        )?;
        // Park the message together with the number of signals that must be
        // processed before it may be delivered, then loop around.
        self.pending_incoming = Some((msg.signals_received, msg.message));
    }
}
```
This won't work, since it requires two borrows of `self` if both streams are to be polled.

Provide a method that produces two adapters:

```rust
fn incoming_splitter(&mut self) -> (SplitOfMessageStream<Self::Message>, SplitOfSignalStream) {
}
```

where both `SplitOfMessageStream` and `SplitOfSignalStream` impl `Stream` or expose `async fn recv()`.
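A minimal sketch of how such split adapters could be built, assuming the context stores the two streams as separate fields (all names here are illustrative, not the PR's actual types):

```rust
use futures::{Stream, StreamExt};

/// Adapter borrowing only the message stream.
pub struct SplitOfMessageStream<'a, M: Stream + Unpin>(&'a mut M);
/// Adapter borrowing only the signal stream.
pub struct SplitOfSignalStream<'a, S: Stream + Unpin>(&'a mut S);

impl<'a, M: Stream + Unpin> SplitOfMessageStream<'a, M> {
    pub async fn recv(&mut self) -> Option<M::Item> {
        self.0.next().await
    }
}

impl<'a, S: Stream + Unpin> SplitOfSignalStream<'a, S> {
    pub async fn recv(&mut self) -> Option<S::Item> {
        self.0.next().await
    }
}

// Because the adapters borrow disjoint fields, the borrow checker allows
// both to be alive (and polled) at the same time.
fn incoming_splitter<'a, M: Stream + Unpin, S: Stream + Unpin>(
    messages: &'a mut M,
    signals: &'a mut S,
) -> (SplitOfMessageStream<'a, M>, SplitOfSignalStream<'a, S>) {
    (SplitOfMessageStream(messages), SplitOfSignalStream(signals))
}
```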
> This won't work, since it requires two borrows of self if both streams are to be polled.

That is exactly what was discussed earlier in this thread :)

This comment by @sandreim explains how it is supposed to work (and before he explained it, I was doubting whether it could work myself).
orchestra/src/lib.rs (outdated)

```diff
@@ -217,22 +217,54 @@ pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
 	(),
 >;

+#[derive(Debug, Default)]
```
Missing documentation of its purpose.
The current API of separate `recv_signal` and `recv_msg` won't work for cases where work is being processed asynchronously, from what I understand.

We just need `recv_signal`. I think that should be fine.
A test for `recv_signal` and this is good to go.
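A rough shape such a test might take (purely illustrative; `make_test_context`, `TestSignal`, and `send_signal` are made-up stand-ins for orchestra's actual test harness):

```rust
#[test]
fn recv_signal_delivers_queued_signals() {
    futures::executor::block_on(async {
        // Hypothetical harness: a context plus a handle feeding it input.
        let (mut ctx, mut handle) = make_test_context();

        handle.send_signal(TestSignal::Conclude).await;

        // The signal must come out of the dedicated signal endpoint,
        // without touching the message channel at all.
        let signal = ctx.recv_signal().await.unwrap();
        assert!(matches!(signal, TestSignal::Conclude));
    });
}
```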
```rust
loop {
    select! {
        signal = ctx.recv_signal().fuse() => {
            // ...
```
👍 alright, this now does make a lot more sense
Thank you!
Is there a plan to release a new version?
Closes #65
It's a draft. Race conditions still have to be evaluated, and probably a better synchronization model is needed. Proper documentation is also still lacking.