Undoing a Sink::poll_ready #2109

Open
jonhoo opened this issue Apr 1, 2020 · 4 comments
Labels
A-sink Area: futures::sink S-needs-api-design Status: Before implementing this, a discussion or decision on the new API is needed.

Comments

@jonhoo
Contributor

jonhoo commented Apr 1, 2020

Before we had Sink::poll_ready, senders always had to have an item ready to send before interacting with the Sink. This sometimes led to unnecessary buffering. The most trivial example of this is the current implementation of StreamExt::forward, which still adheres to the "have an item before you access the sink" model. It calls Stream::poll_next, and then calls Sink::poll_ready when it gets an element. Assuming poll_ready returns Poll::Ready, the element is sent, and all is good. The issue arises if Sink::poll_ready returns Poll::Pending. The implementation must now buffer that element somewhere, and retry Sink::poll_ready on the next call to poll before it attempts to receive another element from the Stream (this is the buffered_item field on Forward). The upside of this approach is that when poll_ready returns Poll::Ready, we almost immediately call start_send and "consume" the slot that poll_ready told us was available.

The alternative approach that Sink::poll_ready enabled is one that does not require any buffering, and it can be written out pretty easily:

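loop {
    // Reserve a slot first, so an item is only taken once it can be sent.
    // (Pinning of sink and stream is elided here.)
    ready!(self.sink.poll_ready(cx))?;
    if let Some(item) = ready!(self.stream.poll_next(cx)) {
        self.sink.start_send(item)?;
    } else {
        break;
    }
}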

Here, we never need to buffer, since we only ever accept another item if we know we can send it immediately (I use this technique in tokio-tower for example).

Unfortunately, this example highlights a problem with Sink as it stands: we may now take a long time (potentially forever) between when we get Poll::Ready from poll_ready and when we call start_send. It might be that the client is simply not sending us any more requests. But the Sink has promised us that we have a slot available to send, which we are still holding up. In the context of something like a bounded channel, this may mean that we are holding up one of the few slots in the channel, even though we will never use it (see #1312 and #984 (comment) for related discussion). If we have multiple of these loops (e.g., if you are forwarding multiple streams to clones of the same bounded mpsc::Sender), then you can easily end up in a livelock: all the slots of the bounded channel are held up by forwards for streams that do not have any elements to send, and as a result the forwards for streams that do have elements to send are unable to forward them, since poll_ready returns Poll::Pending for them.
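The starvation described above can be reproduced with a toy model. This is only a sketch: SlotChannel, try_reserve and send_reserved are hypothetical names standing in for a bounded channel, poll_ready and start_send, and the model ignores wakers and the receiving side entirely.

```rust
/// Toy stand-in for a bounded channel whose `poll_ready` reserves a slot.
/// All names are hypothetical; this is not how futures' mpsc is implemented.
pub struct SlotChannel {
    pub capacity: usize,
    pub reserved: usize, // slots promised by try_reserve but not yet used
    pub queue: Vec<u32>, // items that were actually sent
}

impl SlotChannel {
    pub fn new(capacity: usize) -> Self {
        SlotChannel { capacity, reserved: 0, queue: Vec::new() }
    }

    /// Models `poll_ready`: `true` is `Poll::Ready` (a slot is now held),
    /// `false` is `Poll::Pending` (every slot is reserved or occupied).
    pub fn try_reserve(&mut self) -> bool {
        if self.reserved + self.queue.len() < self.capacity {
            self.reserved += 1;
            true
        } else {
            false
        }
    }

    /// Models `start_send`: turns one reservation into a queued item.
    pub fn send_reserved(&mut self, item: u32) {
        assert!(self.reserved > 0, "start_send without a prior poll_ready");
        self.reserved -= 1;
        self.queue.push(item);
    }
}

/// Two forwarders with idle streams reserve both slots and then wait.
/// Returns whether a third forwarder, which has an item, can get a slot.
pub fn busy_forwarder_gets_slot() -> bool {
    let mut chan = SlotChannel::new(2);
    assert!(chan.try_reserve()); // forwarder A: stream has nothing to send
    assert!(chan.try_reserve()); // forwarder B: stream has nothing to send
    chan.try_reserve() // forwarder C: has an item, but sees Pending
}
```

In this model busy_forwarder_gets_slot() returns false: the channel is empty, yet the only forwarder with work to do cannot make progress until A or B gives up a slot, which nothing in the current API lets them do.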

This is something we've also run into in tower, whose Service trait features a similar poll_ready mechanism. What is needed here is some way to "undo" a Sink::poll_ready that returned Poll::Ready. In tower we've had two different proposals come up. The first (tower-rs/tower#408) is to add a disarm_ready method. The idea here is that implementations of poll that discover that they won't use the slot given to them by poll_ready can "disarm"/give up that slot by calling the disarm_ready method. The second (tower-rs/tower#412) is to have poll_ready return some kind of "token" that must then be passed to start_send (tower's equivalent is called call). This makes the API more misuse-resistant (you can't call start_send without calling poll_ready), and also gives us a way to give up a poll_ready slot: just drop the token.
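To make the first proposal concrete, here is a rough sketch of a forwarding step that disarms before yielding. It is heavily simplified and assumes a lot: DisarmSink, Bounded and poll_forward are hypothetical names, the trait drops Pin, Context and the error type, and the "stream" is just a queue of pre-recorded poll results.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical, simplified sink: no `Pin`, no `Context`, no error type.
pub trait DisarmSink<T> {
    fn poll_ready(&mut self) -> Poll<()>;
    fn start_send(&mut self, item: T);
    /// Proposed addition: release a slot obtained from `poll_ready`.
    fn disarm_ready(&mut self);
}

/// Toy bounded sink; sent items are never drained in this sketch.
pub struct Bounded<T> {
    pub capacity: usize,
    pub reserved: usize,
    pub items: Vec<T>,
}

impl<T> Bounded<T> {
    pub fn new(capacity: usize) -> Self {
        Bounded { capacity, reserved: 0, items: Vec::new() }
    }
}

impl<T> DisarmSink<T> for Bounded<T> {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.reserved + self.items.len() < self.capacity {
            self.reserved += 1;
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    fn start_send(&mut self, item: T) {
        assert!(self.reserved > 0, "start_send without poll_ready");
        self.reserved -= 1;
        self.items.push(item);
    }

    fn disarm_ready(&mut self) {
        self.reserved = self.reserved.saturating_sub(1);
    }
}

/// One poll of a forwarding loop. `source` stands in for a stream: each entry
/// is the result of one `poll_next`, and an empty queue means end of stream.
pub fn poll_forward<T, S: DisarmSink<T>>(
    sink: &mut S,
    source: &mut VecDeque<Poll<Option<T>>>,
) -> Poll<()> {
    loop {
        if sink.poll_ready().is_pending() {
            return Poll::Pending;
        }
        match source.pop_front().unwrap_or(Poll::Ready(None)) {
            Poll::Ready(Some(item)) => sink.start_send(item),
            // Nothing to send: hand the slot back before returning.
            Poll::Ready(None) => {
                sink.disarm_ready();
                return Poll::Ready(());
            }
            Poll::Pending => {
                sink.disarm_ready();
                return Poll::Pending;
            }
        }
    }
}
```

The point of the sketch is only the two disarm_ready calls: the loop keeps the "reserve first, then fetch" order, but never yields while holding a slot.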

Neither of these proposals is necessarily the right one, but I figured I would start discussion here since this is definitely an issue with Sink (and similar traits that include a poll_ready method), and one that must be fixed before the trait is considered finished.

@jonhoo
Contributor Author

jonhoo commented Apr 1, 2020

I'll add to this that there is only a small number of impl Sink whose poll_ready actually does some kind of reservation. Off the top of my head: anything semaphore based (lock/rwlock) and anything based on a bounded channel (like mpsc::Sender). I don't know that that observation helps, but figured I'd put it out there.

@jonhoo
Contributor Author

jonhoo commented Apr 1, 2020

The observation here is essentially that you cannot currently yield after calling poll_ready. You have to effectively treat every call to poll_ready as a possible semaphore acquisition, or face a potential deadlock (unless you have a concrete Sink type that you know this is not the case for). In practice, this means that you pretty much have to do whatever work ultimately gets you an item to send, and then poll_ready + immediately start_send. At which point the benefits of poll_ready are lost, and we may as well just have poll_start_send and dump poll_ready. Or, as argued above, we'll need something akin to disarm.

@najamelan
Contributor

najamelan commented Apr 3, 2020

I was just looking into the Sink API myself, and I found it rather hard to find prior discussions of why everything is the way it is, and why people don't like it (e.g. it's not being nominated for std even though Stream is).

As for poll_ready, the only thing I've been able to find is that @carllerche said it was important to be able to verify that an item can be processed before having to generate it to correctly implement service back pressure.

I haven't been able to find any more justification that this is actually the case. Maybe it is. In any case, the requirement has a considerable API cost which would benefit from proper justification and probably good documentation for implementers.

It also contrasts a bit with the discussion in the PR you linked above, where it seems @carllerche argues that keeping one slot per sender is fine and you should just call poll_ready from poll_flush to make sure back pressure happens. I feel it contrasts because keeping one slot per sender defeats the point of not buffering the items before they can be processed.

I can see a number of issues with the current Sink API. A semantically single operation is split over two methods, poll_ready and start_send, which opens up a can of worms:

  • clients might call start_send first
  • clients might call poll_ready and never call start_send
  • generating an item might take a long time
  • clients can drop the sink (easily worked around if you can liberate the slot in a Drop impl, so this one isn't so dramatic)
  • implementers tend to dodge the poll_ready concept by always returning ready and then spreading out the send over start_send and poll_flush, which means if you are generic over Sink, you have no guarantee that no extra buffering is taking place, and that your item will only be created when it can be processed. On the up side, that means that a single trait here can cover both cases (albeit at a cost of not respecting the buffer size set by the user as it requires each sender to buffer a slot).

In practice, this puts constraints on both the implementer and the client, and those aren't really documented very well, nor compiler enforced. And as @jonhoo points out above, even the combinators in the futures lib don't really follow the "spirit" of poll_ready, and the deadlocking problem seems to demotivate adhering to it when generating the item takes some time.

Looking at tokio::sync::mpsc::Sender, it doesn't even implement Sink anymore. However, it still has both methods. It alleviates the problem somewhat by using a semaphore to count the available slots, which allows it to reserve just one slot. That contrasts with a design built on a global lock, which deadlocks as soon as any one Sender is between poll_ready and start_send. It also does not require senders to buffer a slot, as I think futures::channel::mpsc::Sender still does. It seems to strike the best compromise, allowing slot-level locking and an intuitive implementation of the buffer bound, since the number of items buffered corresponds exactly to the bound set by the client.

That being said, I think it is still susceptible to deadlocking (I haven't tested this) when all slots are reserved by clients that don't send.

Being able to undo a poll_ready can help relieve some pain here, but only solves part of the problem. Given the lack of a central discussion about the design of the Sink API, combined with its poor popularity, I would love for a proper discussion to take place that helps to determine the best future for this interface. I personally think it's important to have an interface for this concept and would love to see consensus and adoption around it. Stabilizing Stream without Sink feels like stabilizing AsyncRead without AsyncWrite to me.

To move forward, I see the following as essential:

  • The requirement of being able to reserve the slot seems to be antagonistic to an elegant, easy to use and implement interface, without footguns like unavoidable deadlocking. Thus I think we should really question it and hopefully settle one way or another in a way we won't regret later. As a start, are there any current users of the Sink API that count on this requirement that do something that couldn't really be done otherwise?
  • If the requirement has to stay, what are some of the best ways to relieve some pain? I can see several ways of accomplishing things:
    • what's needed is deferred creation of the item. This does not necessarily need to be through 2 methods. Other means of deferring code exist. A poll_send could take a closure or a future that will resolve to an item. The upside is, harder to misuse the API. Downside is that it might still take this closure or future a long time before materializing, if ever, so it doesn't really solve the deadlock problem. Is this practical? I don't know, given that closures and futures sometimes will have trouble capturing the necessary context, it might be inconvenient.
    • reserving a slot and undoing it. Can be useful, but requires manual intervention, and it won't solve anything for StreamExt::forward, unless you want to use some timeout and backoff mechanism.
    • Maybe it does make sense to split it into two traits, with something like ReserveSlot which clearly indicates whether the sink supports reserving a slot and which makes it more explicit. It seems that those kinds of APIs have been examined, but it's not really clear why it was concluded not to use them and whether the arguments outweighed this alternative.
    • as suggested below by @jonhoo, it would be possible to return a token from poll_ready and require that in start_send to let the compiler guarantee that they are called in the right order. Seems hard to implement (without GAT).
    • Define Sink as something that can process a Stream. See: Potential new sink trait design vectordotdev/vector#2122. Downside is that this requires an extra layer of indirection (a stream) when the original items don't come out of a stream.
    • ...?
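As an illustration of the "deferred creation" idea from the list above, a closure-taking send could look roughly like the sketch below. LazySink and poll_send_with are made-up names, and the sketch leaves out Pin, Context and errors. It assumes the item can be produced synchronously once a slot is free, which is exactly the case where this approach is easy.

```rust
use std::task::Poll;

/// Hypothetical bounded sink with a single-step, closure-based send.
pub struct LazySink<T> {
    pub capacity: usize,
    pub items: Vec<T>,
}

impl<T> LazySink<T> {
    /// Runs `make_item` only when there is room for its result, so the item
    /// is never created (or buffered) while the sink is full.
    /// On `Poll::Pending` the closure is dropped unused; the caller has to
    /// supply it again on the next poll.
    pub fn poll_send_with<F: FnOnce() -> T>(&mut self, make_item: F) -> Poll<()> {
        if self.items.len() >= self.capacity {
            return Poll::Pending;
        }
        self.items.push(make_item());
        Poll::Ready(())
    }
}
```

Because readiness check and send happen in one call, there is no window in which a slot is reserved but unused. As noted in the list, this stops helping once producing the item may itself have to wait.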

@jonhoo
Contributor Author

jonhoo commented Apr 3, 2020

One thing we've played around with in tower (which has a very similar API) is to have poll_ready return a token that then has to be passed to start_send (call is the tower equivalent). If the token is dropped, it would free the reservation. Unfortunately, it's basically impossible to get that to work without generic associated types, or by doing nasty pointer tricks or allocation on every poll_ready, as far as I can tell. You can see some of the discussion we had about it on Discord.
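For reference, here is roughly what the token shape looks like when written with generic associated types, which became stable in Rust 1.65, well after this comment. It is a sketch under strong assumptions, not a proposal: TokenSink, Slots and Permit are hypothetical names, Context and errors are omitted, and the sink takes &self with interior mutability so that a live token does not lock out start_send.

```rust
use std::cell::{Cell, RefCell};
use std::task::Poll;

/// Hypothetical token-based sink. The token borrows from the sink, which is
/// why its type needs a lifetime parameter (a generic associated type).
pub trait TokenSink {
    type Item;
    type Token<'a>
    where
        Self: 'a;

    fn poll_ready(&self) -> Poll<Self::Token<'_>>;
    fn start_send(&self, token: Self::Token<'_>, item: Self::Item);
}

/// Toy bounded sink; sent items are never drained in this sketch.
pub struct Slots {
    pub capacity: usize,
    pub reserved: Cell<usize>,
    pub items: RefCell<Vec<u32>>,
}

impl Slots {
    pub fn new(capacity: usize) -> Self {
        Slots { capacity, reserved: Cell::new(0), items: RefCell::new(Vec::new()) }
    }
}

/// Proof that one slot is reserved. Dropping it releases the reservation.
pub struct Permit<'a> {
    slots: &'a Slots,
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        self.slots.reserved.set(self.slots.reserved.get() - 1);
    }
}

impl TokenSink for Slots {
    type Item = u32;
    type Token<'a> = Permit<'a> where Self: 'a;

    fn poll_ready(&self) -> Poll<Permit<'_>> {
        if self.reserved.get() + self.items.borrow().len() < self.capacity {
            self.reserved.set(self.reserved.get() + 1);
            Poll::Ready(Permit { slots: self })
        } else {
            Poll::Pending
        }
    }

    fn start_send(&self, token: Permit<'_>, item: u32) {
        // The toy does not check that `token` came from this sink.
        self.items.borrow_mut().push(item);
        drop(token); // the item now occupies the slot the permit reserved
    }
}
```

Calling start_send without a token does not compile, and a forwarder that decides not to send simply drops the Permit. Whether this shape fits Pin, &mut self and trait objects is a separate question that the sketch does not answer.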

In my mind, there are three options:

  1. poll_ready is removed, and start_send is replaced with poll_start_send, which returns the element if the poll fails (its signature will be a little non-poll-y)
  2. poll_ready is kept in its current form, and disarm is added
  3. poll_ready and start_send are modified somehow to require that poll_ready is called first through the type system (something like a token)

I think 3 would be great, but I think it's not feasible without GAT. I think 1 is unfortunate because it means you must have the item you're going to send before seeing whether you can send it (this hinges on @carllerche's argument that this is important, which I'm inclined to believe). That leaves us with 2.

Here's why I think option 2 isn't that bad: it will essentially only be used by people implementing low-level futures interfaces. "High-level" users will likely just be using async fn send, which hides the details from them, whatever they are. Low-level users will have to know to call poll_ready first, and will have to know the implications of that (that you reserve a slot), it's true, but maybe that's okay? Or maybe that's something we can fix with naming? Something like poll_start_send (poll_ready), complete_send (start_send), and abort_send (disarm)?
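A sketch of how option 2 with the suggested names might read as a trait. RenamedSink, OneSlot and CountWake are hypothetical and exist only for illustration; OneSlot models a sink with a single in-flight reservation, and poll_flush and poll_close are left out.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical renaming of the Sink methods, using the names floated above.
pub trait RenamedSink<Item> {
    type Error;
    /// Was `poll_ready`: reserves capacity for exactly one item.
    fn poll_start_send(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Result<(), Self::Error>>;
    /// Was `start_send`: uses the reservation.
    fn complete_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    /// Was the proposed `disarm`: gives the reservation back unused.
    fn abort_send(self: Pin<&mut Self>);
}

/// Toy sink that allows one outstanding reservation at a time.
#[derive(Default)]
pub struct OneSlot {
    pub reserved: bool,
    pub items: Vec<u32>,
    pub waiting: Option<Waker>,
}

impl RenamedSink<u32> for OneSlot {
    type Error = &'static str;

    fn poll_start_send(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        if this.reserved {
            // Remember whom to wake once the reservation is released.
            this.waiting = Some(cx.waker().clone());
            return Poll::Pending;
        }
        this.reserved = true;
        Poll::Ready(Ok(()))
    }

    fn complete_send(self: Pin<&mut Self>, item: u32) -> Result<(), Self::Error> {
        let this = self.get_mut();
        if !this.reserved {
            return Err("complete_send called without poll_start_send");
        }
        this.reserved = false;
        this.items.push(item);
        if let Some(waker) = this.waiting.take() {
            waker.wake();
        }
        Ok(())
    }

    fn abort_send(self: Pin<&mut Self>) {
        let this = self.get_mut();
        this.reserved = false;
        // Aborting must wake a waiter just like a completed send would.
        if let Some(waker) = this.waiting.take() {
            waker.wake();
        }
    }
}

/// Waker that counts wake-ups, so the sketch can be checked without a runtime.
pub struct CountWake(pub AtomicUsize);

impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

One detail the names make easier to see: abort_send has to wake whoever was parked in poll_start_send, otherwise the freed reservation goes unnoticed.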

@taiki-e taiki-e added the A-sink Area: futures::sink label Dec 17, 2020
@taiki-e taiki-e added this to the futures-0.4 milestone Dec 17, 2020
@taiki-e taiki-e added the S-needs-api-design Status: Before implementing this, a discussion or decision on the new API is needed. label Jul 19, 2023
3 participants