-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make FtpStream::get read the final response
Once the client has finished reading a file from the server, the server sends a status message to the client. The current API of FtpStream::get doesn't allow us to read that response, since we return a BufReader that reads directly from the data stream. Instead of doing this, we can return a new future type that wraps the data stream and reads the status message once the data stream is finished. There is one caveat to this - if this future is dropped then the data stream will be closed, and the server will send us an error message, which still needs to be read manually. There isn't really a nice way around this without an AsyncDrop trait. Since this is already a breaking change, we also no longer use a BufReader at all - this lets the caller decide whether to buffer or not.
- Loading branch information
Showing
3 changed files
with
104 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
use std::{ | ||
future::Future, | ||
io, mem, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use tokio::io::{AsyncRead, ReadBuf}; | ||
|
||
use crate::{status, DataStream, FtpStream}; | ||
|
||
pub struct FileReader<'a> { | ||
state: State<'a>, | ||
} | ||
|
||
enum State<'a> { | ||
Stream { | ||
data_stream: DataStream, | ||
ftp_stream: &'a mut FtpStream, | ||
}, | ||
FinalRead(Pin<Box<dyn 'a + Future<Output = io::Result<()>>>>), | ||
Finished, | ||
} | ||
|
||
impl FileReader<'_> { | ||
pub(crate) fn new(data_stream: DataStream, ftp_stream: &mut FtpStream) -> FileReader { | ||
FileReader { | ||
state: State::Stream { | ||
data_stream, | ||
ftp_stream, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
impl AsyncRead for FileReader<'_> { | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &mut ReadBuf<'_>, | ||
) -> Poll<io::Result<()>> { | ||
let bytes_read_before = buf.filled().len(); | ||
let (state, result) = match mem::replace(&mut self.state, State::Finished) { | ||
State::Stream { | ||
mut data_stream, | ||
ftp_stream, | ||
} => match Pin::new(&mut data_stream).poll_read(cx, buf) { | ||
Poll::Ready(result) => { | ||
let bytes_read_after = buf.filled().len(); | ||
if bytes_read_after == bytes_read_before { | ||
// finished reading the file, wait for a status message from the server | ||
let mut status_fut = Box::pin(async move { | ||
ftp_stream | ||
.read_response_in(&[ | ||
status::CLOSING_DATA_CONNECTION, | ||
status::REQUESTED_FILE_ACTION_OK, | ||
]) | ||
.await | ||
.map(|_| ()) | ||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) | ||
.and(result) | ||
}); | ||
match Pin::new(&mut status_fut).poll(cx) { | ||
Poll::Ready(r) => (State::Finished, Poll::Ready(r)), | ||
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending), | ||
} | ||
} else { | ||
( | ||
State::Stream { | ||
data_stream, | ||
ftp_stream, | ||
}, | ||
Poll::Ready(result), | ||
) | ||
} | ||
} | ||
Poll::Pending => ( | ||
State::Stream { | ||
data_stream, | ||
ftp_stream, | ||
}, | ||
Poll::Pending, | ||
), | ||
}, | ||
State::FinalRead(mut status_fut) => match Pin::new(&mut status_fut).poll(cx) { | ||
Poll::Ready(r) => (State::Finished, Poll::Ready(r)), | ||
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending), | ||
}, | ||
State::Finished => panic!("poll called on finished FileReader"), | ||
}; | ||
|
||
self.state = state; | ||
result | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters