-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
muxers/yamux: Mitigation of unnecessary stream drops #3071
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apart from the nitpick it looks good to me, thanks!
@@ -123,6 +140,8 @@ where | |||
return Poll::Ready(Ok(stream)); | |||
} | |||
|
|||
self.inbound_stream_waker = Some(cx.waker().clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The poll
contract says that a waker needs to be registered in case Poll::Pending
is returned. While perhaps this unconditional registration may be legal, I think it still violates expectations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This registration is not unconditional. It is conditional on the list being empty! The list no longer being empty is IMO a valid reason to wake the task that last polled here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, are you saying that self.poll_inner
will return Poll::Pending
? That would be fine, then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OTOH: even if the inner stream does return Poll::Pending
, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?
In general, the Rust async rules are thus: if you create a Poll::Pending
, then you’re responsible for waking a waker. Otherwise you just pass along poll results.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, are you saying that
self.poll_inner
will returnPoll::Pending
? That would be fine, then.
Not quite.
This poll
function essentially should we polled again in two different scenarios:
- If the general poll-function pushed a new stream to the buffer.
- If the socket has more bytes to read (that is
poll_inner
)
Registering this waker here isn't strictly necessary because we poll the StreamMuxer
in swarm::Connection
in a loop anyway. But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.
OTOH: even if the inner stream does return
Poll::Pending
, it will have registered that same waker already, and it will thus wake it when new streams become available. What am I missing?
Like I said, it probably works without too because we will always be implicitly woken again, even if the task that calls the general poll
consumes everything from the inner socket already and pushed them to the buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think it is more correct to still do this here because it showcases that there are two conditions on which the task should be polled again.
I agree with this. I don't think the inner working of impl StreamMuxer for Yamux
should make assumptions on how it is called in Connection
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making things fail due to a missed wake-up would require polling this task through Arc<Mutex<...>>
, which I hope nobody would ever consider doing, so I agree that this is harmless.
This discussion is yet another spotlight on how difficult it is to correctly reason about poll
functions. This is exacerbated within libp2p by extending the concept — like is being done here — in non-trivial ways. If there actually was some surrounding task that did care about being woken for two different reasons, it would have to manufacture its own wakers (like FuturesUnordered does). But even that usage would be broken because the Waker destined for poll_inbound
may be passed to poll_inner
as well, overwriting a previously registered Waker that was destined for that purpose.
While debugging wake-up loops is painful, it is orders of magnitude easier than debugging missed wake-ups. May I suggest that we adhere to the policy that each async object offers exactly one poll
function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not have poll
semantics, because that can lead to confusing behaviour and bugs.
The following pattern is what I suggest:
if let Poll::Ready(x) = connection.poll(cx) {
...
return x;
}
for sub in connection.inbound_streams.drain(..) {
...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I suggest that we adhere to the policy that each async object offers exactly one
poll
function, with Future semantics, that drives exactly (i.e. only and completely) the state machine of that object? Interrogating the state machine should not havepoll
semantics, because that can lead to confusing behaviour and bugs.
The design of this trait was inspired by the AsyncWrite
design which also has multiple poll_
functions that users need to drive. Sink
is similar.
One problem with a single "poll" function design is that it puts more burden on implementations. For example, opening a new stream is not instantaneous, it may require negotiation of new credits with the other party. As such, a "new_outbound" function can only give you an ID or some other kind of handle for a new stream. This means every implementation needs to implement some kind of "stream ID" management. In contrast to that a poll_new_outbound
function can just return Pending
until the new stream is ready to be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I never found Sink usable for anything I had to do, so its design carries little weight in my opinion.
Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on. The example of a “new_outbound” function boils down to the choice of polling the machinery until the new connection is ready, ignoring everything else that happens in the meantime. This already requires the machinery to aggregate its output and let the poller inspect it, for which there is no reason a priori to offer a poll
-shaped API. In particular, no Context
is necessary to ask whether events have been emitted, which removes one prolific source of confusion inherent to Rust’s Task design.
So my solution to the new_outbound
problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So my solution to the
new_outbound
problem would be to offer a front-end Future that polls until the new connection is ready and leaves all other side-effects uninspected, to be dealt with by the caller afterwards.
This is basically the poll_ready
& start_send
API of Sink
then, yes?
Offering multiple “poll” functions to poll the same underlying thing has severe issues, as I argued above — and which you so far have not commented on.
poll_outbound
is not the only issue. The problem of having to buffer streams is mostly because yamux
doesn't allow us to backpressure the number of streams. The QUIC muxer on the other hand allows us to make progress on the connection itself without necessarily accepting new inbound streams. I am happy to change the API for something better but so far I've not found a solution where the caller (swarm::Connection
) can explicitly signal to the muxer that it is now able to take more inbound streams.
We could move away from poll_inbound
by having just a pop_inbound
function. This however then requires more documentation on when the caller should call this function again if it ever returns None
. At that stage, we are just re-inventing the wheel when we could also be using Poll
and automatically wake the task when we know that there are new inbound
streams available.
this.inbound_stream_buffer.push_back(inbound_stream); | ||
|
||
if let Some(waker) = this.inbound_stream_waker.take() { | ||
waker.wake() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The discussion above makes me think: is this muxer polled from multiple tasks? If it is, then poll_inner
will probably switch out wakers all the time, making it non-deterministic which caller will eventually be woken. If not, then this extra wakeup makes little sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least in rust-libp2p
production code, it is only polled from a single task but it is a public interface so there may be other consumers, #2952 for example.
Thanks @thomaseizinger for providing the hotfix. I will take a deeper look. In the meantime @rkuhn can you confirm that this improves or resolves the issue you reported in #3041? |
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the long delay! I have now tested this branch (patched back to 0.49 in top-level Cargo.toml for drop-in replacement) with the ipfs-embed test suite and it works nicely! I don’t need to increase max_buffered_inbound_streams
, increasing max_negotiating_inbound_streams
suffices, also when hammering bitswap with 1800 Wants at once.
That is great to hear! I am inclined to remove the configuration knob again then. Less config surface is better IMO. |
3af6234
to
ca139c1
Compare
This comment was marked as resolved.
This comment was marked as resolved.
ca139c1
to
bfc9c4d
Compare
@mxinden I've rebased this PR onto the Let me know if you need any other changes. I'd suggest we do the release first and then merge this PR! |
@kpp @melekes @divagant-martian Would you mind testing this PR on one of your networks? Thank you 🙏 |
will do, sounds good if we run over the weekend and report back on monday (Tue in Australia)? |
Yeah that is totally fine! |
I would prefer patch releases to go through our pull request workflow as well. I created branch Once that is merged I will cut the patch release. We can then either merge branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the record, this fix looks good to me. Though see comment above before we can merge here.
We didn't see any notable difference, for better or worse, between v0.49.0 and this PR. Hope this helps 🤷 |
Can do but I also would like to understand what the benefit is? Are you trying to partly use GitFlow here? Renaming my branch to
There will be merge conflicts, which I already resolved in this PR as part of the last patch. |
See #3121. |
I think I misunderstood your comment in #3071 (comment). No benefit from my proposed workflow.
Got it. Thanks. I will merge #3121 and cut a release. Then we can merge this pull request into |
Description
A busy network connection can easily fill the internal buffer. We utilise
wake_by_ref
so we can returnPoll::Pending
without breaking thepoll
-function contract. This gives callers a chance to callpoll_inbound
and empty the buffer.Fixes #3041.
Notes
cc @rkuhn
Links to any relevant issues
max_negotiating_inbound_streams
setting #3041StreamMuxer
#2861Open Questions
Change checklist
I have added tests that prove my fix is effective or that my feature works