Skip to content

Commit

Permalink
Make FtpStream::get read the final response
Browse files Browse the repository at this point in the history
Once the client has finished reading a file from the server, the server
sends a status message to the client. The current API of FtpStream::get
doesn't allow us to read that response, since we return a BufReader that
reads directly from the data stream. Instead of doing this, we can
return a new AsyncRead type (FileReader) that wraps the data stream and
reads the status message once the data stream is finished.

There is one caveat to this - if the FileReader is dropped then the data
stream will be closed, and the server will send us an error message,
which still needs to be read manually. There isn't really a nice way
around this without an AsyncDrop trait.

Since this is already a breaking change, we also no longer use a
BufReader at all - this lets the caller decide whether to buffer or not.
  • Loading branch information
oscarwcl committed Apr 6, 2022
1 parent da79249 commit ba0aa34
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 5 deletions.
95 changes: 95 additions & 0 deletions src/file_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
};

use tokio::io::{AsyncRead, ReadBuf};

use crate::{status, DataStream, FtpStream};

pub struct FileReader<'a> {
state: State<'a>,
}

enum State<'a> {
Stream {
data_stream: DataStream,
ftp_stream: &'a mut FtpStream,
},
FinalRead(Pin<Box<dyn 'a + Future<Output = io::Result<()>>>>),
Finished,
}

impl FileReader<'_> {
pub(crate) fn new(data_stream: DataStream, ftp_stream: &mut FtpStream) -> FileReader {
FileReader {
state: State::Stream {
data_stream,
ftp_stream,
},
}
}
}

impl AsyncRead for FileReader<'_> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let bytes_read_before = buf.filled().len();
let (state, result) = match mem::replace(&mut self.state, State::Finished) {
State::Stream {
mut data_stream,
ftp_stream,
} => match Pin::new(&mut data_stream).poll_read(cx, buf) {
Poll::Ready(result) => {
let bytes_read_after = buf.filled().len();
if bytes_read_after == bytes_read_before {
// finished reading the file, wait for a status message from the server
let mut status_fut = Box::pin(async move {
ftp_stream
.read_response_in(&[
status::CLOSING_DATA_CONNECTION,
status::REQUESTED_FILE_ACTION_OK,
])
.await
.map(|_| ())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
.and(result)
});
match Pin::new(&mut status_fut).poll(cx) {
Poll::Ready(r) => (State::Finished, Poll::Ready(r)),
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending),
}
} else {
(
State::Stream {
data_stream,
ftp_stream,
},
Poll::Ready(result),
)
}
}
Poll::Pending => (
State::Stream {
data_stream,
ftp_stream,
},
Poll::Pending,
),
},
State::FinalRead(mut status_fut) => match Pin::new(&mut status_fut).poll(cx) {
Poll::Ready(r) => (State::Finished, Poll::Ready(r)),
Poll::Pending => (State::FinalRead(status_fut), Poll::Pending),
},
State::Finished => panic!("poll called on finished FileReader"),
};

self.state = state;
result
}
}
12 changes: 7 additions & 5 deletions src/ftp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_rustls::{rustls::ClientConfig, rustls::ServerName, TlsConnector};

use crate::data_stream::DataStream;
use crate::file_reader::FileReader;
use crate::status;
use crate::types::{FileType, FtpError, Line, Result};

Expand Down Expand Up @@ -316,14 +317,15 @@ impl FtpStream {

/// Retrieves the file name specified from the server.
/// This method is a more complicated way to retrieve a file.
/// The reader returned should be dropped.
/// Also you will have to read the response to make sure it has the correct value.
pub async fn get(&mut self, file_name: &str) -> Result<BufReader<DataStream>> {
///
/// If the reader is dropped before the file is fully read, the server will send a error message that
/// should be read with [`Self::read_response`]/[`Self::read_response_in`].
pub async fn get(&mut self, file_name: &str) -> Result<FileReader<'_>> {
let retr_command = format!("RETR {}\r\n", file_name);
let data_stream = BufReader::new(self.data_command(&retr_command).await?);
let data_stream = self.data_command(&retr_command).await?;
self.read_response_in(&[status::ABOUT_TO_SEND, status::ALREADY_OPEN])
.await?;
Ok(data_stream)
Ok(FileReader::new(data_stream, self))
}

/// Renames the file from_name to to_name
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@
//!
mod data_stream;
mod file_reader;
mod ftp;
pub mod status;
pub mod types;

pub use self::data_stream::DataStream;
pub use self::file_reader::FileReader;
pub use self::ftp::FtpStream;
pub use self::types::FtpError;

0 comments on commit ba0aa34

Please sign in to comment.