From 2e96f46800a8ec69744a4a84240c2a197d2b100c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 25 Jan 2023 19:08:18 +0000 Subject: [PATCH 1/4] Add Push-Based CSV Decoder --- arrow-csv/src/reader/mod.rs | 245 +++++++++++++++++-------- arrow-csv/src/reader/records.rs | 304 ++++++++++++++++++-------------- 2 files changed, 345 insertions(+), 204 deletions(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index 0c7bfa897fd4..d46eee6b14f8 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -57,7 +57,7 @@ use arrow_cast::parse::Parser; use arrow_schema::*; use crate::map_csv_error; -use crate::reader::records::{RecordReader, StringRecords}; +use crate::reader::records::{RecordDecoder, StringRecords}; use arrow_data::decimal::validate_decimal_precision; use csv::StringRecord; use std::ops::Neg; @@ -330,24 +330,11 @@ pub type Reader = BufReader>; /// CSV file reader pub struct BufReader { - /// Explicit schema for the CSV file - schema: SchemaRef, - /// Optional projection for which columns to load (zero-based column indices) - projection: Option>, /// File reader - reader: RecordReader, - /// Rows to skip - to_skip: usize, - /// Current line number - line_number: usize, - /// End line number - end: usize, - /// Number of records per batch - batch_size: usize, - /// datetime format used to parse datetime values, (format understood by chrono) - /// - /// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html) - datetime_format: Option, + reader: R, + + /// The decoder + decoder: Decoder, } impl fmt::Debug for BufReader @@ -356,10 +343,7 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Reader") - .field("schema", &self.schema) - .field("projection", &self.projection) - .field("line_number", &self.line_number) - .field("datetime_format", &self.datetime_format) + .field("decoder", &self.decoder) .finish() } } @@ -383,7 +367,8 @@ impl Reader { ) -> Self { let mut builder = ReaderBuilder::new() .has_header(has_header) - .with_batch_size(batch_size); + .with_batch_size(batch_size) + .with_schema(schema); if let Some(delimiter) = delimiter { builder = builder.with_delimiter(delimiter); @@ -397,21 +382,25 @@ impl Reader { if let Some(format) = datetime_format { builder = builder.with_datetime_format(format) } - builder.build_with_schema(StdBufReader::new(reader), schema) + + Self { + decoder: builder.build_decoder(), + reader: StdBufReader::new(reader), + } } /// Returns the schema of the reader, useful for getting the schema without reading /// record batches pub fn schema(&self) -> SchemaRef { - match &self.projection { + match &self.decoder.projection { Some(projection) => { - let fields = self.schema.fields(); + let fields = self.decoder.schema.fields(); let projected_fields: Vec = projection.iter().map(|i| fields[*i].clone()).collect(); Arc::new(Schema::new(projected_fields)) } - None => self.schema.clone(), + None => self.decoder.schema.clone(), } } @@ -444,38 +433,146 @@ impl Reader { } } +impl BufReader { + fn read(&mut self) -> Result, ArrowError> { + loop { + let buf = self.reader.fill_buf()?; + let decoded = self.decoder.decode(buf)?; + if decoded == 0 { + break; + } + self.reader.consume(decoded); + } + + Ok(self.decoder.flush()?) 
+    }
+}
+
 impl<R: BufRead> Iterator for BufReader<R> {
     type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        self.read().transpose()
+    }
+}
+
+/// A push-based interface for decoding CSV data from an arbitrary byte stream
+///
+/// See [`Reader`] for a higher-level interface for use with [`Read`]
+///
+/// The push-based interface facilitates integration with sources that yield arbitrarily
+/// delimited byte ranges, such as [`BufRead`], or a chunked byte stream received from
+/// object storage
+///
+/// ```
+/// # use std::io::BufRead;
+/// # use arrow_array::RecordBatch;
+/// # use arrow_csv::ReaderBuilder;
+/// # use arrow_schema::{ArrowError, SchemaRef};
+/// #
+/// fn read_from_csv<R: BufRead>(
+///     mut reader: R,
+///     schema: SchemaRef,
+///     batch_size: usize,
+/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
+///     let mut decoder = ReaderBuilder::new()
+///         .with_schema(schema)
+///         .with_batch_size(batch_size)
+///         .build_decoder();
+///
+///     let mut next = move || {
+///         loop {
+///             let buf = reader.fill_buf()?;
+///             let decoded = decoder.decode(buf)?;
+///             if decoded == 0 {
+///                 break;
+///             }
+///
+///             // Consume the number of bytes read
+///             reader.consume(decoded);
+///         }
+///         decoder.flush()
+///     };
+///     Ok(std::iter::from_fn(move || next().transpose()))
+/// }
+/// ```
+#[derive(Debug)]
+pub struct Decoder {
+    /// Explicit schema for the CSV file
+    schema: SchemaRef,
+
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<usize>>,
+
+    /// Number of records per batch
+    batch_size: usize,
+
+    /// Rows to skip
+    to_skip: usize,
+
+    /// Current line number
+    line_number: usize,
+
+    /// End line number
+    end: usize,
+
+    /// A decoder for [`StringRecords`]
+    record_decoder: RecordDecoder,
+
+    /// datetime format used to parse datetime values, (format understood by chrono)
+    ///
+    /// For format refer to [chrono docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
+    datetime_format: Option<String>,
+}
+
+impl Decoder {
+    /// Decode records from `buf` returning the number of bytes read
+    ///
+    /// This method returns once `batch_size` objects have been parsed since the
+    /// last call to [`Self::flush`], or `buf` is exhausted.
Any remaining bytes + /// should be included in the next call to [`Self::decode`] + /// + /// There is no requirement that `buf` contains a whole number of records, facilitating + /// integration with arbitrary byte streams, such as that yielded by [`BufRead`] or + /// network sources such as object storage + pub fn decode(&mut self, buf: &[u8]) -> Result { if self.to_skip != 0 { - if let Err(e) = self.reader.skip(std::mem::take(&mut self.to_skip)) { - return Some(Err(e)); - } + // Skip in units of `to_read` to avoid over-allocating buffers + let to_skip = self.to_skip.min(self.batch_size); + let (skipped, bytes) = self.record_decoder.decode(buf, to_skip)?; + self.to_skip -= skipped; + self.record_decoder.clear(); + return Ok(bytes); } - let remaining = self.end - self.line_number; - let to_read = self.batch_size.min(remaining); + let to_read = + self.batch_size.min(self.end - self.line_number) - self.record_decoder.len(); + let (_, bytes) = self.record_decoder.decode(buf, to_read)?; + Ok(bytes) + } - let batch = match self.reader.read(to_read) { - Ok(b) if b.is_empty() => return None, - Ok(b) => b, - Err(e) => return Some(Err(e)), - }; + /// Flushes the currently buffered data to a [`RecordBatch`] + /// + /// This should only be called after [`Self::decode`] has returned `Ok(0)`, + /// otherwise may return an error if part way through decoding a record + /// + /// Returns `Ok(None)` if no buffered data + pub fn flush(&mut self) -> Result, ArrowError> { + if self.record_decoder.is_empty() { + return Ok(None); + } - // parse the batches into a RecordBatch - let result = parse( - &batch, + let rows = self.record_decoder.flush()?; + let batch = parse( + &rows, self.schema.fields(), Some(self.schema.metadata.clone()), self.projection.as_ref(), self.line_number, self.datetime_format.as_deref(), - ); - - self.line_number += batch.len(); - - Some(result) + )?; + self.line_number += rows.len(); + Ok(Some(batch)) } } @@ -1060,29 +1157,35 @@ impl ReaderBuilder { mut reader: R, ) -> Result, ArrowError> { // check if schema should be inferred - let delimiter = self.delimiter.unwrap_or(b','); - let schema = match self.schema.take() { - Some(schema) => schema, - None => { - let roptions = ReaderOptions { - delimiter: Some(delimiter), - max_read_records: self.max_records, - has_header: self.has_header, - escape: self.escape, - quote: self.quote, - terminator: self.terminator, - datetime_re: self.datetime_re.take(), - }; - let (inferred_schema, _) = - infer_file_schema_with_csv_options(&mut reader, roptions)?; - - Arc::new(inferred_schema) - } - }; - Ok(self.build_with_schema(reader, schema)) + if self.schema.is_none() { + let delimiter = self.delimiter.unwrap_or(b','); + let roptions = ReaderOptions { + delimiter: Some(delimiter), + max_read_records: self.max_records, + has_header: self.has_header, + escape: self.escape, + quote: self.quote, + terminator: self.terminator, + datetime_re: self.datetime_re.take(), + }; + let (inferred_schema, _) = + infer_file_schema_with_csv_options(&mut reader, roptions)?; + self.schema = Some(Arc::new(inferred_schema)) + } + + Ok(BufReader { + reader, + decoder: self.build_decoder(), + }) } - fn build_with_schema(self, reader: R, schema: SchemaRef) -> BufReader { + /// Builds a decoder that can be used to decode CSV from an arbitrary byte stream + /// + /// # Panics + /// + /// This method panics if no schema provided + pub fn build_decoder(self) -> Decoder { + let schema = self.schema.expect("schema should be provided"); let mut reader_builder = 
csv_core::ReaderBuilder::new(); reader_builder.escape(self.escape); @@ -1096,7 +1199,7 @@ impl ReaderBuilder { reader_builder.terminator(csv_core::Terminator::Any(t)); } let delimiter = reader_builder.build(); - let reader = RecordReader::new(reader, delimiter, schema.fields().len()); + let record_decoder = RecordDecoder::new(delimiter, schema.fields().len()); let header = self.has_header as usize; @@ -1105,15 +1208,15 @@ impl ReaderBuilder { None => (header, usize::MAX), }; - BufReader { + Decoder { schema, - projection: self.projection, - reader, to_skip: start, + record_decoder, line_number: start, end, - batch_size: self.batch_size, + projection: self.projection, datetime_format: self.datetime_format, + batch_size: self.batch_size, } } } diff --git a/arrow-csv/src/reader/records.rs b/arrow-csv/src/reader/records.rs index 501da408815c..4bd0916b9d81 100644 --- a/arrow-csv/src/reader/records.rs +++ b/arrow-csv/src/reader/records.rs @@ -17,7 +17,6 @@ use arrow_schema::ArrowError; use csv_core::{ReadRecordResult, Reader}; -use std::io::BufRead; /// The estimated length of a field in bytes const AVERAGE_FIELD_SIZE: usize = 8; @@ -25,112 +24,165 @@ const AVERAGE_FIELD_SIZE: usize = 8; /// The minimum amount of data in a single read const MIN_CAPACITY: usize = 1024; -pub struct RecordReader { - reader: R, +/// [`RecordDecoder`] provides a push-based interface to decoder [`StringRecords`] +#[derive(Debug)] +pub struct RecordDecoder { delimiter: Reader, + /// The expected number of fields per row num_columns: usize, + /// The current line number line_number: usize, + + /// Offsets delimiting field start positions offsets: Vec, + + /// The current offset into `self.offsets` + /// + /// We track this independently of Vec to avoid re-zeroing memory + offsets_len: usize, + + /// The number of fields read for the current record + current_field: usize, + + /// The number of rows buffered + num_rows: usize, + + /// Decoded field data data: Vec, + + /// Offsets into data + /// + /// We track this independently of Vec to avoid re-zeroing memory + data_len: usize, } -impl RecordReader { - pub fn new(reader: R, delimiter: Reader, num_columns: usize) -> Self { +impl RecordDecoder { + pub fn new(delimiter: Reader, num_columns: usize) -> Self { Self { - reader, delimiter, num_columns, line_number: 1, offsets: vec![], + offsets_len: 1, // The first offset is always 0 + current_field: 0, + data_len: 0, data: vec![], + num_rows: 0, } } - /// Clears and then fills the buffers on this [`RecordReader`] - /// returning the number of records read - fn fill_buf(&mut self, to_read: usize) -> Result { - // Reserve sufficient capacity in offsets - self.offsets.resize(to_read * self.num_columns + 1, 0); - - let mut read = 0; + /// Decodes records from `input` returning the number of records and bytes read + /// + /// Note: this expects to be called with an empty `input` to signal EOF + pub fn decode( + &mut self, + input: &[u8], + to_read: usize, + ) -> Result<(usize, usize), ArrowError> { if to_read == 0 { - return Ok(0); + return Ok((0, 0)); } - // The current offset into `self.data` - let mut output_offset = 0; + // Reserve sufficient capacity in offsets + self.offsets + .resize(self.offsets_len + to_read * self.num_columns, 0); + // The current offset into `input` let mut input_offset = 0; - // The current offset into `self.offsets` - let mut field_offset = 1; - // The number of fields read for the current row - let mut field_count = 0; - - 'outer: loop { - let input = self.reader.fill_buf()?; - - 'input: loop { - // 
Reserve necessary space in output data based on best estimate - let remaining_rows = to_read - read; - let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE; - let estimated_data = capacity.max(MIN_CAPACITY); - self.data.resize(output_offset + estimated_data, 0); - - loop { - let (result, bytes_read, bytes_written, end_positions) = - self.delimiter.read_record( - &input[input_offset..], - &mut self.data[output_offset..], - &mut self.offsets[field_offset..], - ); - - field_count += end_positions; - field_offset += end_positions; - input_offset += bytes_read; - output_offset += bytes_written; - - match result { - ReadRecordResult::End => break 'outer, // Reached end of file - ReadRecordResult::InputEmpty => break 'input, // Input exhausted, need to read more - ReadRecordResult::OutputFull => break, // Need to allocate more capacity - ReadRecordResult::OutputEndsFull => { - let line_number = self.line_number + read; - return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got more than {}", line_number, self.num_columns, field_count))); + + // The number of rows decoded in this pass + let mut read = 0; + + loop { + // Reserve necessary space in output data based on best estimate + let remaining_rows = to_read - read; + let capacity = remaining_rows * self.num_columns * AVERAGE_FIELD_SIZE; + let estimated_data = capacity.max(MIN_CAPACITY); + self.data.resize(self.data_len + estimated_data, 0); + + // Try to read a record + loop { + let (result, bytes_read, bytes_written, end_positions) = + self.delimiter.read_record( + &input[input_offset..], + &mut self.data[self.data_len..], + &mut self.offsets[self.offsets_len..], + ); + + self.current_field += end_positions; + self.offsets_len += end_positions; + input_offset += bytes_read; + self.data_len += bytes_written; + + match result { + ReadRecordResult::End | ReadRecordResult::InputEmpty => { + // Reached end of input + return Ok((read, input_offset)); + } + // Need to allocate more capacity + ReadRecordResult::OutputFull => break, + ReadRecordResult::OutputEndsFull => { + return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got more than {}", self.line_number, self.num_columns, self.current_field))); + } + ReadRecordResult::Record => { + if self.current_field != self.num_columns { + return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got {}", self.line_number, self.num_columns, self.current_field))); } - ReadRecordResult::Record => { - if field_count != self.num_columns { - let line_number = self.line_number + read; - return Err(ArrowError::CsvError(format!("incorrect number of fields for line {}, expected {} got {}", line_number, self.num_columns, field_count))); - } - read += 1; - field_count = 0; - - if read == to_read { - break 'outer; // Read sufficient rows - } - - if input.len() == input_offset { - // Input exhausted, need to read more - // Without this read_record will interpret the empty input - // byte array as indicating the end of the file - break 'input; - } + read += 1; + self.current_field = 0; + self.line_number += 1; + self.num_rows += 1; + + if read == to_read { + // Read sufficient rows + return Ok((read, input_offset)); + } + + if input.len() == input_offset { + // Input exhausted, need to read more + // Without this read_record will interpret the empty input + // byte array as indicating the end of the file + return Ok((read, input_offset)); } } } } - self.reader.consume(input_offset); - 
input_offset = 0; } - self.reader.consume(input_offset); + } + + /// Returns the current number of buffered records + pub fn len(&self) -> usize { + self.num_rows + } + + /// Returns true if the decoder is empty + pub fn is_empty(&self) -> bool { + self.num_rows == 0 + } + + /// Clears the current contents of the decoder + pub fn clear(&mut self) { + // This does not reset current_field to allow clearing part way through a record + self.offsets_len = 1; + self.data_len = 0; + self.num_rows = 0; + } + + /// Flushes the current contents of the reader + pub fn flush(&mut self) -> Result, ArrowError> { + if self.current_field != 0 { + return Err(ArrowError::CsvError( + "Cannot flush part way through record".to_string(), + )); + } // csv_core::Reader writes end offsets relative to the start of the row // Therefore scan through and offset these based on the cumulative row offsets let mut row_offset = 0; - self.offsets[1..] - .chunks_mut(self.num_columns) + self.offsets[1..self.offsets_len] + .chunks_exact_mut(self.num_columns) .for_each(|row| { let offset = row_offset; row.iter_mut().for_each(|x| { @@ -139,49 +191,23 @@ impl RecordReader { }); }); - self.line_number += read; - - Ok(read) - } - - /// Skips forward `to_skip` rows, returning an error if insufficient lines in source - pub fn skip(&mut self, to_skip: usize) -> Result<(), ArrowError> { - // TODO: This could be done by scanning for unquoted newline delimiters - let mut skipped = 0; - while to_skip > skipped { - let read = self.fill_buf(to_skip.min(1024))?; - if read == 0 { - return Err(ArrowError::CsvError(format!( - "Failed to skip {} rows only found {}", - to_skip, skipped - ))); - } - - skipped += read; - } - Ok(()) - } - - /// Reads up to `to_read` rows from the reader - pub fn read(&mut self, to_read: usize) -> Result, ArrowError> { - let num_rows = self.fill_buf(to_read)?; - - // Need to slice fields to the actual number of rows read - // - // We intentionally avoid using `Vec::truncate` to avoid having - // to re-initialize the data again - let num_fields = num_rows * self.num_columns; - let last_offset = self.offsets[num_fields]; - - // Need to truncate data to the actual amount of data read - let data = std::str::from_utf8(&self.data[..last_offset]).map_err(|e| { + // Need to truncate data t1o the actual amount of data read + let data = std::str::from_utf8(&self.data[..self.data_len]).map_err(|e| { ArrowError::CsvError(format!("Encountered invalid UTF-8 data: {}", e)) })?; + let offsets = &self.offsets[..self.offsets_len]; + let num_rows = self.num_rows; + + // Reset state + self.offsets_len = 1; + self.data_len = 0; + self.num_rows = 0; + Ok(StringRecords { num_rows, num_columns: self.num_columns, - offsets: &self.offsets[..num_fields + 1], + offsets, data, }) } @@ -209,10 +235,6 @@ impl<'a> StringRecords<'a> { self.num_rows } - pub fn is_empty(&self) -> bool { - self.num_rows == 0 - } - pub fn iter(&self) -> impl Iterator> + '_ { (0..self.num_rows).map(|x| self.get(x)) } @@ -238,9 +260,9 @@ impl<'a> StringRecord<'a> { #[cfg(test)] mod tests { - use crate::reader::records::RecordReader; + use crate::reader::records::RecordDecoder; use csv_core::Reader; - use std::io::Cursor; + use std::io::{BufRead, BufReader, Cursor}; #[test] fn test_basic() { @@ -260,30 +282,43 @@ mod tests { ] .into_iter(); - let cursor = Cursor::new(csv.as_bytes()); - let mut reader = RecordReader::new(cursor, Reader::new(), 3); + let mut reader = BufReader::with_capacity(3, Cursor::new(csv.as_bytes())); + let mut decoder = 
RecordDecoder::new(Reader::new(), 3); loop { - let b = reader.read(3).unwrap(); - if b.is_empty() { + let to_read = 3; + let mut read = 0; + loop { + let buf = reader.fill_buf().unwrap(); + let (records, bytes) = decoder.decode(buf, to_read - read).unwrap(); + + reader.consume(bytes); + read += records; + + if read == to_read || bytes == 0 { + break; + } + } + if read == 0 { break; } + let b = decoder.flush().unwrap(); b.iter().zip(&mut expected).for_each(|(record, expected)| { let actual = (0..3) .map(|field_idx| record.get(field_idx)) .collect::>(); assert_eq!(actual, expected) - }) + }); } + assert!(expected.next().is_none()); } #[test] fn test_invalid_fields() { let csv = "a,b\nb,c\na\n"; - let cursor = Cursor::new(csv.as_bytes()); - let mut reader = RecordReader::new(cursor, Reader::new(), 2); - let err = reader.read(4).unwrap_err().to_string(); + let mut decoder = RecordDecoder::new(Reader::new(), 2); + let err = decoder.decode(csv.as_bytes(), 4).unwrap_err().to_string(); let expected = "Csv error: incorrect number of fields for line 3, expected 2 got 1"; @@ -291,19 +326,22 @@ mod tests { assert_eq!(err, expected); // Test with initial skip - let cursor = Cursor::new(csv.as_bytes()); - let mut reader = RecordReader::new(cursor, Reader::new(), 2); - reader.skip(1).unwrap(); - let err = reader.read(4).unwrap_err().to_string(); + let mut decoder = RecordDecoder::new(Reader::new(), 2); + let (skipped, bytes) = decoder.decode(csv.as_bytes(), 1).unwrap(); + assert_eq!(skipped, 1); + decoder.clear(); + + let remaining = &csv.as_bytes()[bytes..]; + let err = decoder.decode(remaining, 3).unwrap_err().to_string(); assert_eq!(err, expected); } #[test] fn test_skip_insufficient_rows() { let csv = "a\nv\n"; - let cursor = Cursor::new(csv.as_bytes()); - let mut reader = RecordReader::new(cursor, Reader::new(), 1); - let err = reader.skip(3).unwrap_err().to_string(); - assert_eq!(err, "Csv error: Failed to skip 3 rows only found 2"); + let mut decoder = RecordDecoder::new(Reader::new(), 1); + let (read, bytes) = decoder.decode(csv.as_bytes(), 3).unwrap(); + assert_eq!(read, 2); + assert_eq!(bytes, csv.len()); } } From 9dbed0f6cfbd43662fb203422e123caba731c831 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 25 Jan 2023 20:42:44 +0000 Subject: [PATCH 2/4] Clippy --- arrow-csv/src/reader/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index d46eee6b14f8..bea0d1917d63 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -444,7 +444,7 @@ impl BufReader { self.reader.consume(decoded); } - Ok(self.decoder.flush()?) 
+ self.decoder.flush() } } From 8b19f1c339a66e4f53e685763f75c176ffeb6308 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Jan 2023 12:18:29 +0000 Subject: [PATCH 3/4] More tests --- arrow-csv/src/reader/mod.rs | 125 +++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 43 deletions(-) diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs index 1c3b53a826ad..cff1337dd78f 100644 --- a/arrow-csv/src/reader/mod.rs +++ b/arrow-csv/src/reader/mod.rs @@ -1228,49 +1228,46 @@ mod tests { #[test] fn test_csv() { - let _: Vec<()> = vec![None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())] - .into_iter() - .map(|format| { - let schema = Schema::new(vec![ - Field::new("city", DataType::Utf8, false), - Field::new("lat", DataType::Float64, false), - Field::new("lng", DataType::Float64, false), - ]); - - let file = File::open("test/data/uk_cities.csv").unwrap(); - let mut csv = Reader::new( - file, - Arc::new(schema.clone()), - false, - None, - 1024, - None, - None, - format, - ); - assert_eq!(Arc::new(schema), csv.schema()); - let batch = csv.next().unwrap().unwrap(); - assert_eq!(37, batch.num_rows()); - assert_eq!(3, batch.num_columns()); - - // access data from a primitive array - let lat = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(57.653484, lat.value(0)); - - // access data from a string array (ListArray) - let city = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); - }) - .collect(); + for format in [None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())] { + let schema = Schema::new(vec![ + Field::new("city", DataType::Utf8, false), + Field::new("lat", DataType::Float64, false), + Field::new("lng", DataType::Float64, false), + ]); + + let file = File::open("test/data/uk_cities.csv").unwrap(); + let mut csv = Reader::new( + file, + Arc::new(schema.clone()), + false, + None, + 1024, + None, + None, + format, + ); + assert_eq!(Arc::new(schema), csv.schema()); + let batch = csv.next().unwrap().unwrap(); + assert_eq!(37, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + // access data from a primitive array + let lat = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(57.653484, lat.value(0)); + + // access data from a string array (ListArray) + let city = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13)); + } } #[test] @@ -2192,4 +2189,46 @@ mod tests { assert!(c.value(2)); assert!(c.is_null(3)); } + + #[test] + fn test_buffered() { + let tests = [ + ("test/data/uk_cities.csv", false, 37), + ("test/data/various_types.csv", true, 7), + ("test/data/decimal_test.csv", false, 10), + ]; + + for (path, has_header, expected_rows) in tests { + for batch_size in [1, 4] { + for capacity in [1, 3, 7, 100] { + let reader = ReaderBuilder::new() + .with_batch_size(batch_size) + .has_header(has_header) + .build(File::open(path).unwrap()) + .unwrap(); + + let expected = reader.collect::, _>>().unwrap(); + + assert_eq!( + expected.iter().map(|x| x.num_rows()).sum::(), + expected_rows + ); + + let buffered = std::io::BufReader::with_capacity( + capacity, + File::open(path).unwrap(), + ); + + let reader = ReaderBuilder::new() + .with_batch_size(batch_size) + .has_header(has_header) + .build_buffered(buffered) + .unwrap(); + + let actual = reader.collect::, _>>().unwrap(); + assert_eq!(expected, actual) + } + } + } + } } From 
fb32ba3fa7d03b40a60743f7c5c3cf49d366fc98 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 Jan 2023 12:58:20 +0000 Subject: [PATCH 4/4] Clippy --- arrow-csv/src/reader/records.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-csv/src/reader/records.rs b/arrow-csv/src/reader/records.rs index 4bd0916b9d81..c4da36ca4bfe 100644 --- a/arrow-csv/src/reader/records.rs +++ b/arrow-csv/src/reader/records.rs @@ -193,7 +193,7 @@ impl RecordDecoder { // Need to truncate data t1o the actual amount of data read let data = std::str::from_utf8(&self.data[..self.data_len]).map_err(|e| { - ArrowError::CsvError(format!("Encountered invalid UTF-8 data: {}", e)) + ArrowError::CsvError(format!("Encountered invalid UTF-8 data: {e}")) })?; let offsets = &self.offsets[..self.offsets_len];
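
As a rough usage sketch of the push-based API introduced in this series (not part of the patches themselves): the example below feeds the `Decoder` from pre-chunked byte slices rather than a `BufRead`, using only `ReaderBuilder::build_decoder`, `Decoder::decode` and `Decoder::flush` as defined above. The schema, column names and values are made up for illustration, and the batch size is deliberately small so a flush happens mid-stream.

use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Hypothetical schema, purely for illustration
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("population", DataType::Int64, false),
    ]));

    let mut decoder = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(2)
        .build_decoder();

    // Chunks may split records at arbitrary byte boundaries
    let chunks = [
        &b"Oslo,709"[..],
        &b"037\nBergen,285911\nTrond"[..],
        &b"heim,205163\n"[..],
    ];

    for chunk in chunks {
        let mut buf = chunk;
        while !buf.is_empty() {
            let read = decoder.decode(buf)?;
            buf = &buf[read..];
            if read == 0 {
                // decode made no progress with input remaining: a full batch
                // of `batch_size` rows is buffered and must be flushed first
                match decoder.flush()? {
                    Some(batch) => println!("read a batch of {} rows", batch.num_rows()),
                    None => break,
                }
            }
        }
    }

    // An empty buffer signals EOF, completing any record that is not
    // terminated by a newline; flush then yields the final partial batch
    decoder.decode(&[])?;
    if let Some(batch) = decoder.flush()? {
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}

Because the decoder buffers partial records internally, the caller never needs to align chunk boundaries with row boundaries, which is what makes this interface suitable for network or object-store streams as well as [`BufRead`].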