Bad completion of some futures in io::copy_bidirectional #6519
This could be a fix, but I think that ideally it should be implemented in such a way that reads into the buffer do not stop if there is room in the buffer but a flush future is pending.
I agree, that would definitely be more efficient. As I imagine it, this solution would look like this:

```rust
loop {
    // If our buffer is empty, then we need to read some data to
    // continue.
    if self.pos == self.cap && !self.read_done {
        // This code remains the same
    }

    // If a flush future is in progress, do not write until it is finished
    if self.need_flush {
        ready!(writer.as_mut().poll_flush(cx))?;
        #[cfg(any(
            feature = "fs",
            feature = "io-std",
            feature = "net",
            feature = "process",
            feature = "rt",
            feature = "signal",
            feature = "sync",
            feature = "time",
        ))]
        coop.made_progress();
        self.need_flush = false;
    }

    // Remaining code is untouched
}
```

Note that any error during flushing is ignored in my code, since it is also ignored elsewhere in the function. The flushing code could probably be factored out, since it is the same as the one used when a read is pending.

Besides, I think that we can fix the other, similar problem coming from this optimization (the read future may never be properly polled upon termination) by changing the condition at the entry of the loop:

```rust
loop {
    // If our buffer is empty, then we need to read some data to
    // continue. Otherwise, if we can read a bit more data, do it
    // to improve the chances of a large write
    let is_readable = (self.pos == self.cap) || (self.cap < self.buf.len());
    if is_readable && !self.read_done {
        // This code remains the same
    }

    // Remaining code is untouched
}
```
While digging into the first issue I had (with just a no-op `poll_flush`), I've reworked my fixes and implemented them in the related PR. With those fixes, everything works as expected, both in the real project where I spotted the issue and in the minimal example provided here. The fix related to the flush future is definitely the most important one, since it is the one that breaks the connection in my case.
If `poll_flush` returns `Poll::Pending`, then the next call to `poll_flush` is expected to continue the same flush operation rather than start a new one.
So if I write N bytes of data, then I start to flush (thus getting a `Poll::Pending`), the next call to `poll_flush` should poll the same flush future again instead of starting a new one?
Yes. If your IO resource relies on futures internally, then the same flush future must be kept and polled again on the next call to `poll_flush`, until it completes.
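As a concrete illustration of this contract, here is a minimal sketch of how a caller drives `poll_flush` to completion (essentially what `tokio::io::AsyncWriteExt::flush` does); the `flush_all` helper is illustrative, not part of the original thread:

```rust
use std::{future::poll_fn, io, pin::Pin};

use tokio::io::AsyncWrite;

// Drive `poll_flush` to completion: every `Pending` result is followed by
// another call that resumes the same in-flight flush.
async fn flush_all<W: AsyncWrite + Unpin>(writer: &mut W) -> io::Result<()> {
    poll_fn(|cx| Pin::new(&mut *writer).poll_flush(cx)).await
}
```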
Ok, thank you very much, I did not understand it this way. The documentation is not very explicit in this sense in my opinion, but maybe I'm the only one to think so and I've misunderstood its wording.

Hence, I guess that the only thing that still makes sense in this issue and the related PR is the optimization discussed above. It's a minor detail, but with the current implementation, if the read operation is pending, then the next write might occur before the next read, without giving the read operation a chance to be polled a second time. In other words, we could potentially allow for more optimizations by trying to read a second time before writing again when both operations are pending. What do you think @Darksonn?
Reading before writing to include more data in the write makes sense to me. I'll close this issue, but if you want to adapt the PR to optimize something, then that's fine with me.
Version
tokio v1.37.0
Platform
Debian 10, but also WSL 2, Windows and Debian 11 (untested on other platforms)
Description
It seems I've reached an edge case with the function `io::copy_bidirectional`. Unfortunately, the minimal reproducible example provided below is a bit long, so I'd rather describe the problem instead. It might be a bad understanding or implementation on my side, but I feel like it's rather a tricky problem that only occurs in some very rare circumstances.
Minimal reproducible example
The gist can be found here.
Context
I have a server which acts as a proxy, made using `tokio`. In this case, the server takes some data from TCP clients and distributes it to the appropriate backend services. Data is not transformed; it is just a `Vec<u8>` that can be moved. Data flows in both directions, but the TCP clients send much more data.

I had no problems with `io::copy_bidirectional` in most use cases, but I encountered exactly one edge case where the issue showed up, since certain conditions need to be met.

Note that my `AsyncWrite` and `AsyncRead` implementations were working perfectly fine with a custom, manual loop using `tokio::select!` (with or without a correct `poll_flush` implementation).
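For reference, the setup is essentially the following (a minimal sketch; the `proxy` function and its parameters are illustrative, not the project's actual code):

```rust
use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

// Hypothetical wiring: copy data in both directions between a client's TCP
// connection and the custom channel-backed stream towards the backend.
async fn proxy<S>(mut client: TcpStream, mut backend: S) -> std::io::Result<(u64, u64)>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // Returns the number of bytes copied in each direction.
    copy_bidirectional(&mut client, &mut backend).await
}
```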
Origin of the problem
I use `io::copy_bidirectional` between a `TcpStream` and a `Stream` of my own, which sends and receives data using bounded `mpsc` channels. At the beginning, I just implemented `poll_write` and `poll_read`, and my implementation of `poll_flush` was a no-op. However, data sent through the channel looked like it was "corrupted" at some point: the service in charge of processing this data claimed that a packet was incomplete, which caused the connection to be interrupted.

Then, I implemented `poll_flush` properly, which led to a mutual halt, where both the service and my `Stream` were waiting for some data. I then noticed that `io::copy_bidirectional` did not wait for my `poll_flush` future to complete before calling it again after many write operations.

For reference, my flushing code roughly looks like this (simplified for the issue):
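(A minimal sketch of such an implementation, assuming the `Stream` keeps an in-flight flush future in an `Option`, as mentioned later in this issue; all names and the stubbed bodies are hypothetical, not the author's actual code.)

```rust
use std::{
    future::Future,
    io,
    pin::Pin,
    task::{Context, Poll},
};

use tokio::io::AsyncWrite;

type FlushFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;

struct MyStream {
    // Channel plumbing elided; only the flush state matters here.
    flush_future: Option<FlushFuture>,
}

impl MyStream {
    // Hypothetical helper: builds a future that completes once all buffered
    // data has been handed over to the bounded mpsc channel.
    fn make_flush_future(&mut self) -> FlushFuture {
        Box::pin(async { Ok::<_, io::Error>(()) })
    }
}

impl AsyncWrite for MyStream {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // The real implementation pushes `buf` into the channel; stubbed here.
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = self.get_mut(); // `MyStream` is `Unpin` in this sketch.

        // Start a new flush only if none is already in flight.
        if this.flush_future.is_none() {
            let fut = this.make_flush_future();
            this.flush_future = Some(fut);
        }

        // Poll the stored future and keep it while pending, so that the next
        // call to `poll_flush` resumes the same flush instead of creating a
        // new future every time.
        let fut = this.flush_future.as_mut().expect("set above");
        match fut.as_mut().poll(cx) {
            Poll::Ready(res) => {
                this.flush_future = None;
                Poll::Ready(res)
            }
            Poll::Pending => Poll::Pending,
        }
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_flush(cx)
    }
}
```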
Understanding of the problem
After a while, I decided to look into the details of `io::copy_bidirectional`. I put some debugging in there to be sure, and I could finally observe that my problem was related to this code:
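The code in question is the flush performed in the pending-read branch of `CopyBuffer::poll_copy`; abridged and paraphrased here (not a verbatim quote of tokio 1.37), it looks roughly like this:

```rust
// Abridged: inside the main loop of `poll_copy`, when the internal buffer is
// empty, a read is attempted first.
if self.pos == self.cap && !self.read_done {
    match self.poll_fill_buf(cx, reader.as_mut()) {
        Poll::Ready(Ok(())) => (),
        Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
        Poll::Pending => {
            // The read made no progress: flush pending writes, since the
            // peer may be waiting for them before it can produce more data.
            if self.need_flush {
                ready!(writer.as_mut().poll_flush(cx))?;
                self.need_flush = false;
            }

            return Poll::Pending;
        }
    }
}
```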
According to the comments, a flush might be performed when there is data to write and a read operation is pending, for optimization purposes. However, I realized that when this flush future itself is pending, the `poll_copy` function is returned from, which leads to a new scheduling and eventually to a new call.

Ignoring some `tokio` internals which are not part of our problem, the function will again enter the main loop and step into the `poll_fill_buf` matching. If the read operation is not pending anymore, then `self.buf` will be filled again, then it will be written, and finally it might be flushed again. To summarize, we have the following process in `poll_copy`:

1. `self.buf` holds some data and `self.need_flush` is `true`
2. `poll_fill_buf` is pending
3. `poll_flush` is performed but is pending (let's call it F)
4. `poll_copy` is returned from, then called again
5. `poll_fill_buf` is performed again and immediately returns a result
6. `poll_write_buf` is done several times in a row (due to my buffering, the future returns immediately)
7. `poll_flush` is performed (let's call it F'), but F has just finished, so it returns `Poll::Ready(Ok(()))`
8. `poll_copy` thinks that F' has completed, but in fact it was just F that completed, and F' was never even started

Hence, the main issue is that when a flush is performed during a pending read, it will probably never be driven to completion, so when `poll_copy` calls `poll_flush` again, it will probably return `Poll::Ready(Ok(()))` even though the second future was never started. While this does not appear to be a problem when the data flow is "uninterrupted", it becomes one when both ends of the connection are waiting for some data, which is never flushed because of this issue.

Moreover, I suspect that this optimization was responsible for my previous issue, when I did not implement `poll_flush` properly. My guess is that it triggers a similar problem, starting a future that will never be completed properly.

Solution
In my case, with the same `poll_flush` implementation on my side, I could fix the problem by copying this flushing code just before the loop (see the sketch below). It works because as soon as the function is called again, it first checks that the potential `poll_flush` that was triggered during a pending read has completed properly. Of course, this likely has a performance cost, but it is now working.
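Concretely, the workaround amounts to running the same flush check once before entering the main loop (a sketch based on the block quoted in the discussion above, not an exact patch):

```rust
// Workaround: before entering the main copy loop, finish any flush that a
// previous poll (which bailed out on a pending read) left in flight.
if self.need_flush {
    ready!(writer.as_mut().poll_flush(cx))?;
    self.need_flush = false;
}

loop {
    // ... the rest of poll_copy is unchanged ...
}
```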
Note that it does not fix the problem I had without `poll_flush`, but I think it is just another check to perform to drive another future to completion (see the end of the previous section).

The only other ways to fix the problem are either to use a manual loop with `tokio::select!`, which does the same as `io::copy_bidirectional` but is less efficient, or to replace the `Option<Future>` in my `Stream` with a `Vec<Future>`, which does not make sense in my opinion.

Conclusion
If that's not clear enough, I can provide more information or rework the minimal reproducible example.
If my issue is correct, I can happily provide a PR with my solution (which I'm sure can be improved).
Otherwise, I would be happy to know what I'm missing.
Sorry for the long read :)