Parquet related changes for remote read #707

Closed · wants to merge 1 commit
17 changes: 10 additions & 7 deletions parquet/src/util/io.rs
@@ -36,6 +36,9 @@ pub trait TryClone: Sized {
pub trait ParquetReader: Read + Seek + Length + TryClone {}
impl<T: Read + Seek + Length + TryClone> ParquetReader for T {}

pub trait ThreadSafeParquetReader: ParquetReader + Send + Sync + 'static {}
Review comment (Contributor):

An alternative approach that we could use in DataFusion would be to implement something like ThreadSafeFileSource.

I think the core change needed to work with remote S3-like object stores is to allow for an async API in the parquet reader.

e.g. #111

Here is one potential way of doing it from @jorgecarleitao's arrow2 crate: jorgecarleitao/arrow2#260
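A minimal sketch of the `ThreadSafeFileSource` alternative mentioned above, assuming it would wrap the underlying reader in `Arc<Mutex<_>>` so clones share one handle and each positioned read locks it only briefly. All names here are illustrative and not part of the parquet crate:

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::sync::{Arc, Mutex};

// Hypothetical alternative to a marker trait: share a single reader behind
// Arc<Mutex<_>> and expose positioned reads. Cloning the source clones the
// Arc, so all clones see the same underlying handle.
#[derive(Clone)]
struct ThreadSafeFileSource<R> {
    inner: Arc<Mutex<R>>,
}

impl<R: Read + Seek> ThreadSafeFileSource<R> {
    fn new(reader: R) -> Self {
        Self { inner: Arc::new(Mutex::new(reader)) }
    }

    // Read into `buf` starting at absolute offset `pos`, holding the lock
    // only for the seek + read pair.
    fn read_at(&self, pos: u64, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut guard = self.inner.lock().unwrap();
        guard.seek(SeekFrom::Start(pos))?;
        guard.read(buf)
    }
}

fn main() -> std::io::Result<()> {
    // Cursor stands in for a file or remote object handle.
    let source = ThreadSafeFileSource::new(Cursor::new(b"PAR1demoPAR1".to_vec()));
    let mut magic = [0u8; 4];
    source.read_at(0, &mut magic)?;
    assert_eq!(&magic, b"PAR1");
    Ok(())
}
```

Because every read takes an absolute offset, no per-clone position needs to be tracked, which is what makes the type safe to hand to multiple threads.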

impl<T: ParquetReader + Send + Sync + 'static> ThreadSafeParquetReader for T {}

// Read/Write wrappers for `File`.

/// Position trait returns the current position in the stream.
@@ -54,16 +57,16 @@ pub trait Position {
/// while preserving independent position, which is not available with `try_clone()`.
///
/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader`
pub struct FileSource<R: ParquetReader> {
reader: RefCell<R>,
pub struct FileSource<R: ThreadSafeParquetReader> {
reader: Mutex<R>,
start: u64, // start position in a file
end: u64, // end position in a file
buf: Vec<u8>, // buffer where bytes read in advance are stored
buf_pos: usize, // current position of the reader in the buffer
buf_cap: usize, // current number of bytes read into the buffer
}
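The swap from `RefCell<R>` to `Mutex<R>` in the struct above is what allows `FileSource` to satisfy the new `Send + Sync` bounds: `RefCell<T>` is `!Sync`, so a struct containing one cannot be shared across threads, while `Mutex<T>` is `Sync` whenever `T: Send`. A standalone sketch (the `SharedSource` type is a hypothetical stand-in, not the crate's code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for FileSource after the change: the reader field sits behind a
// Mutex. With RefCell in its place, the thread::spawn below would not compile,
// because Arc<SharedSource> would not be Send (RefCell<T> is !Sync).
struct SharedSource {
    reader: Mutex<Vec<u8>>, // stand-in for Mutex<R>
}

fn main() {
    let src = Arc::new(SharedSource { reader: Mutex::new(vec![1, 2, 3]) });
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let src = Arc::clone(&src);
            // Each thread briefly locks the shared reader.
            thread::spawn(move || src.reader.lock().unwrap().len())
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 3);
    }
}
```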

impl<R: ParquetReader> fmt::Debug for FileSource<R> {
impl<R: ThreadSafeParquetReader> fmt::Debug for FileSource<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileSource")
.field("reader", &"OPAQUE")
@@ -76,7 +79,7 @@ impl<R: ParquetReader> fmt::Debug for FileSource<R> {
}
}

impl<R: ParquetReader> FileSource<R> {
impl<R: ThreadSafeParquetReader> FileSource<R> {
/// Creates new file reader with start and length from a file handle
pub fn new(fd: &R, start: u64, length: usize) -> Self {
let reader = Mutex::new(fd.try_clone().unwrap());
@@ -118,7 +121,7 @@ impl<R: ParquetReader> FileSource<R> {
}
}

impl<R: ParquetReader> Read for FileSource<R> {
impl<R: ThreadSafeParquetReader> Read for FileSource<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];
@@ -142,13 +145,13 @@
}
}

impl<R: ParquetReader> Position for FileSource<R> {
impl<R: ThreadSafeParquetReader> Position for FileSource<R> {
fn pos(&self) -> u64 {
self.start
}
}

impl<R: ParquetReader> Length for FileSource<R> {
impl<R: ThreadSafeParquetReader> Length for FileSource<R> {
fn len(&self) -> u64 {
self.end - self.start
}
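The pattern this diff relies on is a blanket impl for the new marker trait: any type that already satisfies `ParquetReader` and is `Send + Sync + 'static` becomes a `ThreadSafeParquetReader` with no extra code. A self-contained sketch, with minimal stand-ins for the crate's `Length` and `TryClone` traits (their exact signatures here are assumptions for illustration):

```rust
use std::io::{Cursor, Read, Seek};

// Minimal stand-ins for the parquet crate's Length and TryClone traits.
trait Length { fn len(&self) -> u64; }
trait TryClone: Sized { fn try_clone(&self) -> std::io::Result<Self>; }

// The two marker traits from the diff, with their blanket impls.
trait ParquetReader: Read + Seek + Length + TryClone {}
impl<T: Read + Seek + Length + TryClone> ParquetReader for T {}

trait ThreadSafeParquetReader: ParquetReader + Send + Sync + 'static {}
impl<T: ParquetReader + Send + Sync + 'static> ThreadSafeParquetReader for T {}

// An in-memory cursor works as a reader once Length/TryClone are provided.
impl Length for Cursor<Vec<u8>> {
    fn len(&self) -> u64 { self.get_ref().len() as u64 }
}
impl TryClone for Cursor<Vec<u8>> {
    fn try_clone(&self) -> std::io::Result<Self> {
        Ok(Cursor::new(self.get_ref().clone()))
    }
}

fn requires_thread_safe<R: ThreadSafeParquetReader>(r: &R) -> u64 {
    r.len()
}

fn main() {
    // Cursor<Vec<u8>> is Read + Seek + Send + Sync + 'static, so the blanket
    // impls make it a ThreadSafeParquetReader automatically.
    let cursor = Cursor::new(vec![0u8; 16]);
    assert_eq!(requires_thread_safe(&cursor), 16);
}
```

The cost of the stricter bound is that single-threaded readers which are not `Send + Sync` (anything holding `Rc` or `RefCell`, for instance) no longer qualify, which is the trade-off this PR makes to enable remote reads.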