
Rewrite the mpsc channels #984

Closed · cramertj wants to merge 1 commit from the fix-channels branch

Conversation

@cramertj (Member):

Previously, the bounded mpsc channels wouldn't provide backpressure
in many cases because they provided a guaranteed slot for every
sender. This change refactors mpsc channels to never allow more than
buffer + 1 messages in-flight at a time.

Senders attempt to increment the number of in-flight messages in
poll_ready. If the increment succeeds, a message may be sent. If the
buffer was full, the sender adds itself to a queue to be awoken. When
a message is received, the receiver attempts to awaken a waiting
Sender and grant it "sending rights". If no such Sender is found, it
decreases the number of in-flight messages currently present in the
buffer. If a Sender holding "sending rights" is dropped, it passes
those rights to another waiting Sender or decreases the number of
in-flight messages.
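
As a rough sketch of the poll_ready half of that protocol (names and types here are illustrative rather than the actual ones in this PR, it is written against today's std task types, and the real code uses an atomic FIFO of wakers instead of a mutex-guarded Vec):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

struct SendError;

// Illustrative shared channel state.
struct SharedState {
    buffer: usize,                      // configured buffer size
    num_in_flight: AtomicUsize,         // messages currently in flight
    parked_senders: Mutex<Vec<Waker>>,  // senders waiting for "sending rights"
}

fn poll_ready(shared: &SharedState, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
    loop {
        let current = shared.num_in_flight.load(Ordering::SeqCst);
        if current > shared.buffer {
            // All buffer + 1 slots are taken: park this sender until the
            // receiver consumes a message and hands over "sending rights".
            // (The real code re-checks after parking to avoid a lost wakeup;
            // that is elided here.)
            shared.parked_senders.lock().unwrap().push(cx.waker().clone());
            return Poll::Pending;
        }
        // Try to claim one of the in-flight slots; if another sender or the
        // receiver changed the count concurrently, retry.
        if shared
            .num_in_flight
            .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            return Poll::Ready(Ok(()));
        }
    }
}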

There's one odd choice in this change: picking buffer + 1 rather than
buffer as the maximum number of elements. In practice, many of the
tests and a handful of real-world code relied on having an initial
slot available for the first sender, and were sending into channels
with a buffer size of zero. Those use-cases would all have been broken
by this change without the switch to buffer + 1.
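
For example (a simplified illustration using the futures mpsc API; for a single sender the observable behavior is the same before and after this change):

use futures::channel::mpsc;

fn main() {
    // With buffer = 0 there is still the one "+ 1" slot, so the first message
    // is accepted immediately and only the second send sees a full channel.
    let (mut tx, _rx) = mpsc::channel::<u32>(0);
    assert!(tx.try_send(1).is_ok());  // occupies the single in-flight slot
    assert!(tx.try_send(2).is_err()); // channel is now full
}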

Despite the buffer + 1 mitigation, this is still a breaking change
since it modifies the maximum number of elements allowed into a
channel. 0.2 has been out for a short enough time that it's probably
possible to transition without breaking existing code, but this
approach should be taken with caution. Luckily, 0.2 organized
futures-channel into a separate crate, so it could be released as
an 0.3 version of futures-channel, futures-sink, futures-util, and
the futures facade while still maintaining futures-io and futures-core
compatibility.

Fix #800
Fix rustasync/team#11

cc @danburkert, @stjepang, @carllerche

cramertj force-pushed the fix-channels branch 2 times, most recently from 897ad27 to bf4460e on April 24, 2018 01:23.
// Atomic, FIFO queue used to send wakers for blocked `Sender` tasks to the `Receiver`.
sender_waker_queue: Queue<Arc<Mutex<SenderWaker>>>,
// Lock for popping off of the sender_waker_queue
sender_waker_pop_lock: Mutex<()>,
@cramertj (Member, Author) commented on the diff above, Apr 24, 2018:
I don't love that this is necessary, but it shouldn't be contended often-- it's only used in receiving and in the Drop impl of Sender. I considered having Sender::drop wake up the receiver to queue the item instead, but that means doing two wakeups instead of one, which is almost certainly slower than a mutex acquire.

@jonhoo (Contributor) commented Apr 24, 2018:

This would address #403! :D

@carllerche (Member) commented:

I have not reviewed the code at all, but based on the description, this sounds exactly like the strategy that was first attempted and then ruled out as flawed. I will attempt to explain the reasoning here.

The problem is that this strategy assumes two things:

  • Generating the value is fast (usually because it already exists before channel capacity is requested). This is because once capacity is assigned to a Sender, capacity is blocked until the sender actually sends a value.
  • Once capacity is assigned to a Sender, a value will either be sent or the Sender will be dropped.

Unfortunately, neither of these assumptions holds true, and violating either will result in increased latency and / or deadlocks.

For example, if using HTTP/2.0, you might use the protocol's support to propagate back pressure signals from your channel to a remote peer. In this case, once the Sender has capacity, a message must travel over the network to tell the peer that there is capacity, then the peer must generate the message, send it back over the wire, and it must finally reach the Sender. This is not a quick turnaround.

Another example is the tower load balancer. A load balancer instance is composed of many services (which, internally, use channels). The load balancer calls poll_ready on every inner service to check whether the service instance is ready to handle a request, but it only dispatches a request to a single service. The balancer collects metrics to determine which inner services are healthier and pushes more requests to inner services that respond faster. So, if an inner service takes a long time to go from NotReady -> Ready, it might never be called; i.e., the capacity assigned to the Sender will be held indefinitely, leading to deadlocks.

Leaky abstraction

Requiring a fast turnaround between requesting capacity and sending a message (or dropping the handle) is a leaky abstraction. Future places no requirements on how a future gets polled after it is notified. The proposed behavior changes this, exposing an implementation detail of the sender to the future.

The problem is that middleware like the tower balancer cannot account for special requirements of each Future / Service it is generic over. As far as I can tell, this would make the proposed channel unusable from such contexts.

Channels and backpressure

Since this question comes up often, I probably should take this opportunity to write up the history and the background behind the current design since everybody gets confused. If there is a better general purpose channel strategy, then great (though, IMO this is not it).

There are fundamentally two dimensions in which channel usage needs to be bounded:

  • The rate at which individual Sender call sites send messages into the channel.
  • The total number of Sender call sites.

This channel implementation only bounds the first item, and I will argue that it is incorrect to attempt to bound the second one as this responsibility lies elsewhere.

Blocking channels

Let's take a step back and look at how back pressure works with synchronous, blocking channels. With blocking channels, there can be any number of Sender clones spread out across any number of threads, each racing to push values into the channel.

For simplicity, let's assume that each thread only has one Sender handle. Because synchronous code is synchronous, even if more than one Sender clone is assigned to a single thread, that thread can only be attempting to send a value on one Sender clone at a time. If the channel is full, that send will block and the thread is unable to send a value on another Sender handle.

Imagine you have a channel that is bounded to 5 values, you clone the Sender handle 50 times, spawn 50 threads, and each thread attempts to send a value through its Sender. The first 5 threads will successfully send and the other 45 threads will block on the send. But the value being sent will already have been generated and is buffered by the thread. Also, the original 5 winners are able to loop and attempt to send another value, which will block (be buffered by the thread). So the total number of values materialized in memory is 55, i.e. the channel capacity plus the number of sender clones.
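
Here is a runnable restatement of that scenario with std's sync_channel (sizes chosen to match the example above):

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A channel bounded to 5 values, with 50 Sender clones on 50 threads.
    let (tx, rx) = sync_channel::<Vec<u8>>(5);
    let handles: Vec<_> = (0..50u8)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                // The value is materialized *before* send can block, so the
                // channel bound does nothing to cap memory held by senders.
                let value = vec![i; 1024];
                tx.send(value).unwrap();
            })
        })
        .collect();
    drop(tx);
    // At this point up to 5 values sit in the buffer while the remaining
    // threads block with their values already in hand -- the total is bounded
    // only by the number of call sites, not by the channel.
    assert_eq!(rx.iter().count(), 50);
    for handle in handles {
        handle.join().unwrap();
    }
}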

To solve this, the user of the blocking Sender must come up with some strategy external to the channel to limit the number of threads that are spawned (i.e. limit the number of call sites). This is not the responsibility of the channel.

Limiting the call sites

The original argument for changing the behavior is that send takes self, so you have to clone for each send. The fact is, even if this channel patch lands, it would not resolve the unbounded nature of the calling code.

Assuming that the pattern is:

tx.clone().send(foo);

This returns a future, which does no work by itself. Something has to spawn this future or push it into a vector somewhere. If nothing is limiting this logic at a higher level, there will be an unbounded number of spawned tasks or items pushed into storage somewhere. Even with the proposed change, unbounded buffering behavior remains.

In my experience, every case of Sender::clone growing out of bounds was gracefully solved by adding a limit at a higher level.

But everyone gets it wrong

Yes, it's true. It is a point of serious confusion. I would argue that this is due to the functional combinators being really hard to use, and less to the behavior of the channel. When using combinators, one must employ an entirely functional programming style, threading all necessary state through the combinators. This is very annoying, and a common shortcut is to just clone into the combinators, resulting in this behavior.

It all gets better with async / await

Finally, async / await returns the current channel implementation to sanity as one will be able to send w/ &mut self and not require moving anymore. So, the following will "just work" as expected:

await!(self.tx.send(my_val))

And, async / await will prevent attempting to send on other Sender handles from within the same task, returning us to the same behavior as blocking channels.

TL;DR

  • This strategy was attempted a long time ago and it results in reduced throughput and increased latency.
  • The "lack of back pressure" problem is due to user error (though not the user's fault) and not a channel problem.
  • Everything gets magically better with async / await.

@cramertj (Member, Author) commented:

@carllerche Thanks for your detailed reply! I agree that this solution isn't perfect for some of the reasons you describe, but I'll try and outline why I think that we can use this PR as a stepping stone to something better than the status quo. Hopefully we can start to sort out more of these issues.

For example, if using HTTP/2.0, you might use the protocol support to propagate back pressure signals from your channel to a remote peer. In this case, once the Sender has capacity, messages over the network will signal to the peer that there is capacity, then the peer must generate the message, send it back over the wire, and finally reach the Sender. This is not a quick turn around.

I agree that the backpressure implemented by this PR would not suffice for such a situation, but I don't believe that the current backpressure implementation works here either. If you want backpressure to propagate from the channel back to a remote peer, you must necessarily delay requesting a value until you can be sure that there is space available to receive and process the client's message. The current implementation doesn't allow for this type of fixed-buffer backpressure with a growable number of senders. Is there something about the way that this system currently works that makes it usable in the HTTP2 case where this PR's implementation wouldn't be usable? If you're not making use of channel backpressure to push back at a higher level, then is there a reason for not using the UnboundedXXX channel types?

There are fundamentally two dimensions in which channel usage need to be bounded:

  • The rate at which individual Sender call sites send messages into the channel.
  • The total number of Sender call sites.

This channel implementation only bounds the first item, and I will argue that it is incorrect to attempt to bound the second one as this responsibility lies elsewhere.

The original argument for changing the behavior is that send takes self, so you have to clone for each send. The fact is, even if this channel patch lands, it would not resolve the unbounded nature of the calling code.

Assuming that the pattern is:
tx.clone().send(foo);
This returns a future, which does no work by itself. Something has to spawn this future or push it into a vector somewhere. If nothing is limiting this logic at a higher level, there will be an unbounded number of spawn tasks or items pushed into storage somewhere. Even with the proposed change, unbounded buffering behavior remains.

Yes, getting backpressure right in this case is something that requires a higher level of coordination. In my intended use-case, I'm reading requests from an OS channel and spawning the tx.clone().send(foo) tasks onto a FuturesUnordered. Once that FuturesUnordered hits a certain size, the task stops pulling requests off of the OS channel until more of the tx.clone().send(foo) tasks complete. Before this PR, channels failed to prevent any of those tasks from completing, resulting in unbounded growth of the channel buffer. After this PR, the channels provide backpressure, meaning that fewer of the tx.clone().send(foo) tasks complete immediately, causing the FuturesUnordered to grow in size so that the OS channel is no longer read from, exerting backpressure on the underlying OS channel. As far as I can tell so far, this works just fine, but it requires channels not to leak memory when cloned.
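
A sketch of that pattern, written against the modern futures API for brevity (the incoming stream stands in for the OS channel, and the cap of 32 is arbitrary):

use futures::channel::mpsc;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::SinkExt;

const MAX_IN_FLIGHT: usize = 32;

async fn pump(mut incoming: mpsc::UnboundedReceiver<u32>, tx: mpsc::Sender<u32>) {
    let mut in_flight = FuturesUnordered::new();
    loop {
        // While at the cap, only make progress on the pending sends. This is
        // where channel backpressure stops us from accepting more requests,
        // which in turn lets the OS channel back up.
        while in_flight.len() >= MAX_IN_FLIGHT {
            if in_flight.next().await.is_none() {
                break;
            }
        }
        match incoming.next().await {
            Some(request) => {
                // The moral equivalent of tx.clone().send(request).
                let mut tx = tx.clone();
                in_flight.push(async move { tx.send(request).await });
            }
            None => break,
        }
    }
    // Drain whatever is still in flight.
    while in_flight.next().await.is_some() {}
}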

Finally, WRT "everything is fixed under async/await": I agree that async/await will help tremendously here, as it reduces the number of cases in which channels must be cloned. However, there are other reasons for cloning channels apart from wanting to call self-consuming combinators-- one might want to spawn a number of 'static tasks which each contain a Sender. Without this PR, these cases won't correctly limit the channel to the configured buffer size.

@carllerche (Member) commented:

If you want backpressure to propagate from the channel back to a remote peer, you must necessarily delay requesting a value until you can be sure that there is space available to receive and process the client's message.

This is exactly what poll_ready is for.

The current implementation doesn't allow for this type of fixed-buffer backpressure with a growable number of senders.

I'm not sure what you mean. Both implementations would support this as this is the Sink::poll_ready guarantee.

tx.clone().send(foo)

tbh, I never used the combinators personally, so I never hit the pain. However, I spoke w/ @danburkert offline a bunch about this and he explained the issue to me. The solution is a pretty simple change to the current implementation: implement Sink::poll_complete as an alias for poll_ready. This achieves the exact desired semantics: send does not complete until the value enters the channel's main buffer rather than the Sender's dedicated slot.
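
Schematically, something like this (signatures simplified; the exact Sink trait shape differs between futures versions):

impl<T> Sender<T> {
    fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
        // Instead of a no-op, treat "flush" as "wait until the channel proper
        // has room again": a send future then only resolves once the value
        // has left the Sender's dedicated slot and entered the shared buffer.
        self.poll_ready(cx)
    }
}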

@seanmonstar (Contributor) commented:

I was actually looking into this myself today, as the hyper test suite has been hitting the race condition daily since the change to a thread pool of executors. I thought about just trying to patch the existing implementation, but eventually I came to think that we probably don't need yet another channel implementation in futures when excellent work has already been done on the libstd and crossbeam channels.

Specifically, they both better handle all the edge cases around multiple or single producers sending at the same time a consumer is dropped.

The change I was thinking of was to use either std's or crossbeam's channel internally, and just add the task/waker machinery on top.

@cramertj (Member, Author) commented Apr 24, 2018:

@seanmonstar

The proposed changes I was thinking was to use either std or crossbeam's channel internally, and just add the task/waker stuff on top.

That's essentially what this is-- the underlying queue is basically the same as the one from std; all the noise on top is for getting proper wakeups. It's annoyingly complicated to get right.

@carllerche

If you want backpressure to propagate from the channel back to a remote peer, you must necessarily delay requesting a value until you can be sure that there is space available to receive and process the client's message.
This is exactly what poll_ready is for.

Perhaps you've misunderstood this PR-- that's what I implemented. poll_ready will delay the send until there is space available on the channel to receive a message.

The current implementation doesn't allow for this type of fixed-buffer backpressure with a growable number of senders.

I'm not sure what you mean. Both implementations would support this as this is the Sink::poll_ready guarantee.

No, the current poll_ready guarantee is to only apply backpressure for buffer + num_senders slots. This PR changes it to use buffer slots, rather than adding num_senders extra. This makes it so that splitting up the work across more Senders doesn't increase the underlying number of slots.
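
Concretely (an illustrative snippet of the documented buffer + num_senders behavior, as I read the current implementation):

use futures::channel::mpsc;

fn main() {
    // Current behavior: every clone brings its own guaranteed slot, so with
    // buffer = 0 and two Senders, two messages fit before anything is full.
    let (mut tx1, _rx) = mpsc::channel::<u32>(0);
    let mut tx2 = tx1.clone();
    assert!(tx1.try_send(1).is_ok()); // tx1's guaranteed slot
    assert!(tx2.try_send(2).is_ok()); // tx2's guaranteed slot
    // Under this PR the second try_send would fail instead: capacity stays at
    // buffer + 1 no matter how many Senders exist.
}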

@carllerche (Member) commented:

@seanmonstar I have no opinion w.r.t the implementation as long as it maintains the current behavior and equal or greater performance. IIRC, I originally attempted to add a layer on top of std channel, but it was not possible w/o a mutex guarding both ends. I could be wrong as it was a long time ago.

@cramertj I understand what the PR is doing. I re-read the original snippet I quoted, and I think that I see what you mean.

Again, this conflates two dimensions of growth and I do not believe that the channel backlog should be used to guard against a growing number of call sites.

It seems like the crux of the problem is that the current implementation does not behave correctly when using combinators, and I proposed a simple fix above. What do you think about that proposal?

@aturon (Member) commented Apr 24, 2018:

I'm gonna try to restate things for myself, basically repeating some of @carllerche's earlier points.

I think we can all agree with the following premise:

  • The goal is to limit the global number of in-flight messages destined for a given queue.

That is, when we look at the application as a whole, we want bounded "buffering", where buffering includes not just literal buffers, but also cases where a potential sender has generated a value to send but is prevented from queuing it.

That last point is key: it means that we need to coordinate not just queuing of already-produced values, but also the production of values in the first place.

The "one limit" strategy

This PR achieves our goal in the most "direct" way: by having the queue itself fully manage the global limit. To do this, it must hand out "tickets" for sending. Since the total number of tickets is bounded, so is the number of values ever produced to send (assuming senders get a ticket before producing their value).

The downside to this strategy, as @carllerche explains (and I think that @cramertj agrees), is that there is a potentially arbitrary delay between acquiring and using a ticket.

For the moment, let's focus on the potential for deadlock. As @carllerche says, reasonable abstractions elsewhere in the system may result in a given ticket for sending never being used or dropped, which can ultimately eat up the queue's capacity. @cramertj, do you agree with this potential?

The "two limits" strategy

The alternative approach, represented by the current implementation, breaks down the global limit of in-flight messages into two parts, as @carllerche articulated:

  • The rate at which individual Sender call sites send messages into the channel.
  • The total number of Sender call sites.

The first limit is imposed by the channel itself, through its limited buffer.

The second limit is imposed at a higher level in the application, where it is less likely to cause deadlock. And, in particular, this second limit will often "fall out" from other limits, e.g. on the number of active connections.

Moreover, @carllerche argues, it is not clear how to have the channel alone impose the second limit without leading to potential for deadlock and other bad inter-task interference.

Discussion

I think a core issue here is local vs global thinking. It's very tempting to think about backpressure in purely local terms: we want to keep this particular receiver from getting too backlogged, and we do so by imposing a limit purely from its local perspective. The problem is that we can't safely impose this limit purely locally; it fundamentally involves some global coordination.

The strategy that @carllerche advocates for is one that allows individual tasks to be written in a more-or-less "isolated" style, without having to think about how e.g. grabbing a ticket may cause problems for other unrelated tasks. Global coordination is instead handled at a much higher level. That means that backpressure isn't as simple as throwing in a bounded channel, but I think in the big picture it makes life easier overall (and in particular makes abstractions more independent).

It's also worth noting that we can, and probably should, build some pure ticket abstractions, i.e. futures-aware semaphores. These can then be used in a more explicit way as one simple means of imposing global limits (where the potential for deadlock is more obvious).
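
The core of such a futures-aware semaphore might look roughly like this (a minimal sketch using today's std task types; the Acquire future that drives try_acquire_or_register from its poll is omitted):

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::Waker;

// Shared state: a permit count plus a FIFO of parked tasks.
struct SemaphoreState {
    permits: usize,
    waiters: VecDeque<Waker>,
}

#[derive(Clone)]
struct Semaphore {
    state: Arc<Mutex<SemaphoreState>>,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        let state = SemaphoreState { permits, waiters: VecDeque::new() };
        Semaphore { state: Arc::new(Mutex::new(state)) }
    }

    // Called from an Acquire future's poll: take a permit if one is free,
    // otherwise register the task to be woken later. Returns whether a permit
    // was acquired.
    fn try_acquire_or_register(&self, waker: &Waker) -> bool {
        let mut state = self.state.lock().unwrap();
        if state.permits > 0 {
            state.permits -= 1;
            true
        } else {
            state.waiters.push_back(waker.clone());
            false
        }
    }

    // Return a permit and wake one waiter so it can retry. Holding a permit
    // visibly blocks other tasks, which is exactly the property being
    // discussed here.
    fn release(&self) {
        let mut state = self.state.lock().unwrap();
        state.permits += 1;
        if let Some(waker) = state.waiters.pop_front() {
            waker.wake();
        }
    }
}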


I wanted to clarify a few additional remarks:

@cramertj says:

The current implementation doesn't allow for this type of fixed-buffer backpressure with a growable number of senders.

Hopefully the above clarifies, but to reiterate: the current implementation takes care of one of the two limits, and you are expected to separately coordinate the other (number of senders) more globally.

@cramertj says:

If you're not making use of channel backpressure to push back at a higher level, then is there a reason for not using the UnboundedXXX channel types?

The reason is that the bounded channel does impose an important limit: the one on the rate of sending for individual senders. It's built so that you can compose this limit with a global one on the number of senders to guarantee an overall total bound.

@carllerche (Member) commented:

@aturon A good summary. I would re-emphasize two additional points:

  • The ticket based strategy proposed in this PR does not actually solve the problem of bounding the number of callers as I explained in my long comment.

  • The root of the complaint about the current implementation is that it does not behave well in the context of the combinators, which is due to an omission in the Sink implementation (which I have never personally used): this should call poll_ready instead of being a no-op.

@cramertj (Member, Author) commented Apr 24, 2018:

@carllerche

The ticket based strategy proposed in this PR does not actually solve the problem of bounding the number of callers as I explained in my long comment.

It doesn't solve bounding the number of callers, I agree. It solves a different problem: that of callers finishing. If callers are unable to complete because the channel is full, the number of outstanding callers grows, which creates backpressure for higher levels of the system.

@aturon

For the moment, let's focus on the potential for deadlock. As @carllerche says, reasonable abstractions elsewhere in the system may result in a given ticket for sending never being used or dropped, which can ultimately eat up the queue's capacity. @cramertj, do you agree with this potential?

I'd expect operations like this to have timeouts or some other mechanism to prevent them from blocking indefinitely. If we allow them to block indefinitely, then we're explicitly allowing the system to gradually leak memory over time. The purpose of backpressure in my use-case is to specifically limit the amount of memory that a process can grow to use.

@aturon (Member) commented Apr 24, 2018:

If we allow Senders to push items into the buffer that will never be properly consumed, nor the Sender or Receiver dropped, then we're creating a system that can leak memory.

To clarify, that's not the scenario @carllerche was describing. The worry was around grabbing a ticket for sending but never actually completing that send.

This is why I brought up explicit semaphores: they make clear that you're juggling an abstract resource count and that while you hold a ticket you are potentially blocking other tasks.

@cramertj (Member, Author) commented Apr 24, 2018:

@aturon I'm explicitly trying to block other tasks by holding this ticket.

@carllerche (Member) commented:

@cramertj this implies to me that you want a higher level point of coordination. As @aturon mentioned, this semaphore can be implemented as a standalone type.

Again, I go back to the synchronous analogy. In order to prevent the value from being materialized before channel capacity is guaranteed, you would need additional coordination around the blocking channel; i.e., the behavior you want is not possible in the blocking world with your usual channels.

@cramertj (Member, Author) commented:

Closing in favor of a more minimal set of changes, which I'll open shortly and link here.

@cramertj cramertj closed this Apr 24, 2018
@cramertj cramertj mentioned this pull request Apr 24, 2018
carllerche added a commit to carllerche/futures-rs that referenced this pull request Aug 31, 2018
This fixes the "bounded senders are actually unbounded" problem. The
problem is discussed here: rust-lang#984
@cramertj cramertj deleted the fix-channels branch November 21, 2018 22:51