
protocols/kad/: Replace manual procedural state machines with async/await #3130

Closed
mxinden opened this issue Nov 16, 2022 · 12 comments · Fixed by #4901
Labels
difficulty:easy getting-started Issues that can be tackled if you don't know the internals of libp2p very well help wanted priority:nicetohave

Comments

mxinden (Member) commented Nov 16, 2022

Reading and writing on inbound/outbound streams in libp2p-kad is implemented via hand written procedural / sequential state machines. This logic can be simplified by using Rust's async-await.

Manual state machines in libp2p-kad:

/// State of an active outbound substream.
enum OutboundSubstreamState<TUserData> {
    /// We haven't started opening the outgoing substream yet.
    /// Contains the request we want to send, and the user data if we expect an answer.
    PendingOpen(SubstreamProtocol<KademliaProtocolConfig, (KadRequestMsg, Option<TUserData>)>),
    /// Waiting to send a message to the remote.
    PendingSend(
        KadOutStreamSink<NegotiatedSubstream>,
        KadRequestMsg,
        Option<TUserData>,
    ),
    /// Waiting to flush the substream so that the data arrives at the remote.
    PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
    /// Waiting for an answer back from the remote.
    // TODO: add timeout
    WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
    /// An error happened on the substream and we should report the error to the user.
    ReportError(KademliaHandlerQueryErr, TUserData),
    /// The substream is being closed.
    Closing(KadOutStreamSink<NegotiatedSubstream>),
    /// The substream is complete and will not perform any more work.
    Done,
    /// Placeholder state while the machine is being advanced.
    Poisoned,
}
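To illustrate the pattern being replaced, here is a hypothetical, runnable miniature of such a hand-rolled state machine (names like `SendState` and `run_demo` are illustrative, not from libp2p-kad): each step takes ownership of the state via `mem::replace`, which is why a `Poisoned` placeholder variant is needed at all.

```rust
use std::mem;

fn run_demo() -> Vec<String> {
    // Hypothetical miniature of the pattern above: each step takes ownership
    // of the state via `mem::replace` (hence the `Poisoned` placeholder) and
    // moves the machine forward by one transition.
    enum SendState {
        PendingSend(String), // message still to be written
        PendingFlush,        // written, awaiting flush
        Done,
        Poisoned, // placeholder while the state is taken out
    }

    let mut state = SendState::PendingSend("PUT_VALUE".to_string());
    let mut log = Vec::new();

    loop {
        match mem::replace(&mut state, SendState::Poisoned) {
            SendState::PendingSend(msg) => {
                log.push(format!("send {msg}"));
                state = SendState::PendingFlush;
            }
            SendState::PendingFlush => {
                log.push("flush".to_string());
                state = SendState::Done;
            }
            SendState::Done => break,
            SendState::Poisoned => unreachable!("state machine left poisoned"),
        }
    }

    log
}

fn main() {
    assert_eq!(run_demo(), ["send PUT_VALUE", "flush"]);
}
```

With async/await, the same send-then-flush sequence collapses into two `.await` points and the compiler generates the state machine, including the "poisoned" bookkeeping.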

Example of using async-await in libp2p-dcutr:

pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> {
    let msg = StopMessage {
        r#type: stop_message::Type::Status.into(),
        peer: None,
        limit: None,
        status: Some(Status::Ok.into()),
    };

    self.send(msg).await?;

    let FramedParts {
        io,
        read_buffer,
        write_buffer,
        ..
    } = self.substream.into_parts();

    assert!(
        write_buffer.is_empty(),
        "Expect a flushed Framed to have an empty write buffer."
    );

    Ok((io, read_buffer.freeze()))
}

mxinden added the priority:nicetohave, difficulty:easy, help wanted, and getting-started (issues that can be tackled if you don't know the internals of libp2p very well) labels on Nov 16, 2022
GlenDC commented Nov 16, 2022

Please assign it to me @mxinden . I’ll look into it!

thomaseizinger (Contributor) commented

For reference, this is only possible for outbound streams. Inbound streams need to be able to be suspended to allow the behaviour to answer the incoming request.

rkuhn (Contributor) commented Nov 17, 2022

Allowing suspension is the sole point of .await, so in principle this could be embedded in the same paradigm.

thomaseizinger (Contributor) commented

> Allowing suspension is the sole point of .await, so in principle this could be embedded in the same paradigm.

If Rust had native generators, then yes, we would likely be able to use those and resume the suspension with arbitrary data.

Until then we are stuck with channels or hand-rolled state machines.
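The channel-based workaround mentioned here can be sketched roughly as follows (a hypothetical, std-only model using blocking `mpsc` channels in place of async ones; names like `to_behaviour` and the `FIND_NODE` payload are illustrative): the inbound stream task forwards the decoded request together with a one-shot reply channel to the behaviour, suspends until the answer arrives, and then writes it back to the stream.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch: the "handler" forwards each request plus a reply
// channel to the "behaviour" and waits for the answer. In real async code
// the waits below would be `.await` points instead of blocking calls.
fn run_demo() -> String {
    let (to_behaviour, from_handler) = mpsc::channel::<(String, mpsc::Sender<String>)>();

    // "Behaviour" running elsewhere, answering incoming requests.
    let behaviour = thread::spawn(move || {
        while let Ok((request, reply)) = from_handler.recv() {
            reply.send(format!("answer to {request}")).unwrap();
        }
    });

    // "Handler": forwards the request and blocks (in real code: awaits) on the reply.
    let (reply_tx, reply_rx) = mpsc::channel();
    to_behaviour.send(("FIND_NODE".to_string(), reply_tx)).unwrap();
    let response = reply_rx.recv().unwrap();

    drop(to_behaviour);
    behaviour.join().unwrap();
    response
}

fn main() {
    assert_eq!(run_demo(), "answer to FIND_NODE");
}
```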

rkuhn (Contributor) commented Nov 17, 2022

Well, Rust has native generators, they’re just exposed without the yield keyword. Fortunately, that can rather easily be implemented in terms of .await, cf. genawaiter. I’ve been using this for quite a while, works nicely and without any issues (I don’t use any of the macros, don’t need them).

mxinden (Member, Author) commented Nov 17, 2022

> Please assign it to me @mxinden . I’ll look into it!

Please post here once you start working on it; I will then assign you. Until then, if someone else races you to it, great: we will have it done sooner and we will find another cool task for you. Sounds good?

thomaseizinger (Contributor) commented Nov 18, 2022

> Well, Rust has native generators, they’re just exposed without the yield keyword. Fortunately, that can rather easily be implemented in terms of .await, cf. genawaiter. I’ve been using this for quite a while, works nicely and without any issues (I don’t use any of the macros, don’t need them).

Yeah, I am familiar with genawaiter. It is a nice library :)

It would be interesting to toy around with it to build something like from_fn. I am not sure it is necessarily better to suspend the stream to ask for more data instead of forcing the user to gather the data ahead of time.

I think the former is more verbose (need to model the data request and response) but it can potentially yield more up-to-date results for the remote. In theory, it might also suffer from higher latency because you can't enforce how quickly the data request is answered.

Gathering the data ahead of time (like I do in #2852) has guaranteed latency properties, is simpler to write but can potentially serve slightly stale data, depending on how much work the protocol does between setting up the stream and when it uses the gathered data / when it would suspend waiting for more.

In my personal view, good network protocols should be designed to be handled autonomously, i.e. they should be quick to answer for a peer and not wait for user interaction in between.1 The design I am proposing in #2852 encourages that by making it impossible to suspend a stream to wait for more data.

I think I want to write a blog post about this.

Footnotes

  1. If you need your user to interact in the overall workflow, you should likely make two network protocols out of it where the entire communication model itself is async.

thomaseizinger (Contributor) commented Jan 10, 2023

> For reference, this is only possible for outbound streams. Inbound streams need to be able to be suspended to allow the behaviour to answer the incoming request.

I wrote up a small PoC of what I believe is a reusable abstraction that allows us to more easily express request-response protocols without having to write manual state machines every time: https://github.com/thomaseizinger/rust-async-response-stream/blob/master/tests/smoke.rs

The idea is simple: It is basically a oneshot channel into a future that will send the message on the given stream. However, instead of having to construct this in every protocol, async-response-stream just requires you to pass the stream, a codec and a timeout.

The resulting future will eventually yield the received message, a Slot and a new future. The message and slot can be handed to wherever processing needs to happen. The Slot can be used to asynchronously send the response. Most notably, this is not an async function but just modifies a shared buffer (like a oneshot channel). The actual IO happens within the returned future.
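The Slot idea can be modelled in miniature like this (a hypothetical, std-only sketch, not the actual async-response-stream API; the `Slot` struct and its methods here are illustrative): filling the slot is a plain synchronous write into a shared buffer, while the separate I/O future is what later drains the buffer and performs the actual write on the stream.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical model of the `Slot` idea: filling it is not async, it just
// writes into a shared buffer (like a oneshot channel); the separate I/O
// future later drains the buffer and performs the actual write.
#[derive(Clone)]
struct Slot(Arc<Mutex<Option<String>>>);

impl Slot {
    fn fill(&self, response: String) {
        *self.0.lock().unwrap() = Some(response);
    }

    fn take(&self) -> Option<String> {
        self.0.lock().unwrap().take()
    }
}

fn main() {
    let slot = Slot(Arc::new(Mutex::new(None)));

    // Handed to wherever processing needs to happen; filling is synchronous.
    slot.fill("list of providers".to_string());

    // The sending future would poll until the slot is filled, then write.
    assert_eq!(slot.take().as_deref(), Some("list of providers"));
    assert_eq!(slot.take(), None); // the buffer is drained exactly once
}
```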

When integrating this into our codebase, I was thinking that a ConnectionHandler would have 2 FuturesUnordered fields, one for receiving messages and one for sending the responses.

If this is well received, I can transfer the repository to the libp2p organisation and we can start integrating into our codebase.

Input on naming, API design and overall idea welcome!

mxinden (Member, Author) commented Jan 23, 2023

Thanks for preparing the proof of concept.

> The idea is simple: It is basically a oneshot channel into a future that will send the message on the given stream. However, instead of having to construct this in every protocol, async-response-stream just requires you to pass the stream, a codec and a timeout.

It took me a bit to understand the internals of the abstraction. In case this turns out to not be leaky, i.e. not required to be understood by users, great. In case it is leaky, using familiar concepts like oneshot directly might be easier for developers to grasp.

(Neat tricks with the marker types.)

I am in favor of moving forward here.

thomaseizinger (Contributor) commented

> The idea is simple: It is basically a oneshot channel into a future that will send the message on the given stream. However, instead of having to construct this in every protocol, async-response-stream just requires you to pass the stream, a codec and a timeout.

> It took me a bit to understand the internals of the abstraction. In case this turns out to not be leaky, i.e. not required to be understood by users, great. In case it is leaky, using familiar concepts like oneshot directly might be easier for developers to grasp.

All naming subject to bike-shedding :)

What in particular was hard to understand? Do you have suggestions for different names? Slot could be named PendingResponse perhaps.

> (Neat tricks with the marker types.)

> I am in favor of moving forward here.

I think it is also worth extending the library with a Sending future that models the other end. That one could also easily be written as async move {}.boxed() but I think it is cleaner if we can use the same library for both ends of a protocol.

I'll extend the library with a Sending type and then send an initial PR that integrates it into libp2p-kad to start with so we can see what it looks like.

Mubelotix commented Feb 7, 2023

I have been working on solving this exact problem for a protocol I'm currently implementing with libp2p. I didn't like manual state machines either. My solution is to have the handlers and behaviours spawn async tasks for handling requests or doing other useful work; they then poll those tasks from their own poll method. You can see how I implemented it here in the KamilataHandler struct and also in the tasks/mod.rs file. My code is still in its early days and the protocol is far from finished, but I am already very satisfied with the task logic I implemented.

thomaseizinger (Contributor) commented

Thanks for sharing!

I think conceptually, this is equivalent to what is discussed in #3411 and #3130 (comment).

On an abstract level, yes: if you move the processing of the data into the ConnectionHandler (via a reference to a DB / RecordStore / etc.), then you get to avoid a lot of the message passing that is currently happening. It comes at the cost of having to use synchronization primitives like mutexes, as the data also needs to be shared across several ConnectionHandlers.

Message passing is nice because it makes this very explicit and avoids the possibility of deadlocks. However, exercising backpressure is also really important. I am curious what the experiments by @nazar-pc will yield.
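The backpressure point can be illustrated with a bounded channel (a hypothetical, std-only sketch; `run_demo` and the buffer size of 1 are illustrative): once the buffer is full, `send` blocks until the slow consumer catches up, so the producer is automatically slowed down instead of queueing unboundedly.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical illustration of backpressure: with a bounded channel of
// capacity 1, the producer's `send` blocks whenever a message is already
// in flight, so a slow consumer throttles the producer.
fn run_demo() -> Vec<u32> {
    let (tx, rx) = mpsc::sync_channel::<u32>(1);

    let consumer = thread::spawn(move || {
        let mut seen = Vec::new();
        while let Ok(msg) = rx.recv() {
            thread::sleep(Duration::from_millis(5)); // slow consumer
            seen.push(msg);
        }
        seen
    });

    for msg in 0..5 {
        tx.send(msg).unwrap(); // blocks while the buffer is full
    }
    drop(tx);

    consumer.join().unwrap()
}

fn main() {
    assert_eq!(run_demo(), vec![0, 1, 2, 3, 4]);
}
```

An unbounded channel would accept all five messages immediately; the bounded variant makes the producer's progress depend on the consumer's, which is exactly the property backpressure is meant to provide.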

mergify bot closed this as completed in #4901 on Nov 22, 2023
mergify bot pushed a commit that referenced this issue Nov 22, 2023
This refactoring addresses several aspects of the current handler implementation:

- Remove the manual state machine for outbound streams in favor of using `async-await`.
- Use `oneshot`s to track the number of requested outbound streams
- Use `futures_bounded::FuturesMap` to track the execution of a stream, thus applying a timeout to the entire request.

Resolves: #3130.
Related: #3268.
Related: #4510.

Pull-Request: #4901.