-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Push-Based CSV Decoder #3604
Merged
Merged
Changes from 1 commit
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ use arrow_cast::parse::Parser; | |
use arrow_schema::*; | ||
|
||
use crate::map_csv_error; | ||
use crate::reader::records::{RecordReader, StringRecords}; | ||
use crate::reader::records::{RecordDecoder, StringRecords}; | ||
use arrow_data::decimal::validate_decimal_precision; | ||
use csv::StringRecord; | ||
use std::ops::Neg; | ||
|
@@ -330,24 +330,11 @@ pub type Reader<R> = BufReader<StdBufReader<R>>; | |
|
||
/// CSV file reader | ||
pub struct BufReader<R> { | ||
/// Explicit schema for the CSV file | ||
schema: SchemaRef, | ||
/// Optional projection for which columns to load (zero-based column indices) | ||
projection: Option<Vec<usize>>, | ||
/// File reader | ||
reader: RecordReader<R>, | ||
/// Rows to skip | ||
to_skip: usize, | ||
/// Current line number | ||
line_number: usize, | ||
/// End line number | ||
end: usize, | ||
/// Number of records per batch | ||
batch_size: usize, | ||
/// datetime format used to parse datetime values, (format understood by chrono) | ||
/// | ||
/// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html) | ||
datetime_format: Option<String>, | ||
reader: R, | ||
|
||
/// The decoder | ||
decoder: Decoder, | ||
} | ||
|
||
impl<R> fmt::Debug for BufReader<R> | ||
|
@@ -356,10 +343,7 @@ where | |
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("Reader") | ||
.field("schema", &self.schema) | ||
.field("projection", &self.projection) | ||
.field("line_number", &self.line_number) | ||
.field("datetime_format", &self.datetime_format) | ||
.field("decoder", &self.decoder) | ||
.finish() | ||
} | ||
} | ||
|
@@ -383,7 +367,8 @@ impl<R: Read> Reader<R> { | |
) -> Self { | ||
let mut builder = ReaderBuilder::new() | ||
.has_header(has_header) | ||
.with_batch_size(batch_size); | ||
.with_batch_size(batch_size) | ||
.with_schema(schema); | ||
|
||
if let Some(delimiter) = delimiter { | ||
builder = builder.with_delimiter(delimiter); | ||
|
@@ -397,21 +382,25 @@ impl<R: Read> Reader<R> { | |
if let Some(format) = datetime_format { | ||
builder = builder.with_datetime_format(format) | ||
} | ||
builder.build_with_schema(StdBufReader::new(reader), schema) | ||
|
||
Self { | ||
decoder: builder.build_decoder(), | ||
reader: StdBufReader::new(reader), | ||
} | ||
} | ||
|
||
/// Returns the schema of the reader, useful for getting the schema without reading | ||
/// record batches | ||
pub fn schema(&self) -> SchemaRef { | ||
match &self.projection { | ||
match &self.decoder.projection { | ||
Some(projection) => { | ||
let fields = self.schema.fields(); | ||
let fields = self.decoder.schema.fields(); | ||
let projected_fields: Vec<Field> = | ||
projection.iter().map(|i| fields[*i].clone()).collect(); | ||
|
||
Arc::new(Schema::new(projected_fields)) | ||
} | ||
None => self.schema.clone(), | ||
None => self.decoder.schema.clone(), | ||
} | ||
} | ||
|
||
|
@@ -444,38 +433,146 @@ impl<R: Read> Reader<R> { | |
} | ||
} | ||
|
||
impl<R: BufRead> BufReader<R> { | ||
fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> { | ||
loop { | ||
let buf = self.reader.fill_buf()?; | ||
let decoded = self.decoder.decode(buf)?; | ||
if decoded == 0 { | ||
break; | ||
} | ||
self.reader.consume(decoded); | ||
} | ||
|
||
Ok(self.decoder.flush()?) | ||
} | ||
} | ||
|
||
impl<R: BufRead> Iterator for BufReader<R> { | ||
type Item = Result<RecordBatch, ArrowError>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
self.read().transpose() | ||
} | ||
} | ||
|
||
/// A push-based interface for decoding CSV data from an arbitrary byte stream | ||
/// | ||
/// See [`Reader`] for a higher-level interface for use with [`Read`] | ||
/// | ||
/// The push-based interface facilitates integration with sources that yield arbitrarily | ||
/// delimited byte ranges, such as [`BufRead`], or a chunked byte stream received from | ||
/// object storage | ||
/// | ||
/// ``` | ||
/// # use std::io::BufRead; | ||
/// # use arrow_array::RecordBatch; | ||
/// # use arrow_csv::ReaderBuilder; | ||
/// # use arrow_schema::{ArrowError, SchemaRef}; | ||
/// # | ||
/// fn read_from_csv<R: BufRead>( | ||
/// mut reader: R, | ||
/// schema: SchemaRef, | ||
/// batch_size: usize, | ||
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> { | ||
/// let mut decoder = ReaderBuilder::new() | ||
/// .with_schema(schema) | ||
/// .with_batch_size(batch_size) | ||
/// .build_decoder(); | ||
/// | ||
/// let mut next = move || { | ||
/// loop { | ||
/// let buf = reader.fill_buf()?; | ||
/// let decoded = decoder.decode(buf)?; | ||
/// if decoded == 0 { | ||
/// break; | ||
/// } | ||
/// | ||
/// // Consume the number of bytes read | ||
/// reader.consume(decoded); | ||
/// } | ||
/// decoder.flush() | ||
/// }; | ||
/// Ok(std::iter::from_fn(move || next().transpose())) | ||
/// } | ||
/// ``` | ||
#[derive(Debug)] | ||
pub struct Decoder { | ||
/// Explicit schema for the CSV file | ||
schema: SchemaRef, | ||
|
||
/// Optional projection for which columns to load (zero-based column indices) | ||
projection: Option<Vec<usize>>, | ||
|
||
/// Number of records per batch | ||
batch_size: usize, | ||
|
||
/// Rows to skip | ||
to_skip: usize, | ||
|
||
/// Current line number | ||
line_number: usize, | ||
|
||
/// End line number | ||
end: usize, | ||
|
||
/// A decoder for [`StringRecords`] | ||
record_decoder: RecordDecoder, | ||
|
||
/// datetime format used to parse datetime values, (format understood by chrono) | ||
/// | ||
/// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html) | ||
datetime_format: Option<String>, | ||
} | ||
|
||
impl Decoder { | ||
/// Decode records from `buf` returning the number of bytes read | ||
/// | ||
/// This method returns once `batch_size` records have been parsed since the | ||
/// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes | ||
/// should be included in the next call to [`Self::decode`] | ||
/// | ||
/// There is no requirement that `buf` contains a whole number of records, facilitating | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 |
||
/// integration with arbitrary byte streams, such as that yielded by [`BufRead`] or | ||
/// network sources such as object storage | ||
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> { | ||
if self.to_skip != 0 { | ||
if let Err(e) = self.reader.skip(std::mem::take(&mut self.to_skip)) { | ||
return Some(Err(e)); | ||
} | ||
// Skip in units of `batch_size` to avoid over-allocating buffers | ||
let to_skip = self.to_skip.min(self.batch_size); | ||
let (skipped, bytes) = self.record_decoder.decode(buf, to_skip)?; | ||
self.to_skip -= skipped; | ||
self.record_decoder.clear(); | ||
return Ok(bytes); | ||
} | ||
|
||
let remaining = self.end - self.line_number; | ||
let to_read = self.batch_size.min(remaining); | ||
let to_read = | ||
self.batch_size.min(self.end - self.line_number) - self.record_decoder.len(); | ||
let (_, bytes) = self.record_decoder.decode(buf, to_read)?; | ||
Ok(bytes) | ||
} | ||
|
||
let batch = match self.reader.read(to_read) { | ||
Ok(b) if b.is_empty() => return None, | ||
Ok(b) => b, | ||
Err(e) => return Some(Err(e)), | ||
}; | ||
/// Flushes the currently buffered data to a [`RecordBatch`] | ||
/// | ||
/// This should only be called after [`Self::decode`] has returned `Ok(0)`, | ||
/// otherwise it may return an error if called part way through decoding a record | ||
/// | ||
/// Returns `Ok(None)` if there is no buffered data | ||
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { | ||
if self.record_decoder.is_empty() { | ||
return Ok(None); | ||
} | ||
|
||
// parse the batches into a RecordBatch | ||
let result = parse( | ||
&batch, | ||
let rows = self.record_decoder.flush()?; | ||
let batch = parse( | ||
&rows, | ||
self.schema.fields(), | ||
Some(self.schema.metadata.clone()), | ||
self.projection.as_ref(), | ||
self.line_number, | ||
self.datetime_format.as_deref(), | ||
); | ||
|
||
self.line_number += batch.len(); | ||
|
||
Some(result) | ||
)?; | ||
self.line_number += rows.len(); | ||
Ok(Some(batch)) | ||
} | ||
} | ||
|
||
|
@@ -1060,29 +1157,35 @@ impl ReaderBuilder { | |
mut reader: R, | ||
) -> Result<BufReader<R>, ArrowError> { | ||
// check if schema should be inferred | ||
let delimiter = self.delimiter.unwrap_or(b','); | ||
let schema = match self.schema.take() { | ||
Some(schema) => schema, | ||
None => { | ||
let roptions = ReaderOptions { | ||
delimiter: Some(delimiter), | ||
max_read_records: self.max_records, | ||
has_header: self.has_header, | ||
escape: self.escape, | ||
quote: self.quote, | ||
terminator: self.terminator, | ||
datetime_re: self.datetime_re.take(), | ||
}; | ||
let (inferred_schema, _) = | ||
infer_file_schema_with_csv_options(&mut reader, roptions)?; | ||
|
||
Arc::new(inferred_schema) | ||
} | ||
}; | ||
Ok(self.build_with_schema(reader, schema)) | ||
if self.schema.is_none() { | ||
let delimiter = self.delimiter.unwrap_or(b','); | ||
let roptions = ReaderOptions { | ||
delimiter: Some(delimiter), | ||
max_read_records: self.max_records, | ||
has_header: self.has_header, | ||
escape: self.escape, | ||
quote: self.quote, | ||
terminator: self.terminator, | ||
datetime_re: self.datetime_re.take(), | ||
}; | ||
let (inferred_schema, _) = | ||
infer_file_schema_with_csv_options(&mut reader, roptions)?; | ||
self.schema = Some(Arc::new(inferred_schema)) | ||
} | ||
|
||
Ok(BufReader { | ||
reader, | ||
decoder: self.build_decoder(), | ||
}) | ||
} | ||
|
||
fn build_with_schema<R: BufRead>(self, reader: R, schema: SchemaRef) -> BufReader<R> { | ||
/// Builds a decoder that can be used to decode CSV from an arbitrary byte stream | ||
/// | ||
/// # Panics | ||
/// | ||
/// This method panics if no schema has been provided | ||
pub fn build_decoder(self) -> Decoder { | ||
let schema = self.schema.expect("schema should be provided"); | ||
let mut reader_builder = csv_core::ReaderBuilder::new(); | ||
reader_builder.escape(self.escape); | ||
|
||
|
@@ -1096,7 +1199,7 @@ impl ReaderBuilder { | |
reader_builder.terminator(csv_core::Terminator::Any(t)); | ||
} | ||
let delimiter = reader_builder.build(); | ||
let reader = RecordReader::new(reader, delimiter, schema.fields().len()); | ||
let record_decoder = RecordDecoder::new(delimiter, schema.fields().len()); | ||
|
||
let header = self.has_header as usize; | ||
|
||
|
@@ -1105,15 +1208,15 @@ impl ReaderBuilder { | |
None => (header, usize::MAX), | ||
}; | ||
|
||
BufReader { | ||
Decoder { | ||
schema, | ||
projection: self.projection, | ||
reader, | ||
to_skip: start, | ||
record_decoder, | ||
line_number: start, | ||
end, | ||
batch_size: self.batch_size, | ||
projection: self.projection, | ||
datetime_format: self.datetime_format, | ||
batch_size: self.batch_size, | ||
} | ||
} | ||
} | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍