
Implement separate signal and message flows #66

Merged

6 commits merged into master on Nov 29, 2023

Conversation

s0me0ne-unkn0wn
Contributor

Closes #65

This is a draft. Race conditions still have to be evaluated; a better synchronization model is probably needed. Proper documentation is also still lacking.

@s0me0ne-unkn0wn added the enhancement (New feature or request) label on Nov 17, 2023
@s0me0ne-unkn0wn
Contributor Author

@eskimor @sandreim okay, here I wrote... something. It compiles and probably works in theory, but it's useless: since SubsystemContext is not Clone, and both recv_signal() and recv_msg() borrow the context exclusively for the lifetime of their futures, they cannot be polled concurrently. I'll think about how to overcome that.

@sandreim
Collaborator

TBH, this feels odd, because in practice subsystems that process signal information wouldn't use these separate implementations. As mentioned in https://github.com/paritytech/polkadot-sdk/pull/2125/files#r1404211885, we shouldn't really send them in the first place rather than work around processing them.

@s0me0ne-unkn0wn
Contributor Author

Okay, but we still need to process Conclude ASAP instead of waiting for a message with a 60-second processing timeout to finish; that's the point of the separation 🤔

@eskimor
Member

eskimor commented Nov 24, 2023

> @eskimor @sandreim okay, here I wrote... something. It compiles and probably works in theory, but it's useless: since SubsystemContext is not Clone, and both recv_signal() and recv_msg() borrow the context exclusively for the lifetime of their futures, they cannot be polled concurrently. I'll think about how to overcome that.

By polled concurrently, you mean you cannot poll them from different spawned tasks? Yes, I would not have expected that to work. But why would we need it? You already have a select in the code: you select on signals and on the long-running tasks (FuturesUnordered) you are waiting on. Does that not work?

Also, we need to add some docs here saying that you always have to process signals with priority, as otherwise you might not receive messages (which assume a signal has already been received).
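
A rough sketch of that shape, with hypothetical Context, Signal, and Job types standing in for the real ones: the loop keeps servicing signals while long-running jobs complete in the background.

use futures::{future::BoxFuture, select, stream::FuturesUnordered, FutureExt, StreamExt};

// Hypothetical stand-ins for the real subsystem types.
enum Signal { Conclude, Other }
struct Context;
impl Context {
    async fn recv_signal(&mut self) -> Signal { Signal::Other }
}
type Job = BoxFuture<'static, ()>;

async fn run(mut ctx: Context, mut running: FuturesUnordered<Job>) {
    loop {
        select! {
            // Signals keep being serviced even while jobs are pending.
            signal = ctx.recv_signal().fuse() => {
                if matches!(signal, Signal::Conclude) {
                    // Dropping `running` cancels unfinished jobs.
                    return;
                }
            },
            // One of the long-running jobs finished.
            _ = running.select_next_some() => { /* handle completion */ },
        }
    }
}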

@s0me0ne-unkn0wn
Contributor Author

s0me0ne-unkn0wn commented Nov 24, 2023

@eskimor pseudocode:

select! {
    signal = ctx.recv_signal().fuse() => process_signal(signal), // borrows ctx as mutable
    msg = ctx.recv_msg().fuse() => process_msg(msg), // ERROR: borrows ctx as mutable a second time
}

@sandreim
Collaborator

The desired usage I would expect is having two states of the subsystem (accepting more work / not accepting more work) represented by separate loops for processing incoming messages:

  • a primary one doing a regular recv(), breaking when we are at max capacity
  • a secondary one doing recv_signal(), breaking when we are no longer at max capacity

These would of course be wrapped in the main outer loop of the subsystem.

@sandreim
Collaborator

sandreim commented Nov 24, 2023

Also, we don't need recv_msg() if we do it like that.

@s0me0ne-unkn0wn
Contributor Author

The point is that all three of them, recv(), recv_msg(), and recv_signal(), alter the internal state of SubsystemContext, thus requiring synchronization if we want to use them concurrently. It's not impossible, but it implies synchronization overhead, and I'm not sure we want to introduce it.

@sandreim
Collaborator

> The point is that all three of them, recv(), recv_msg(), and recv_signal(), alter the internal state of SubsystemContext, thus requiring synchronization if we want to use them concurrently. It's not impossible, but it implies synchronization overhead, and I'm not sure we want to introduce it.

Would something like this work?

loop {
	// Phase 1: accept new work until we hit max capacity.
	loop {
		select! {
			message = ctx.recv().fuse() => if process_message(message) { break }, // break when at max capacity
			_ = running_validations.select_next_some() => {}, // some validation finished
		}
	}
	// Phase 2: only process signals until we are below max capacity again.
	loop {
		select! {
			signal = ctx.recv_signal().fuse() => { /* handle signal */ },
			_ = running_validations.select_next_some() => {
				if running_validations.len() < MAX_PARALLEL_VALIDATIONS { break } // break when below max capacity
			},
		}
	}
}

@eskimor
Member

eskimor commented Nov 24, 2023

> @eskimor pseudocode:
>
> select! {
>     signal = ctx.recv_signal().fuse() => process_signal(signal), // borrows ctx as mutable
>     msg = ctx.recv_msg().fuse() => process_msg(msg), // ERROR: borrows ctx as mutable a second time
> }

That code does not really make sense. If you are awaiting both at the same time, why use separate calls? @sandreim's code seems to do what we want.

@s0me0ne-unkn0wn
Contributor Author

> If you are awaiting both at the same time, why use separate calls? @sandreim's code seems to do what we want.

Well, yessss, I probably got a short circuit in my brain and somehow convinced myself it couldn't work... Now I see it can.

But that means this PR makes sense! Does it?

@drahnr
Collaborator

drahnr commented Nov 25, 2023

Could you craft a specific example that cannot work with the current API but would with your proposed split?

From your initial approach, I understand you'd want back-pressure on messages but not on signals, such that it becomes possible to terminate work that has become obsolete?

@s0me0ne-unkn0wn
Contributor Author

@drahnr, we already have one: the candidate validation subsystem.

The message processing times are long there (up to 60 seconds). We want backpressure on that subsystem; that is, when we find out it's running too many message-processing tasks concurrently, we want it to stop recv()ing new messages, thus forcing the sender to block in its send(). But we still want to process signals in the meantime.
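
A minimal, self-contained sketch of that mechanism using a plain bounded futures channel in place of the real subsystem plumbing (the capacity and all names are illustrative, not the subsystem API):

use futures::{channel::mpsc, executor::LocalPool, task::SpawnExt, SinkExt, StreamExt};

fn main() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    // A small bounded channel: once its buffer is full, the sender's
    // send().await stays pending until the receiver frees a slot.
    // That pending send() is the backpressure.
    let (mut tx, mut rx) = mpsc::channel::<u32>(1);

    spawner
        .spawn(async move {
            for i in 0..5 {
                // Parks here whenever the receiver has stopped recv()ing.
                tx.send(i).await.expect("receiver alive");
                println!("sent {i}");
            }
        })
        .expect("spawning on a fresh pool works");

    pool.run_until(async move {
        while let Some(msg) = rx.next().await {
            // Interleaves with the "sent" lines: the sender can only
            // run ahead of the receiver by the channel capacity.
            println!("received {msg}");
        }
    });
}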

@drahnr
Collaborator

drahnr commented Nov 27, 2023

> @drahnr, we already have one: the candidate validation subsystem.

What I meant to say was: can we condense it to a minimal example where we can write down the exact requirements?

> The message processing times are long there (up to 60 seconds). We want backpressure on that subsystem; that is, when we find out it's running too many message-processing tasks concurrently, we want it to stop recv()ing new messages, thus forcing the sender to block in its send(). But we still want to process signals in the meantime.

So far I assumed "want" in the last sentence means to disregard unfinished work and terminate any remaining processing tasks. Is that correct, or is there another reason?


Taking a scout: currently we use select_biased! to multiplex the signal and message streams. The knee-jerk fix would be to keep that and add Peekable on top, but that won't work (see the disclaimers in the docs).

In the case of an exhausted thread pool (your use case, IIUC), we'd have trouble not consuming messages by polling: a poll always yields Ready, and to my knowledge there are no implementations that filter based on a closure, especially not on the Select<..> or SelectWithStrategy<..> types.

Hence, I don't see a way around splitting it up.

The polling would then look roughly like:

const CONCURRENT_TASKS_LIMIT: usize = 5;
// `channels_unpacked_mut` is a sketch accessor splitting the context
// into its two streams (name to be bikeshedded).
let (mut signals, mut messages) = ctx.channels_unpacked_mut();
let mut termination_notifiers = FuturesUnordered::new();
loop {
	if termination_notifiers.len() >= CONCURRENT_TASKS_LIMIT {
		// At capacity: only wait for signals or for a task to finish.
		let r = select(signals.recv(), termination_notifiers.select_next_some()).await;
		// ...
	} else {
		// Below capacity: spawn more work and serve both streams.
		let (the_end_tx, the_end_rx) = oneshot::channel::<()>();
		termination_notifiers.push(the_end_rx);
		let work = async move { /* ... */ let _ = the_end_tx.send(()); };
		let r = select(signals.recv(), messages.recv()).await;
		// regular work handling ...
	}
}

Now that said, I do see the need for the API change (assuming my assumption holds).

What I'd like to see as part of this PR:

  1. an example showcasing the new API usage
  2. make the example exhibit the need for the API
  3. add module level comment to the example explaining it concisely
  4. spring clean/green CI runs

Happy to do an in-depth review; please mark the PR as "Ready for review" once you think it's there.

Thank you!

@drahnr self-assigned this on Nov 27, 2023
@s0me0ne-unkn0wn
Contributor Author

> So far I assumed "want" in the last sentence means to disregard unfinished work and terminate any remaining processing tasks. Is that correct, or is there another reason?

Yes, that's correct. If we get Conclude, it doesn't make sense to keep waiting for the validations already running, as the subsystem requesting validation has likely been shut down by that time.

I might also imagine a situation where a subsystem benefits from processing leaf activation signals while waiting for long-running work to complete and not accepting new messages, but I don't have an in vivo example for that.

Besides that, there are subsystems (e.g. PVF pre-checking) that only process signals, not messages. Providing them with a signals-only endpoint could slightly benefit performance by lifting the need for message-to-signal synchronization. That is speculative; I'm not sure it holds (looking from another angle: no messages -> no synchronization -> no overhead), just one more thought.
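
For illustration, a signals-only main loop under the proposed API might reduce to a sketch like this (Signal::Conclude stands in for the concrete signal type, and handle_signal is hypothetical):

loop {
    match ctx.recv_signal().await {
        // Stop on Conclude or when the signal channel is closed.
        Ok(Signal::Conclude) | Err(_) => break,
        Ok(signal) => handle_signal(signal),
    }
}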

@drahnr
Collaborator

drahnr commented Nov 27, 2023

@s0me0ne-unkn0wn To be clear, when I talk about an example, I mean one independent of the polkadot codebase, implementable in a single succinct examples/*.rs file, and implemented as part of this PR. orchestra is meant to serve as an execution framework beyond polkadot.

> Besides that, there are subsystems (e.g. PVF pre-checking) that only process signals, not messages. Providing them with a signals-only endpoint could slightly benefit performance by lifting the need for message-to-signal synchronization. That is speculative; I'm not sure it holds (looking from another angle: no messages -> no synchronization -> no overhead), just one more thought.

I don't think that changes anything. If the subsystem doesn't receive any messages, the only overhead is an additional poll call on the message channel, which is an atomic read + compare. That's it. There is no message-to-signal synchronization, unless you're referring to select_biased? That sync is an additional atomic write and read. I'd be surprised if measurements showed anything above noise level.

Comment on lines 497 to 516
async fn recv_signal(&mut self) -> ::std::result::Result<#signal, #error_ty> {
	self.signals.next().await.ok_or(#support_crate ::OrchestraError::Context(
		"Signal channel is terminated and empty.".to_owned(),
	).into())
}

async fn recv_msg(&mut self) -> ::std::result::Result<Self::Message, #error_ty> {
	loop {
		// A message may only be delivered once the signals that preceded it
		// have been seen; park it until the signal counter catches up.
		if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
			self.signals_received.wait_until(|v| v >= needs_signals_received).await;
			return Ok(msg);
		}
		let msg = self.messages.next().await.ok_or(
			#support_crate ::OrchestraError::Context(
				"Message channel is terminated and empty.".to_owned()
			)
		)?;
		self.pending_incoming = Some((msg.signals_received, msg.message));
	}
}
Collaborator

This won't work, since it requires two borrows of self if both streams are to be polled.

Provide a method that produces two adapters.

fn incoming_splitter(&mut self) -> (SplitOfMessageStream<Self::Message>, SplitOfSignalStream) {
	// ...
}

where both SplitOfMessageStream and SplitOfSignalStream either impl Stream or expose an async fn recv().
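
For illustration, usage of such hypothetical adapters might look like this (incoming_splitter and both stream types are the sketch names from above):

// Hypothetical usage: the halves hold disjoint state, so the
// double mutable borrow of ctx discussed earlier goes away.
let (mut messages, mut signals) = ctx.incoming_splitter();

select! {
    signal = signals.recv().fuse() => process_signal(signal?),
    msg = messages.recv().fuse() => process_msg(msg?),
}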

Contributor Author

> This won't work, since it requires two borrows of self if both streams are to be polled.

That is exactly what was discussed earlier in this thread :)
This comment by @sandreim explains how it is supposed to work (and before he explained it, I doubted it could work myself).

@@ -217,22 +217,54 @@ pub type SubsystemIncomingMessages<M> = self::stream::SelectWithStrategy<
(),
>;

#[derive(Debug, Default)]
Collaborator

Missing documentation of its purpose.

@drahnr (Collaborator) left a comment

From what I understand, the current API of separate recv_signal and recv_msg won't work for cases where work is processed asynchronously.

@sandreim
Collaborator

We just need recv_signal. I think that should be fine.

@sandreim (Collaborator) left a comment

A test for recv_signal and this is good to go.

@s0me0ne-unkn0wn changed the title from "[DNM] Implement separate signal and message flows" to "Implement separate signal and message flows" on Nov 27, 2023
@s0me0ne-unkn0wn marked this pull request as ready for review on Nov 27, 2023

loop {
	select! {
		signal = ctx.recv_signal().fuse() => {
Collaborator

👍 alright, this now does make a lot more sense

@drahnr (Collaborator) left a comment

Thank you!

@sandreim merged commit 1912785 into master on Nov 29, 2023
7 checks passed
@s0me0ne-unkn0wn
Contributor Author

Is there a plan to release a new orchestra crate including this change, so we could make use of it in external projects?

sandreim pushed a commit that referenced this pull request Jan 29, 2024
* fixup! Implement separate signal and message flows (#66)

* Address review feedback

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Labels
enhancement New feature or request
Development

Successfully merging this pull request may close these issues.

Provide separate sinks for signals and messages
4 participants