diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 09b107e31f30..248d80d1a35b 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -744,6 +744,25 @@ enum ColumnChunkData { Dense { offset: usize, data: Bytes }, } +impl ColumnChunkData { + fn get(&self, start: u64) -> Result { + match &self { + ColumnChunkData::Sparse { data, .. } => data + .binary_search_by_key(&start, |(offset, _)| *offset as u64) + .map(|idx| data[idx].1.clone()) + .map_err(|_| { + ParquetError::General(format!( + "Invalid offset in sparse column chunk data: {start}" + )) + }), + ColumnChunkData::Dense { offset, data } => { + let start = start as usize - *offset; + Ok(data.slice(start..)) + } + } + } +} + impl Length for ColumnChunkData { fn len(&self) -> u64 { match &self { @@ -756,26 +775,12 @@ impl Length for ColumnChunkData { impl ChunkReader for ColumnChunkData { type T = bytes::buf::Reader; - fn get_read(&self, start: u64, length: usize) -> Result { - Ok(self.get_bytes(start, length)?.reader()) + fn get_read(&self, start: u64) -> Result { + Ok(self.get(start)?.reader()) } fn get_bytes(&self, start: u64, length: usize) -> Result { - match &self { - ColumnChunkData::Sparse { data, .. 
} => data - .binary_search_by_key(&start, |(offset, _)| *offset as u64) - .map(|idx| data[idx].1.slice(0..length)) - .map_err(|_| { - ParquetError::General(format!( - "Invalid offset in sparse column chunk data: {start}" - )) - }), - ColumnChunkData::Dense { offset, data } => { - let start = start as usize - *offset; - let end = start + length; - Ok(data.slice(start..end)) - } - } + Ok(self.get(start)?.slice(..length)) } } diff --git a/parquet/src/bin/parquet-layout.rs b/parquet/src/bin/parquet-layout.rs index 5f71551e1f20..d749bb8a4ba7 100644 --- a/parquet/src/bin/parquet-layout.rs +++ b/parquet/src/bin/parquet-layout.rs @@ -175,8 +175,7 @@ fn read_page_header( } } - let len = reader.len().checked_sub(offset).unwrap() as usize; - let input = reader.get_read(offset, len)?; + let input = reader.get_read(offset)?; let mut tracked = TrackedRead(input, 0); let mut prot = TCompactInputProtocol::new(&mut tracked); let header = PageHeader::read_from_in_protocol(&mut prot)?; diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs index a14b3ce4d6c5..7cc92afc014a 100644 --- a/parquet/src/file/footer.rs +++ b/parquet/src/file/footer.rs @@ -46,7 +46,7 @@ pub fn parse_metadata(chunk_reader: &R) -> Result Result; + type T: Read; + + /// Get a [`Read`] starting at the provided file offset + /// + /// Subsequent or concurrent calls to [`Self::get_read`] or [`Self::get_bytes`] may + /// side-effect on previously returned [`Self::T`]. 
Care should be taken to avoid this + /// + /// See [`File::try_clone`] for more information + fn get_read(&self, start: u64) -> Result; /// Get a range as bytes - /// This should fail if the exact number of bytes cannot be read + /// + /// Concurrent calls to [`Self::get_bytes`] may result in interleaved output + /// + /// See [`File::try_clone`] for more information + fn get_bytes(&self, start: u64, length: usize) -> Result; +} + +impl Length for File { + fn len(&self) -> u64 { + self.metadata().map(|m| m.len()).unwrap_or(0u64) + } +} + +impl ChunkReader for File { + type T = BufReader; + + fn get_read(&self, start: u64) -> Result { + let mut reader = self.try_clone()?; + reader.seek(SeekFrom::Start(start))?; + Ok(BufReader::new(reader)) + } + fn get_bytes(&self, start: u64, length: usize) -> Result { let mut buffer = Vec::with_capacity(length); - let read = self.get_read(start, length)?.read_to_end(&mut buffer)?; + let mut reader = self.try_clone()?; + reader.seek(SeekFrom::Start(start))?; + let read = reader.take(length as _).read_to_end(&mut buffer)?; if read != length { return Err(eof_err!( @@ -69,6 +99,26 @@ pub trait ChunkReader: Length + Send + Sync { } } +impl Length for Bytes { + fn len(&self) -> u64 { + self.len() as u64 + } +} + +impl ChunkReader for Bytes { + type T = bytes::buf::Reader; + + fn get_read(&self, start: u64) -> Result { + let start = start as usize; + Ok(self.slice(start..).reader()) + } + + fn get_bytes(&self, start: u64, length: usize) -> Result { + let start = start as usize; + Ok(self.slice(start..start + length)) + } +} + // ---------------------------------------------------------------------- // APIs for file & row group readers diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 7b08567427e6..bf843562ed02 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -40,60 +40,8 @@ use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter; use crate::record::Row; use crate::schema::types::Type as SchemaType; -use crate::util::{io::TryClone, memory::ByteBufferPtr}; -use bytes::{Buf, Bytes}; +use crate::util::memory::ByteBufferPtr; use thrift::protocol::{TCompactInputProtocol, TSerializable}; -// export `SliceableCursor` and `FileSource` publicly so clients can -// re-use the logic in their own ParquetFileWriter wrappers -pub use crate::util::io::FileSource; - -// ---------------------------------------------------------------------- -// Implementations of traits facilitating the creation of a new reader - -impl Length for File { - fn len(&self) -> u64 { - self.metadata().map(|m| m.len()).unwrap_or(0u64) - } -} - -impl TryClone for File { - fn try_clone(&self) -> std::io::Result { - self.try_clone() - } -} - -impl ChunkReader for File { - type T = FileSource; - - fn get_read(&self, start: u64, length: usize) -> Result { - Ok(FileSource::new(self, start, length)) - } -} - -impl Length for Bytes { - fn len(&self) -> u64 { - self.len() as u64 - } -} - -impl TryClone for Bytes { - fn try_clone(&self) -> std::io::Result { - Ok(self.clone()) - } -} - -impl ChunkReader for Bytes { - type T = bytes::buf::Reader; - - fn get_read(&self, start: u64, length: usize) -> Result { - Ok(self.get_bytes(start, length)?.reader()) - } - - fn get_bytes(&self, start: u64, length: usize) -> Result { - let start = start as usize; - Ok(self.slice(start..start + length)) - } -} impl TryFrom for SerializedFileReader { type Error = ParquetError; @@ -662,7 +610,7 @@ impl PageReader for SerializedPageReader { return Ok(None); } - let mut read = self.reader.get_read(*offset as u64, *remaining)?; + let mut read = self.reader.get_read(*offset as u64)?; let header = if let Some(header) = next_page_header.take() { *header } else { @@ -752,8 +700,7 @@ impl PageReader for SerializedPageReader { continue; } } else { - let mut read = - self.reader.get_read(*offset as u64, *remaining_bytes)?; + let mut read 
= self.reader.get_read(*offset as u64)?; let (header_len, header) = read_page_header_len(&mut read)?; *offset += header_len; *remaining_bytes -= header_len; @@ -807,8 +754,7 @@ impl PageReader for SerializedPageReader { *offset += buffered_header.compressed_page_size as usize; *remaining_bytes -= buffered_header.compressed_page_size as usize; } else { - let mut read = - self.reader.get_read(*offset as u64, *remaining_bytes)?; + let mut read = self.reader.get_read(*offset as u64)?; let (header_len, header) = read_page_header_len(&mut read)?; let data_page_size = header.compressed_page_size as usize; *offset += header_len + data_page_size; @@ -827,6 +773,7 @@ impl PageReader for SerializedPageReader { #[cfg(test)] mod tests { + use bytes::Bytes; use std::sync::Arc; use crate::format::BoundaryOrder; diff --git a/parquet/src/util/io.rs b/parquet/src/util/io.rs deleted file mode 100644 index 43d78866d9ef..000000000000 --- a/parquet/src/util/io.rs +++ /dev/null @@ -1,246 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::{cell::RefCell, cmp, fmt, io::*}; - -use crate::file::reader::Length; - -const DEFAULT_BUF_SIZE: usize = 8 * 1024; - -// ---------------------------------------------------------------------- - -/// TryClone tries to clone the type and should maintain the `Seek` position of the given -/// instance. -pub trait TryClone: Sized { - /// Clones the type returning a new instance or an error if it's not possible - /// to clone it. - fn try_clone(&self) -> Result; -} - -/// ParquetReader is the interface which needs to be fulfilled to be able to parse a -/// parquet source. -pub trait ParquetReader: Read + Seek + Length + TryClone {} -impl ParquetReader for T {} - -// Read/Write wrappers for `File`. - -/// Struct that represents a slice of a file data with independent start position and -/// length. Internally clones provided file handle, wraps with a custom implementation -/// of BufReader that resets position before any read. -/// -/// This is workaround and alternative for `file.try_clone()` method. It clones `File` -/// while preserving independent position, which is not available with `try_clone()`. 
-/// -/// Designed after `arrow::io::RandomAccessFile` and `std::io::BufReader` -pub struct FileSource { - reader: RefCell, - start: u64, // start position in a file - end: u64, // end position in a file - buf: Vec, // buffer where bytes read in advance are stored - buf_pos: usize, // current position of the reader in the buffer - buf_cap: usize, // current number of bytes read into the buffer -} - -impl fmt::Debug for FileSource { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FileSource") - .field("reader", &"OPAQUE") - .field("start", &self.start) - .field("end", &self.end) - .field("buf.len", &self.buf.len()) - .field("buf_pos", &self.buf_pos) - .field("buf_cap", &self.buf_cap) - .finish() - } -} - -impl FileSource { - /// Creates new file reader with start and length from a file handle - pub fn new(fd: &R, start: u64, length: usize) -> Self { - let reader = RefCell::new(fd.try_clone().unwrap()); - Self { - reader, - start, - end: start + length as u64, - buf: vec![0_u8; DEFAULT_BUF_SIZE], - buf_pos: 0, - buf_cap: 0, - } - } - - fn fill_inner_buf(&mut self) -> Result<&[u8]> { - if self.buf_pos >= self.buf_cap { - // If we've reached the end of our internal buffer then we need to fetch - // some more data from the underlying reader. - // Branch using `>=` instead of the more correct `==` - // to tell the compiler that the pos..cap slice is always valid. 
- debug_assert!(self.buf_pos == self.buf_cap); - let mut reader = self.reader.borrow_mut(); - reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading - self.buf_cap = reader.read(&mut self.buf)?; - self.buf_pos = 0; - } - Ok(&self.buf[self.buf_pos..self.buf_cap]) - } - - fn skip_inner_buf(&mut self, buf: &mut [u8]) -> Result { - // discard buffer - self.buf_pos = 0; - self.buf_cap = 0; - // read directly into param buffer - let mut reader = self.reader.borrow_mut(); - reader.seek(SeekFrom::Start(self.start))?; // always seek to start before reading - let nread = reader.read(buf)?; - self.start += nread as u64; - Ok(nread) - } -} - -impl Read for FileSource { - fn read(&mut self, buf: &mut [u8]) -> Result { - let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize); - let buf = &mut buf[0..bytes_to_read]; - - // If we don't have any buffered data and we're doing a massive read - // (larger than our internal buffer), bypass our internal buffer - // entirely. - if self.buf_pos == self.buf_cap && buf.len() >= self.buf.len() { - return self.skip_inner_buf(buf); - } - let nread = { - let mut rem = self.fill_inner_buf()?; - // copy the data from the inner buffer to the param buffer - rem.read(buf)? 
- }; - // consume from buffer - self.buf_pos = cmp::min(self.buf_pos + nread, self.buf_cap); - - self.start += nread as u64; - Ok(nread) - } -} - -impl Length for FileSource { - fn len(&self) -> u64 { - self.end - self.start - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use std::iter; - - use crate::util::test_common::file_util::get_test_file; - - #[test] - fn test_io_read_fully() { - let mut buf = vec![0; 8]; - let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4); - - let bytes_read = src.read(&mut buf[..]).unwrap(); - assert_eq!(bytes_read, 4); - assert_eq!(buf, vec![b'P', b'A', b'R', b'1', 0, 0, 0, 0]); - } - - #[test] - fn test_io_read_in_chunks() { - let mut buf = vec![0; 4]; - let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4); - - let bytes_read = src.read(&mut buf[0..2]).unwrap(); - assert_eq!(bytes_read, 2); - let bytes_read = src.read(&mut buf[2..]).unwrap(); - assert_eq!(bytes_read, 2); - assert_eq!(buf, vec![b'P', b'A', b'R', b'1']); - } - - #[test] - fn test_io_read_pos() { - let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4); - - let _ = src.read(&mut [0; 1]).unwrap(); - assert_eq!(src.start, 1); - - let _ = src.read(&mut [0; 4]).unwrap(); - assert_eq!(src.start, 4); - } - - #[test] - fn test_io_read_over_limit() { - let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4); - - // Read all bytes from source - let _ = src.read(&mut [0; 128]).unwrap(); - assert_eq!(src.start, 4); - - // Try reading again, should return 0 bytes. 
- let bytes_read = src.read(&mut [0; 128]).unwrap(); - assert_eq!(bytes_read, 0); - assert_eq!(src.start, 4); - } - - #[test] - fn test_io_seek_switch() { - let mut buf = vec![0; 4]; - let mut file = get_test_file("alltypes_plain.parquet"); - let mut src = FileSource::new(&file, 0, 4); - - file.seek(SeekFrom::Start(5_u64)) - .expect("File seek to a position"); - - let bytes_read = src.read(&mut buf[..]).unwrap(); - assert_eq!(bytes_read, 4); - assert_eq!(buf, vec![b'P', b'A', b'R', b'1']); - } - - #[test] - fn test_io_large_read() { - // Generate repeated 'abcdef' pattern and write it into a file - let patterned_data: Vec = iter::repeat(vec![0, 1, 2, 3, 4, 5]) - .flatten() - .take(3 * DEFAULT_BUF_SIZE) - .collect(); - - let mut file = tempfile::tempfile().unwrap(); - file.write_all(&patterned_data).unwrap(); - - // seek the underlying file to the first 'd' - file.seek(SeekFrom::Start(3)).unwrap(); - - // create the FileSource reader that starts at pos 1 ('b') - let mut chunk = FileSource::new(&file, 1, patterned_data.len() - 1); - - // read the 'b' at pos 1 - let mut res = vec![0u8; 1]; - chunk.read_exact(&mut res).unwrap(); - assert_eq!(res, &[1]); - - // the underlying file is sought to 'e' - file.seek(SeekFrom::Start(4)).unwrap(); - - // now read large chunk that starts with 'c' (after 'b') - let mut res = vec![0u8; 2 * DEFAULT_BUF_SIZE]; - chunk.read_exact(&mut res).unwrap(); - assert_eq!( - res, - &patterned_data[2..2 + 2 * DEFAULT_BUF_SIZE], - "read buf and original data are not equal" - ); - } -} diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs index 5f43023941fd..d96a62a9f363 100644 --- a/parquet/src/util/mod.rs +++ b/parquet/src/util/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub mod io; pub mod memory; #[macro_use] pub mod bit_util;