feat: cross runtime object store wrapper #4015
/// [Object store](ObjectStore) wrapper that isolates the IO runtime from the using runtime.
#[derive(Debug)]
pub struct CrossRtStore<S>
What do you think about calling this `TokioStore`, as it effectively spawns the inner work to a tokio runtime? This would highlight that it can also be used to spawn to a tokio runtime from a non-tokio runtime.
//! the tokio runtime and stall your IO up to the point that upstream servers cut your connections.
//! [DataFusion](https://arrow.apache.org/datafusion/) is one such example.
//!
//! # Example
Perhaps we could give an example of using this to bridge to a non-tokio runtime, e.g. `futures::executor::block_on`.
/// Signals that [`inner`](Self::inner) finished.
///
/// Note that we must also drive the [driver](Self::driver) even when the stream finished to allow proper state clean-ups.
inner_done: bool,
Why is this needed? If the inner stream returns `None`, we should just return `None` from that point on?
        Poll::Pending
    }
} else {
    match ready!(this.inner.poll_recv(cx)) {
I think this will poll the inner stream even after receiving an error from the inner stream?
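The "stop after the first error or end" behaviour raised in these two comments can be sketched in a few lines. This is a minimal illustration, not the PR's code: it uses a plain `Iterator` as a stand-in for the async stream, and `StopAfterError` and `run` are hypothetical names introduced here.

```rust
use std::cell::Cell;

/// Hypothetical wrapper (not from the PR): yields items until the inner
/// iterator ends or produces its first error, then never touches it again.
struct StopAfterError<I> {
    inner: I,
    done: bool,
}

impl<I> StopAfterError<I> {
    fn new(inner: I) -> Self {
        Self { inner, done: false }
    }
}

impl<I, T, E> Iterator for StopAfterError<I>
where
    I: Iterator<Item = Result<T, E>>,
{
    type Item = Result<T, E>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            // Terminal state: do not pull from the inner iterator any more.
            return None;
        }
        let item = self.inner.next();
        if !matches!(item, Some(Ok(_))) {
            // Either the end (`None`) or an error was observed.
            self.done = true;
        }
        item
    }
}

/// Drives the wrapper more often than there are items and reports what was
/// yielded plus how many items were pulled from the inner iterator.
fn run(items: Vec<Result<u32, String>>) -> (Vec<Result<u32, String>>, usize) {
    let pulled = Cell::new(0usize);
    let inner = items.into_iter().inspect(|_| pulled.set(pulled.get() + 1));
    let mut wrapped = StopAfterError::new(inner);
    let mut out = Vec::new();
    for _ in 0..10 {
        if let Some(item) = wrapped.next() {
            out.push(item);
        }
    }
    (out, pulled.get())
}

fn main() {
    let (out, pulled) = run(vec![Ok(1), Err("boom".to_string()), Ok(2)]);
    println!("yielded {out:?} after pulling {pulled} items from the inner iterator");
}
```

In the PR's setting the wrapper would still have to keep driving the driver future after reaching this terminal state, which is the reason given in the doc comment for the separate `inner_done` flag.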
}

impl<T> CrossRtStream<T> {
    async fn new<F, S>(store: Arc<S>, f: F, handle: &Handle) -> Result<Self>
Why do we need to pass `store` and `f` into this function? Could we instead just pass in the generated future? I think this would remove the need for a HRTB.
    }
}

struct AliveCheck {
I think some docs to explain what this is doing would go a long way
// keep barrier Arc alive
#[allow(clippy::drop_ref)]
drop(barrier);
The clippy warning here is legitimately telling you that the `drop` does nothing? `barrier` is kept alive until the end of the block because it is borrowed out of `mode`.
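A small std-only check illustrates the point about dropping a reference. It is a sketch under stated assumptions: `strong_counts` is a hypothetical helper, and the lint is silenced with `dropping_references`, which recent toolchains provide in rustc in place of `clippy::drop_ref`.

```rust
use std::sync::Arc;

/// Returns the `Arc` strong count observed after dropping a *reference* to a
/// clone, and then after dropping the *owned* clone itself.
#[allow(dropping_references)] // the no-op drop below is intentional
fn strong_counts() -> (usize, usize) {
    let original = Arc::new(());
    let clone = Arc::clone(&original);

    let borrowed: &Arc<()> = &clone;
    drop(borrowed); // drops only the reference; the Arc behind it is untouched
    let after_ref_drop = Arc::strong_count(&original);

    drop(clone); // drops an owned handle, so the count goes down
    let after_owned_drop = Arc::strong_count(&original);

    (after_ref_drop, after_owned_drop)
}

fn main() {
    let (after_ref, after_owned) = strong_counts();
    println!("after drop(&Arc): {after_ref}, after drop(Arc): {after_owned}");
}
```

The count only changes when an owned `Arc` is dropped, so a `drop` on a borrowed `barrier` neither releases it nor extends its lifetime.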
match mode {
    StoreMode::StreamPendingForever(barrier) => {
        barrier.wait().await;
        futures::future::pending::<()>().await;
        // keep barrier Arc alive
        drop(barrier);
    }
Suggested change:
- match mode {
-     StoreMode::StreamPendingForever(barrier) => {
-         barrier.wait().await;
-         futures::future::pending::<()>().await;
-         // keep barrier Arc alive
-         drop(barrier);
-     }
+ match &mode {
+     StoreMode::StreamPendingForever(barrier) => {
+         barrier.wait().await;
+         futures::future::pending::<()>().await;
+     }
fn check(&mut self) {
    let next = Instant::now();
    assert!(
        next - self.last < Duration::from_millis(150),
I worry that this will be rather flaky, especially on the OS X test runner... Not really sure what to do about that though...
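One possible direction, sketched here as an assumption rather than anything proposed in the PR: make the allowed gap a parameter and let the check accept an explicit timestamp, so the logic can be tested without depending on real scheduling delays. Only the `last` field and the 150 ms limit come from the snippet above; `max_gap`, `check_at`, and `demo` are hypothetical.

```rust
use std::time::{Duration, Instant};

/// Hypothetical variant of `AliveCheck`: remembers when it was last checked
/// and reports gaps that exceed a configurable limit.
struct AliveCheck {
    last: Instant,
    max_gap: Duration,
}

impl AliveCheck {
    fn new(start: Instant, max_gap: Duration) -> Self {
        Self { last: start, max_gap }
    }

    /// Records `now` as the latest check. Returns `Err(gap)` if `max_gap` or
    /// more has passed since the previous check.
    fn check_at(&mut self, now: Instant) -> Result<(), Duration> {
        let gap = now.duration_since(self.last);
        self.last = now;
        if gap < self.max_gap {
            Ok(())
        } else {
            Err(gap)
        }
    }

    /// Convenience wrapper that uses the real clock.
    fn check(&mut self) -> Result<(), Duration> {
        self.check_at(Instant::now())
    }
}

/// Feeds synthetic timestamps: one check after 100 ms, the next 300 ms later.
fn demo() -> (Result<(), Duration>, Result<(), Duration>) {
    let start = Instant::now();
    let mut alive = AliveCheck::new(start, Duration::from_millis(150));
    let quick = alive.check_at(start + Duration::from_millis(100));
    let stalled = alive.check_at(start + Duration::from_millis(400));
    (quick, stalled)
}

fn main() {
    println!("{:?}", demo());
    let mut alive = AliveCheck::new(Instant::now(), Duration::from_secs(1));
    println!("real clock: {:?}", alive.check());
}
```

This does not remove timing sensitivity from tests that run against the real clock, but a slow runner could then be given a more generous limit instead of a hard-coded one.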
    S: Send + Sync + 'static,
{
    let (tx_creation, rx_creation) = tokio::sync::oneshot::channel();
    let (tx_stream, rx_stream) = channel(1);
A buffer size of 1 is likely ok for `ObjectStore::get`, but `list` is likely going to want a much larger buffer size.
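The effect of the buffer size can be illustrated with a bounded channel from the standard library. This is only an analogy: the PR's `channel(1)` is not a `std` channel, and `items_before_backpressure` is a hypothetical helper that counts how many items a producer can enqueue before it has to wait for the consumer.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Returns how many items fit into a bounded channel of the given capacity
/// while the consumer is not receiving anything.
fn items_before_backpressure(capacity: usize) -> usize {
    // `_rx` keeps the receiver alive without ever reading from it.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut sent = 0usize;
    loop {
        match tx.try_send(sent as u32) {
            Ok(()) => sent += 1,
            // The buffer is full: a blocking producer would stall here.
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

fn main() {
    for capacity in [1, 64] {
        println!(
            "capacity {capacity}: {} item(s) before the producer must wait",
            items_before_backpressure(capacity)
        );
    }
}
```

With a capacity of 1 the producer and consumer alternate in lockstep, which is tolerable for a few large chunks but adds a handoff per item for a listing that yields many small entries.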
Big fan of this, left some comments. I would recommend phrasing this as allowing dispatching IO to a dedicated tokio pool, as opposed to a cross-runtime store, as there is actually no requirement for the outer context to be a tokio runtime, or even a runtime at all.
As discussed, #4040 contains an alternative way of achieving this, PTAL.
What is the status of this PR? Is it ready for review? Should we merge it?
I would prefer to move forward with #4040 but I've not had time to get it over the line yet.
I'm going to close this based on #4040 (comment).
Which issue does this PR close?
-
Rationale for this change
In DataFusion (and potentially other users of `object_store`), people run CPU-bound tasks within a tokio runtime. In a larger setting, this can easily stall and destabilize IO tasks (e.g. TLS connections) because tokio is unable to steal tasks from CPU-blocked worker threads. Users may observe this as TLS connection failures or timeouts.
What changes are included in this PR?
A `CrossRtStore` object store wrapper that ensures that the store-related futures are executed within an IO-specific runtime. Extensive tests for blocking and cancellation are added.
Are there any user-facing changes?
Users may now use `CrossRtStore` to protect object store IO from being stalled.
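The isolation idea can be sketched without any async runtime. The example below is an assumption-laden analogy, not the PR's implementation: a dedicated OS thread named `io` stands in for the IO runtime, and `IoThread`, `Job`, and `thread_name_of_io_work` are hypothetical names. The caller here blocks on the result, whereas the PR's wrapper returns futures and streams.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// A boxed unit of work to run on the IO thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical handle to a dedicated IO thread (stand-in for an IO runtime).
struct IoThread {
    jobs: Sender<Job>,
}

impl IoThread {
    fn spawn() -> Self {
        let (jobs, rx) = channel::<Job>();
        thread::Builder::new()
            .name("io".to_string())
            .spawn(move || {
                // Runs until every `Sender` (and thus the handle) is dropped.
                for job in rx {
                    job();
                }
            })
            .expect("failed to spawn IO thread");
        Self { jobs }
    }

    /// Runs `work` on the IO thread and blocks the caller until it finishes.
    fn run<T, F>(&self, work: F) -> T
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = channel();
        let job: Job = Box::new(move || {
            // The caller may have given up waiting; ignore the send error.
            let _ = tx.send(work());
        });
        self.jobs.send(job).expect("IO thread is gone");
        rx.recv().expect("IO thread dropped the job")
    }
}

/// Reports the name of the thread that actually executed the work.
fn thread_name_of_io_work() -> Option<String> {
    let io = IoThread::spawn();
    io.run(|| thread::current().name().map(str::to_owned))
}

fn main() {
    println!("work ran on thread: {:?}", thread_name_of_io_work());
}
```

Because the work is handed over through a channel, the calling side can be busy with CPU-bound tasks, or not be a runtime at all, without delaying what runs on the IO side.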