-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OUTDATED fix: prevent timeouts while writing to object storage #1916
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,9 +13,11 @@ | |
// limitations under the License. | ||
|
||
use std::pin::Pin; | ||
use std::sync::{Arc, Mutex}; | ||
use std::task::{Context, Poll}; | ||
|
||
use async_trait::async_trait; | ||
use futures::FutureExt; | ||
use object_store::{path::Path, MultipartId, ObjectStore}; | ||
use pin_project::pin_project; | ||
use snafu::{location, Location}; | ||
|
@@ -29,9 +31,26 @@ use crate::traits::Writer; | |
/// | ||
#[pin_project] | ||
pub struct ObjectWriter { | ||
// TODO: wrap writer with a BufWriter. | ||
/// This writer is behind a Mutex because it is used both by the caller | ||
/// to write data and by the background task to flush the data. The background | ||
/// task never holds the mutex for longer than it takes to poll the flush | ||
/// future once, so it should never block the caller for long. | ||
/// | ||
/// Note: this is a std Mutex. It MUST NOT be held across await points. | ||
#[pin] | ||
writer: Box<dyn AsyncWrite + Send + Unpin>, | ||
writer: Arc<Mutex<Pin<Box<dyn AsyncWrite + Send + Unpin>>>>, | ||
|
||
/// A task that flushes the data every 500ms. This is to make sure that the | ||
/// futures within the writer are polled at least every 500ms. This is | ||
/// necessary because the internal writer buffers data and holds up to 10 | ||
/// write request futures in FuturesUnordered. These futures only make | ||
/// progress when polled, and if they are not polled for a while, they can | ||
/// cause the requests to timeout. | ||
background_flusher: tokio::task::JoinHandle<()>, | ||
|
||
/// When calling flush(), the background task may receive a ready error. | ||
/// This channel is used to send the error to the main task. | ||
background_error: tokio::sync::oneshot::Receiver<std::io::Error>, | ||
|
||
// TODO: pub(crate) | ||
pub multipart_id: MultipartId, | ||
|
@@ -50,15 +69,48 @@ impl ObjectWriter { | |
location: location!(), | ||
})?; | ||
|
||
let writer = Arc::new(Mutex::new(Pin::new(writer))); | ||
|
||
// If background task encounters an error, we use a channel to send the error | ||
// to the main task. | ||
let (error_sender, background_error) = tokio::sync::oneshot::channel(); | ||
|
||
let writer_ref = writer.clone(); | ||
let background_flusher = tokio::task::spawn(async move { | ||
// The background tasks continues forever, until it is cancelled by the | ||
// the call to shutdown() on the writer. | ||
loop { | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
match writer_ref.lock().unwrap().flush().now_or_never() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if we call flush after a writer is finished writing / closed? Also, what is the normal exit path for this task? It looks like it can only exit this loop if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now calling flush after it's done seems to be a no-op, but there's no strong guarantee of that in the API. We should abort this task in the shutdown method, I think. |
||
None => continue, | ||
Some(Ok(_)) => continue, | ||
Some(Err(e)) => { | ||
let _ = error_sender.send(e); | ||
break; | ||
} | ||
} | ||
} | ||
}); | ||
|
||
Ok(Self { | ||
writer, | ||
background_flusher, | ||
background_error, | ||
multipart_id, | ||
cursor: 0, | ||
}) | ||
} | ||
|
||
pub async fn shutdown(&mut self) -> Result<()> { | ||
Ok(self.writer.as_mut().shutdown().await?) | ||
Ok(AsyncWriteExt::shutdown(self).await?) | ||
} | ||
} | ||
|
||
impl Drop for ObjectWriter { | ||
fn drop(&mut self) { | ||
// If the writer is dropped, we need to make sure that the background | ||
// task is cancelled. We do this by aborting the JoinHandle. | ||
self.background_flusher.abort(); | ||
} | ||
} | ||
|
||
|
@@ -74,19 +126,36 @@ impl AsyncWrite for ObjectWriter { | |
cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<std::io::Result<usize>> { | ||
let mut this = self.project(); | ||
this.writer.as_mut().poll_write(cx, buf).map_ok(|n| { | ||
let this = self.project(); | ||
let mut writer = this.writer.lock().unwrap(); | ||
// We lock the writer prior to checking for background errors to make | ||
// sure we don't miss the error. | ||
if let Ok(err) = this.background_error.try_recv() { | ||
return Poll::Ready(Err(err)); | ||
} | ||
Comment on lines
+133
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these Check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good idea. Thanks |
||
writer.as_mut().poll_write(cx, buf).map_ok(|n| { | ||
*this.cursor += n; | ||
n | ||
}) | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { | ||
self.project().writer.as_mut().poll_flush(cx) | ||
let this = self.project(); | ||
let mut writer = this.writer.lock().unwrap(); | ||
if let Ok(err) = this.background_error.try_recv() { | ||
return Poll::Ready(Err(err)); | ||
} | ||
writer.as_mut().poll_flush(cx) | ||
} | ||
|
||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { | ||
self.project().writer.as_mut().poll_shutdown(cx) | ||
let this = self.project(); | ||
let mut writer = this.writer.lock().unwrap(); | ||
if let Ok(err) = this.background_error.try_recv() { | ||
return Poll::Ready(Err(err)); | ||
} | ||
this.background_flusher.abort(); | ||
writer.as_mut().poll_shutdown(cx) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't those futures be polled when this
AsyncWrite
is polled?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that the
AsyncWrite::write
returnsPoll::Ready
once it has put the read task onto it's internalFuturesUnordered
. So the scheduler has no reason to poll the internal tasks.There is a simpler reproduction here:
apache/arrow-rs#5366 (comment)