
feat: cross runtime object store wrapper #4015

Closed

Conversation

crepererum

Which issue does this PR close?

-

Rationale for this change

In DataFusion (and potentially other users of object_store), people run CPU-bound tasks within a tokio runtime. At scale, this can easily stall and destabilize IO tasks (e.g. TLS connections), because tokio cannot steal work from CPU-blocked worker threads. Users may observe this as TLS connection failures or timeouts.

What changes are included in this PR?

A CrossRtStore object store wrapper that ensures store-related futures are executed on a dedicated IO runtime. Extensive tests for blocking and cancellation are included.

Are there any user-facing changes?

Users may now use CrossRtStore to protect object store IO from being stalled.
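To make the hand-off concrete, here is a minimal sketch of the pattern the wrapper automates (this is not the PR's API; the helper function, path, and error handling are illustrative): the store future is spawned onto a dedicated IO runtime, and only the join handle is awaited on the caller's side.

```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use tokio::runtime::Handle;

/// Illustrative helper, not part of this PR: fetch an object via a dedicated IO runtime.
async fn get_via_io_runtime(
    io: Handle,
    store: Arc<dyn ObjectStore>,
) -> object_store::Result<Bytes> {
    let path = Path::from("data/file.parquet");
    // `Handle::spawn` moves the IO future onto the dedicated runtime; the
    // returned `JoinHandle` is awaited from whatever runtime (or executor)
    // the caller happens to be on, so CPU-bound work there cannot stall the IO.
    io.spawn(async move { store.get(&path).await?.bytes().await })
        .await
        .expect("IO task panicked")
}
```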

@github-actions github-actions bot added the object-store Object Store Interface label Apr 4, 2023

/// [Object store](ObjectStore) wrapper that isolates the IO runtime from the calling runtime.
#[derive(Debug)]
pub struct CrossRtStore<S>

What do you think about calling this TokioStore, as it effectively spawns the inner work onto a tokio runtime? This would highlight that it can also be used to spawn onto a tokio runtime from a non-tokio runtime.

//! the tokio runtime and stall your IO up to the point that upstream servers cut your connections.
//! [DataFusion](https://arrow.apache.org/datafusion/) is one such example.
//!
//! # Example

Perhaps we could give an example of using this to bridge to a non-tokio runtime, e.g. futures::block_on
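For example (a hedged sketch, not proposed doc wording): from a plain thread with no tokio runtime at all, the spawned IO can be driven with futures::executor::block_on, since tokio's JoinHandle is an ordinary future.

```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use tokio::runtime::Runtime;

/// Illustrative only: fetch an object from a plain (non-async) thread by
/// spawning the IO onto a dedicated tokio runtime and blocking on the result.
fn blocking_get(io: &Runtime, store: Arc<dyn ObjectStore>) -> object_store::Result<Bytes> {
    let task = io.spawn(async move {
        store
            .get(&Path::from("data/file.parquet"))
            .await?
            .bytes()
            .await
    });
    // No tokio context is needed on this thread; `block_on` simply polls the
    // `JoinHandle` until the IO runtime completes the task.
    futures::executor::block_on(task).expect("IO task panicked")
}
```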

    /// Signals that [`inner`](Self::inner) finished.
    ///
    /// Note that we must keep driving the [driver](Self::driver) even after the stream has finished to allow proper state clean-up.
    inner_done: bool,

Why is this needed? If the inner stream returns None, we should just return None from that point on.

Poll::Pending
}
} else {
match ready!(this.inner.poll_recv(cx)) {

I think this will poll the inner stream even after receiving an error from the inner stream?
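A minimal sketch of the fuse-after-terminal-item behaviour being suggested here (the type, field names, and error type are placeholders, and the driver side of the PR is omitted): once the receiver yields an error or None, remember it and stop polling.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{ready, Stream};
use tokio::sync::mpsc::Receiver;

/// Illustrative receiver wrapper that never polls the channel again after it
/// has produced an error or ended.
struct FusedRecv<T> {
    inner: Receiver<Result<T, String>>,
    done: bool,
}

impl<T> Stream for FusedRecv<T> {
    type Item = Result<T, String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.done {
            return Poll::Ready(None);
        }
        match ready!(this.inner.poll_recv(cx)) {
            None => {
                this.done = true;
                Poll::Ready(None)
            }
            Some(Err(e)) => {
                // Treat the first error as terminal so the inner channel is
                // not polled again afterwards.
                this.done = true;
                Poll::Ready(Some(Err(e)))
            }
            Some(Ok(v)) => Poll::Ready(Some(Ok(v))),
        }
    }
}
```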

}

impl<T> CrossRtStream<T> {
async fn new<F, S>(store: Arc<S>, f: F, handle: &Handle) -> Result<Self>

Why do we need to pass store and f into this function? Could we instead just pass in the generated future? I think this would remove the need for an HRTB.
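Roughly what that alternative shape could look like (a sketch only; the function name and result plumbing are placeholders, not the PR's code): the caller builds the future from its Arc<S> up front and hands over something 'static, so only a plain Future bound is needed.

```rust
use std::future::Future;

use tokio::runtime::Handle;
use tokio::sync::oneshot;

/// Illustrative shape only: run an already-built future on the IO runtime and
/// hand back a receiver for its output. Accepting the future (rather than
/// `store` plus a closure) keeps the bound a plain `Future + Send + 'static`
/// instead of an HRTB.
fn spawn_on_io<T, Fut>(fut: Fut, handle: &Handle) -> oneshot::Receiver<T>
where
    T: Send + 'static,
    Fut: Future<Output = T> + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    handle.spawn(async move {
        // Ignore send errors: the caller may have dropped the receiver.
        let _ = tx.send(fut.await);
    });
    rx
}
```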

}
}

struct AliveCheck {

I think some docs to explain what this is doing would go a long way

Comment on lines +790 to +792
// keep barrier Arc alive
#[allow(clippy::drop_ref)]
drop(barrier);

The clippy warning here is legitimately telling you that the drop does nothing: barrier is borrowed out of mode, so it is kept alive to the end of the block regardless.

Comment on lines +799 to +805
match mode {
    StoreMode::StreamPendingForever(barrier) => {
        barrier.wait().await;
        futures::future::pending::<()>().await;
        // keep barrier Arc alive
        drop(barrier);
    }

Suggested change
-match mode {
-    StoreMode::StreamPendingForever(barrier) => {
-        barrier.wait().await;
-        futures::future::pending::<()>().await;
-        // keep barrier Arc alive
-        drop(barrier);
-    }
+match &mode {
+    StoreMode::StreamPendingForever(barrier) => {
+        barrier.wait().await;
+        futures::future::pending::<()>().await;
+    }

fn check(&mut self) {
    let next = Instant::now();
    assert!(
        next - self.last < Duration::from_millis(150),

I worry that this will be rather flaky, especially on the OS X test runner... Not really sure what to do about that though...

    S: Send + Sync + 'static,
{
    let (tx_creation, rx_creation) = tokio::sync::oneshot::channel();
    let (tx_stream, rx_stream) = channel(1);

A buffer size of 1 is likely OK for ObjectStore::get, but list is likely going to want a much larger buffer size.
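For illustration (the number and names are arbitrary, not from this PR): the buffer bounds how far the producer on the IO runtime can run ahead of the consumer, and list yields many small ObjectMeta values per request, so a larger buffer reduces cross-runtime wake-ups.

```rust
use object_store::{ObjectMeta, Result};
use tokio::sync::mpsc::{channel, Receiver, Sender};

/// Illustrative channel setup: one slot is plenty for a single `get` payload,
/// while `list` results benefit from buffering many metadata entries.
fn list_channel() -> (Sender<Result<ObjectMeta>>, Receiver<Result<ObjectMeta>>) {
    // 128 is an arbitrary example value, not a recommendation from the PR.
    channel(128)
}
```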

@tustvold

tustvold commented Apr 4, 2023

Big fan of this, left some comments. I would recommend phrasing this as dispatching IO to a dedicated tokio pool rather than as a cross-runtime store, as there is actually no requirement for the outer context to be a tokio runtime, or even a runtime at all.

@tustvold

tustvold commented Apr 9, 2023

As discussed, #4040 contains an alternative way of achieving this, PTAL

@alamb

alamb commented May 25, 2023

What is the status of this PR? Is it ready for review? Should we merge it?

@tustvold

I would prefer to move forward with #4040 but I've not had time to get it over the line yet

@tustvold

I'm going to close this based on #4040 (comment)
