Add Push-Based CSV Decoder #3604

Merged · 5 commits · Jan 27, 2023

Changes from 1 commit
245 changes: 174 additions & 71 deletions arrow-csv/src/reader/mod.rs
@@ -57,7 +57,7 @@ use arrow_cast::parse::Parser;
use arrow_schema::*;

use crate::map_csv_error;
-use crate::reader::records::{RecordReader, StringRecords};
use crate::reader::records::{RecordDecoder, StringRecords};
use arrow_data::decimal::validate_decimal_precision;
use csv::StringRecord;
use std::ops::Neg;
@@ -330,24 +330,11 @@ pub type Reader<R> = BufReader<StdBufReader<R>>;

/// CSV file reader
pub struct BufReader<R> {
-    /// Explicit schema for the CSV file
-    schema: SchemaRef,
-    /// Optional projection for which columns to load (zero-based column indices)
-    projection: Option<Vec<usize>>,
-    /// File reader
-    reader: RecordReader<R>,
-    /// Rows to skip
-    to_skip: usize,
-    /// Current line number
-    line_number: usize,
-    /// End line number
-    end: usize,
-    /// Number of records per batch
-    batch_size: usize,
-    /// datetime format used to parse datetime values, (format understood by chrono)
-    ///
-    /// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
-    datetime_format: Option<String>,
reader: R,

/// The decoder
decoder: Decoder,
}

impl<R> fmt::Debug for BufReader<R>
@@ -356,10 +343,7 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Reader")
.field("schema", &self.schema)
.field("projection", &self.projection)
.field("line_number", &self.line_number)
.field("datetime_format", &self.datetime_format)
.field("decoder", &self.decoder)
.finish()
}
}
@@ -383,7 +367,8 @@ impl<R: Read> Reader<R> {
) -> Self {
let mut builder = ReaderBuilder::new()
.has_header(has_header)
-            .with_batch_size(batch_size);
.with_batch_size(batch_size)
.with_schema(schema);

if let Some(delimiter) = delimiter {
builder = builder.with_delimiter(delimiter);
@@ -397,21 +382,25 @@
if let Some(format) = datetime_format {
builder = builder.with_datetime_format(format)
}
-        builder.build_with_schema(StdBufReader::new(reader), schema)

Self {
decoder: builder.build_decoder(),
reader: StdBufReader::new(reader),
}
}

/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
-        match &self.projection {
match &self.decoder.projection {
Some(projection) => {
-                let fields = self.schema.fields();
let fields = self.decoder.schema.fields();
let projected_fields: Vec<Field> =
projection.iter().map(|i| fields[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
-            None => self.schema.clone(),
None => self.decoder.schema.clone(),
}
}
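
When a projection is set, `schema()` reports only the projected fields rather than the full file schema. A minimal sketch of this behaviour (the column names and data are illustrative):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Float64, false),
    ]));

    // Decode only columns "a" and "c"
    let reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_projection(vec![0, 2])
        .build(Cursor::new("1,x,2.5\n2,y,3.5\n"))
        .unwrap();

    // The reported schema reflects the projection, not the full file schema
    assert_eq!(reader.schema().fields().len(), 2);
}
```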

@@ -444,38 +433,146 @@ impl<R: Read> Reader<R> {
}
}

impl<R: BufRead> BufReader<R> {
fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
loop {
let buf = self.reader.fill_buf()?;
let decoded = self.decoder.decode(buf)?;
if decoded == 0 {
break;
}
self.reader.consume(decoded);
}

Ok(self.decoder.flush()?)
}
}

impl<R: BufRead> Iterator for BufReader<R> {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
self.read().transpose()
}
}
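
Each call to `next` therefore drives the `fill_buf`/`decode`/`consume` loop until a full batch is buffered, then flushes it. A minimal sketch of the resulting batching behaviour (the schema, data, and deliberately tiny batch size are illustrative):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(1) // one row per batch, purely for illustration
        .build(Cursor::new("1,alpha\n2,beta\n3,gamma\n"))
        .unwrap();

    // Each call to `next` runs the fill_buf/decode/consume loop until a
    // full batch is buffered, then flushes it as one RecordBatch
    let batches: Vec<_> = reader.collect::<Result<_, _>>().unwrap();
    assert_eq!(batches.len(), 3);
}
```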

/// A push-based interface for decoding CSV data from an arbitrary byte stream
///
/// See [`Reader`] for a higher-level interface for interfacing with [`Read`]
///
/// The push-based interface facilitates integration with sources that yield arbitrarily
/// delimited byte ranges, such as [`BufRead`], or a chunked byte stream received from
/// object storage
///
/// ```
/// # use std::io::BufRead;
/// # use arrow_array::RecordBatch;
/// # use arrow_csv::ReaderBuilder;
/// # use arrow_schema::{ArrowError, SchemaRef};
/// #
/// fn read_from_csv<R: BufRead>(
/// mut reader: R,
/// schema: SchemaRef,
/// batch_size: usize,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
/// let mut decoder = ReaderBuilder::new()
/// .with_schema(schema)
/// .with_batch_size(batch_size)
/// .build_decoder();
///
/// let mut next = move || {
/// loop {
/// let buf = reader.fill_buf()?;
/// let decoded = decoder.decode(buf)?;
/// if decoded == 0 {
/// break;
/// }
///
/// // Consume the number of bytes read
/// reader.consume(decoded);
/// }
/// decoder.flush()
/// };
/// Ok(std::iter::from_fn(move || next().transpose()))
/// }
/// ```
#[derive(Debug)]
pub struct Decoder {
/// Explicit schema for the CSV file
schema: SchemaRef,

/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,

/// Number of records per batch
batch_size: usize,

/// Rows to skip
to_skip: usize,

/// Current line number
line_number: usize,

/// End line number
end: usize,

/// A decoder for [`StringRecords`]
record_decoder: RecordDecoder,

    /// datetime format used to parse datetime values (format understood by chrono)
///
/// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
datetime_format: Option<String>,
}

impl Decoder {
/// Decode records from `buf` returning the number of bytes read
///
    /// This method returns once `batch_size` records have been parsed since the
/// last call to [`Self::flush`], or `buf` is exhausted. Any remaining bytes
/// should be included in the next call to [`Self::decode`]
///
/// There is no requirement that `buf` contains a whole number of records, facilitating
    /// integration with arbitrary byte streams, such as those yielded by [`BufRead`] or
/// network sources such as object storage
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
if self.to_skip != 0 {
-            if let Err(e) = self.reader.skip(std::mem::take(&mut self.to_skip)) {
-                return Some(Err(e));
-            }
// Skip in units of `to_read` to avoid over-allocating buffers
let to_skip = self.to_skip.min(self.batch_size);
let (skipped, bytes) = self.record_decoder.decode(buf, to_skip)?;
self.to_skip -= skipped;
self.record_decoder.clear();
return Ok(bytes);
}

-        let remaining = self.end - self.line_number;
-        let to_read = self.batch_size.min(remaining);
let to_read =
self.batch_size.min(self.end - self.line_number) - self.record_decoder.len();
let (_, bytes) = self.record_decoder.decode(buf, to_read)?;
Ok(bytes)
}

-        let batch = match self.reader.read(to_read) {
-            Ok(b) if b.is_empty() => return None,
-            Ok(b) => b,
-            Err(e) => return Some(Err(e)),
-        };
/// Flushes the currently buffered data to a [`RecordBatch`]
///
/// This should only be called after [`Self::decode`] has returned `Ok(0)`,
    /// otherwise it may return an error if part way through decoding a record
///
/// Returns `Ok(None)` if no buffered data
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
if self.record_decoder.is_empty() {
return Ok(None);
}

-        // parse the batches into a RecordBatch
-        let result = parse(
-            &batch,
let rows = self.record_decoder.flush()?;
let batch = parse(
&rows,
self.schema.fields(),
Some(self.schema.metadata.clone()),
self.projection.as_ref(),
self.line_number,
self.datetime_format.as_deref(),
-        );
-
-        self.line_number += batch.len();
-
-        Some(result)
)?;
self.line_number += rows.len();
Ok(Some(batch))
}
}
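
Together, `decode` and `flush` form the push-based contract: feed whatever bytes are on hand, re-feed anything unconsumed, and flush once the input is exhausted. A minimal sketch (the schema and the chunk boundaries, which deliberately split records, are illustrative):

```rust
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));

    let mut decoder = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(1024)
        .build_decoder();

    // "1,2\n3,4\n" delivered in chunks that do not align with record boundaries
    for chunk in [&b"1,"[..], &b"2\n3"[..], &b",4\n"[..]] {
        // With a batch_size larger than the total row count, each call
        // consumes its entire chunk; in general any unconsumed remainder
        // must be re-fed to the next call to decode
        let consumed = decoder.decode(chunk).unwrap();
        assert_eq!(consumed, chunk.len());
    }

    // Flush the buffered rows once the input is exhausted
    let batch = decoder.flush().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 2);
}
```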

@@ -1060,29 +1157,35 @@ impl ReaderBuilder {
mut reader: R,
) -> Result<BufReader<R>, ArrowError> {
// check if schema should be inferred
-        let delimiter = self.delimiter.unwrap_or(b',');
-        let schema = match self.schema.take() {
-            Some(schema) => schema,
-            None => {
-                let roptions = ReaderOptions {
-                    delimiter: Some(delimiter),
-                    max_read_records: self.max_records,
-                    has_header: self.has_header,
-                    escape: self.escape,
-                    quote: self.quote,
-                    terminator: self.terminator,
-                    datetime_re: self.datetime_re.take(),
-                };
-                let (inferred_schema, _) =
-                    infer_file_schema_with_csv_options(&mut reader, roptions)?;
-
-                Arc::new(inferred_schema)
-            }
-        };
-        Ok(self.build_with_schema(reader, schema))
if self.schema.is_none() {
let delimiter = self.delimiter.unwrap_or(b',');
let roptions = ReaderOptions {
delimiter: Some(delimiter),
max_read_records: self.max_records,
has_header: self.has_header,
escape: self.escape,
quote: self.quote,
terminator: self.terminator,
datetime_re: self.datetime_re.take(),
};
let (inferred_schema, _) =
infer_file_schema_with_csv_options(&mut reader, roptions)?;
self.schema = Some(Arc::new(inferred_schema))
}

Ok(BufReader {
reader,
decoder: self.build_decoder(),
})
}

-    fn build_with_schema<R: BufRead>(self, reader: R, schema: SchemaRef) -> BufReader<R> {
/// Builds a decoder that can be used to decode CSV from an arbitrary byte stream
///
/// # Panics
///
    /// This method panics if no schema is provided
pub fn build_decoder(self) -> Decoder {
let schema = self.schema.expect("schema should be provided");
let mut reader_builder = csv_core::ReaderBuilder::new();
reader_builder.escape(self.escape);

@@ -1096,7 +1199,7 @@ impl ReaderBuilder {
reader_builder.terminator(csv_core::Terminator::Any(t));
}
let delimiter = reader_builder.build();
-        let reader = RecordReader::new(reader, delimiter, schema.fields().len());
let record_decoder = RecordDecoder::new(delimiter, schema.fields().len());

let header = self.has_header as usize;

@@ -1105,15 +1208,15 @@
None => (header, usize::MAX),
};

-        BufReader {
Decoder {
schema,
-            projection: self.projection,
-            reader,
to_skip: start,
record_decoder,
line_number: start,
end,
-            batch_size: self.batch_size,
projection: self.projection,
datetime_format: self.datetime_format,
batch_size: self.batch_size,
}
}
}
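
Unlike `build`, which can fall back to schema inference, `build_decoder` has no input to infer from, so the schema must be supplied up front. A minimal sketch of the contract (the field name is illustrative):

```rust
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, true)]));

    // Fine: a schema is supplied, so there is nothing to infer
    let _decoder = ReaderBuilder::new()
        .with_schema(schema)
        .has_header(true) // the header row is skipped via the decoder's `to_skip`
        .build_decoder();

    // Would panic: no schema to decode against
    // let _decoder = ReaderBuilder::new().build_decoder();
}
```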