Async ParquetExec #1617

Closed · wants to merge 4 commits
2 changes: 1 addition & 1 deletion datafusion/Cargo.toml

```diff
@@ -71,7 +71,7 @@ num-traits = { version = "0.2", optional = true }
 num_cpus = "1.13.0"
 ordered-float = "2.10"
 parking_lot = "0.12"
-parquet = { version = "11", features = ["arrow"] }
+parquet = { version = "11", features = ["arrow", "async"] }
 paste = "^1.0"
 pin-project-lite= "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
```
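Enabling the `async` feature gates in parquet's tokio-based reader module (`parquet::arrow::async_reader`), which the rest of the changes build on. As a hedged sketch of what the feature unlocks (the builder name `ParquetRecordBatchStreamBuilder` and its methods follow the async_reader API of this era and may differ between parquet releases), a caller can stream Arrow record batches directly from a `tokio::fs::File`:

```rust
// Sketch only: assumes the `async` feature exposes
// `parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder`.
use futures::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

async fn print_row_counts(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Open the file with tokio so reads never block the executor
    let file = tokio::fs::File::open(path).await?;

    // The builder reads the footer metadata asynchronously before streaming
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;

    // Each item is a decoded Arrow RecordBatch
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```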
16 changes: 6 additions & 10 deletions datafusion/src/datasource/object_store/local.rs

```diff
@@ -22,10 +22,10 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::{stream, AsyncRead, StreamExt};
+use futures::{stream, StreamExt};
 
 use crate::datasource::object_store::{
-    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+    ChunkReader, FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
 };
 use crate::datasource::PartitionedFile;
 use crate::error::{DataFusionError, Result};
@@ -72,14 +72,10 @@ impl LocalFileReader {
 
 #[async_trait]
 impl ObjectReader for LocalFileReader {
-    async fn chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn AsyncRead>> {
-        todo!(
-            "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
-        )
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>> {
+        let file = tokio::fs::File::open(&self.file.path).await?;
+        let file = tokio::io::BufReader::new(file);
+        Ok(Box::new(file))
     }
 
     fn sync_chunk_reader(
```
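The local implementation opens a fresh `tokio::fs::File` per call and wraps it in a `BufReader`, which is what gives successive `chunk_reader` calls their independent seek positions. A hedged sketch of how a caller might read an arbitrary byte range through the new interface (the helper `read_range` is hypothetical and not part of this PR; paths assume the crate layout shown here):

```rust
// Hypothetical helper, not part of the PR: reads `length` bytes starting at
// `start` from any ObjectReader via the new ChunkReader-based API.
use std::io::SeekFrom;

use datafusion::datasource::object_store::ObjectReader;
use datafusion::error::Result;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

async fn read_range(
    reader: &dyn ObjectReader,
    start: u64,
    length: usize,
) -> Result<Vec<u8>> {
    // Each call returns a reader with its own seek position
    let mut chunk = reader.chunk_reader().await?;
    chunk.seek(SeekFrom::Start(start)).await?;

    let mut buf = vec![0u8; length];
    chunk.read_exact(&mut buf).await?;
    Ok(buf)
}
```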
29 changes: 25 additions & 4 deletions datafusion/src/datasource/object_store/mod.rs

```diff
@@ -28,21 +28,42 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
-use futures::{AsyncRead, Stream, StreamExt};
+use futures::{Stream, StreamExt};
+use tokio::io::{AsyncBufRead, AsyncSeek};
 
 use local::LocalFileSystem;
 
 use crate::error::{DataFusionError, Result};
 
+/// Provides async access to read a file, combining [`AsyncSeek`]
+/// and [`AsyncBufRead`] so they can be used as a trait object
+///
+/// [`AsyncSeek`] is necessary because readers may need to seek around whilst
+/// reading, either because the format itself is structured (e.g. parquet)
+/// or because it needs to read metadata or infer schema as an initial step
+///
+/// [`AsyncBufRead`] is necessary because readers may wish to read data
+/// up until some delimiter (e.g. csv or newline-delimited JSON)
+///
+/// Note: the same block of data may be read multiple times
+///
+/// Implementations that fetch from object storage may wish to maintain an internal
+/// buffer of fetched data blocks, potentially discarding them or spilling them to disk
+/// based on memory pressure
+///
+/// TODO(#1614): Remove Sync
+pub trait ChunkReader: AsyncBufRead + AsyncSeek + Send + Sync + Unpin {}
+impl<T: AsyncBufRead + AsyncSeek + Send + Sync + Unpin> ChunkReader for T {}
+
 /// Object Reader for one file in an object store.
 ///
 /// Note that the dynamic dispatch on the reader might
 /// have some performance impacts.
 #[async_trait]
 pub trait ObjectReader: Send + Sync {
-    /// Get reader for a part [start, start + length] in the file asynchronously
-    async fn chunk_reader(&self, start: u64, length: usize)
-        -> Result<Box<dyn AsyncRead>>;
+    /// Get a [`ChunkReader`] for the file; successive calls to this MUST
+    /// return readers with independent seek positions
+    async fn chunk_reader(&self) -> Result<Box<dyn ChunkReader>>;
 
     /// Get reader for a part [start, start + length] in the file
     fn sync_chunk_reader(
```
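To make the `AsyncBufRead` part of the bound concrete: delimiter-based formats such as CSV or newline-delimited JSON read up to the next delimiter, which needs buffering to avoid a read call per byte. A minimal sketch against the `ChunkReader` trait above (the helper itself is hypothetical and not part of this PR; the import path follows the module layout shown here):

```rust
// Hypothetical illustration of why `AsyncBufRead` is in the ChunkReader bound:
// count newline-delimited records without issuing a syscall per byte.
use datafusion::datasource::object_store::ChunkReader;
use tokio::io::AsyncBufReadExt;

async fn count_lines<R: ChunkReader>(reader: &mut R) -> std::io::Result<usize> {
    let mut line = Vec::new();
    let mut count = 0;
    loop {
        line.clear();
        // Reads up to and including the next b'\n'; returns 0 at EOF
        if reader.read_until(b'\n', &mut line).await? == 0 {
            break;
        }
        count += 1;
    }
    Ok(count)
}
```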