Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy array display (#3638) #3647

Merged
merged 18 commits into from
Feb 8, 2023
1,378 changes: 718 additions & 660 deletions arrow-cast/src/display.rs

Large diffs are not rendered by default.

213 changes: 52 additions & 161 deletions arrow-csv/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@
//! }
//! ```

use arrow_array::types::*;
use arrow_array::*;
use arrow_cast::display::{
array_value_to_string, lexical_to_string, temporal_array_value_to_string,
};
use arrow_cast::display::*;
use arrow_schema::*;
use csv::ByteRecord;
use std::io::Write;

use crate::map_csv_error;
Expand All @@ -79,15 +77,6 @@ const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f";
const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
const DEFAULT_NULL_VALUE: &str = "";

fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
where
T: ArrowPrimitiveType,
T::Native: lexical_core::ToLexical,
{
let c = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
lexical_to_string(c.value(i))
}

/// A CSV writer
#[derive(Debug)]
pub struct Writer<W: Write> {
Expand All @@ -100,10 +89,8 @@ pub struct Writer<W: Write> {
/// The datetime format for datetime arrays
datetime_format: Option<String>,
/// The timestamp format for timestamp arrays
#[allow(dead_code)]
timestamp_format: Option<String>,
/// The timestamp format for timestamp (with timezone) arrays
#[allow(dead_code)]
timestamp_tz_format: Option<String>,
/// The time format for time arrays
time_format: Option<String>,
Expand Down Expand Up @@ -132,113 +119,6 @@ impl<W: Write> Writer<W> {
}
}

/// Convert a record to a string vector
fn convert(
&self,
batch: &[ArrayRef],
row_index: usize,
buffer: &mut [String],
) -> Result<(), ArrowError> {
// TODO: it'd be more efficient if we could create `record: Vec<&[u8]>
for (col_index, item) in buffer.iter_mut().enumerate() {
let col = &batch[col_index];
if col.is_null(row_index) {
// write the configured null value
*item = self.null_value.clone();
continue;
}
let string = match col.data_type() {
DataType::Float64 => write_primitive_value::<Float64Type>(col, row_index),
DataType::Float32 => write_primitive_value::<Float32Type>(col, row_index),
DataType::Int8 => write_primitive_value::<Int8Type>(col, row_index),
DataType::Int16 => write_primitive_value::<Int16Type>(col, row_index),
DataType::Int32 => write_primitive_value::<Int32Type>(col, row_index),
DataType::Int64 => write_primitive_value::<Int64Type>(col, row_index),
DataType::UInt8 => write_primitive_value::<UInt8Type>(col, row_index),
DataType::UInt16 => write_primitive_value::<UInt16Type>(col, row_index),
DataType::UInt32 => write_primitive_value::<UInt32Type>(col, row_index),
DataType::UInt64 => write_primitive_value::<UInt64Type>(col, row_index),
DataType::Boolean => array_value_to_string(col, row_index)?.to_string(),
DataType::Utf8 => array_value_to_string(col, row_index)?.to_string(),
DataType::LargeUtf8 => array_value_to_string(col, row_index)?.to_string(),
DataType::Date32 => temporal_array_value_to_string(
col,
col_index,
row_index,
self.date_format.as_deref(),
)?
.to_string(),
DataType::Date64 => temporal_array_value_to_string(
col,
col_index,
row_index,
self.datetime_format.as_deref(),
)?
.to_string(),
DataType::Time32(TimeUnit::Second) => temporal_array_value_to_string(
col,
col_index,
row_index,
self.time_format.as_deref(),
)?
.to_string(),
DataType::Time32(TimeUnit::Millisecond) => {
temporal_array_value_to_string(
col,
col_index,
row_index,
self.time_format.as_deref(),
)?
.to_string()
}
DataType::Time64(TimeUnit::Microsecond) => {
temporal_array_value_to_string(
col,
col_index,
row_index,
self.time_format.as_deref(),
)?
.to_string()
}
DataType::Time64(TimeUnit::Nanosecond) => temporal_array_value_to_string(
col,
col_index,
row_index,
self.time_format.as_deref(),
)?
.to_string(),
DataType::Timestamp(_, time_zone) => match time_zone {
Some(_tz) => temporal_array_value_to_string(
col,
col_index,
row_index,
self.timestamp_tz_format.as_deref(),
)?
.to_string(),
None => temporal_array_value_to_string(
col,
col_index,
row_index,
self.timestamp_format.as_deref(),
)?
.to_string(),
},
DataType::Decimal128(..) => {
array_value_to_string(col, row_index)?.to_string()
}
t => {
// List and Struct arrays not supported by the writer, any
// other type needs to be implemented
return Err(ArrowError::CsvError(format!(
"CSV Writer does not support {t:?} data type"
)));
}
};
*item = string;
}
Ok(())
}

/// Write a vector of record batches to a writable object
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
let num_columns = batch.num_columns();
Expand All @@ -257,23 +137,49 @@ impl<W: Write> Writer<W> {
self.beginning = false;
}

let columns: Vec<_> = batch
let options = FormatOptions::default()
.with_null(&self.null_value)
.with_date_format(self.date_format.as_deref())
.with_datetime_format(self.datetime_format.as_deref())
.with_timestamp_format(self.timestamp_format.as_deref())
.with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
.with_time_format(self.time_format.as_deref());

let converters = batch
.columns()
.iter()
.map(|array| match array.data_type() {
DataType::Dictionary(_, value_type) => {
arrow_cast::cast(array, value_type)
.expect("cannot cast dictionary to underlying values")
}
_ => array.clone(),
.map(|a| match a.data_type() {
d if d.is_nested() => Err(ArrowError::CsvError(format!(
"Nested type {} is not supported in CSV",
a.data_type()
))),
DataType::Binary | DataType::LargeBinary => Err(ArrowError::CsvError(
"Binary data cannot be written to CSV".to_string(),
)),
_ => ArrayFormatter::try_new(a.as_ref(), &options),
})
.collect();

let mut buffer = vec!["".to_string(); batch.num_columns()];
.collect::<Result<Vec<_>, ArrowError>>()?;

let mut buffer = String::with_capacity(1024);
let mut byte_record = ByteRecord::with_capacity(1024, converters.len());

for row_idx in 0..batch.num_rows() {
byte_record.clear();
for (col_idx, converter) in converters.iter().enumerate() {
buffer.clear();
converter.value(row_idx).write(&mut buffer).map_err(|e| {
ArrowError::CsvError(format!(
"Error formatting row {} and column {}: {e}",
row_idx + 1,
col_idx + 1
))
})?;
byte_record.push_field(buffer.as_bytes());
}

for row_index in 0..batch.num_rows() {
self.convert(columns.as_slice(), row_index, &mut buffer)?;
self.writer.write_record(&buffer).map_err(map_csv_error)?;
self.writer
.write_byte_record(&byte_record)
.map_err(map_csv_error)?;
}
self.writer.flush()?;

Expand Down Expand Up @@ -384,16 +290,13 @@ impl WriterBuilder {
self
}

/// Use RFC3339 format for date/time/timestamps by clearing all
/// date/time specific formats.
pub fn with_rfc3339(mut self, use_rfc3339: bool) -> Self {
if use_rfc3339 {
self.date_format = None;
self.datetime_format = None;
self.time_format = None;
self.timestamp_format = None;
self.timestamp_tz_format = None;
}
/// Use RFC3339 format for date/time/timestamps
pub fn with_rfc3339(mut self) -> Self {
self.date_format = None;
self.datetime_format = None;
self.time_format = None;
self.timestamp_format = None;
self.timestamp_tz_format = None;
self
}

Expand Down Expand Up @@ -423,15 +326,10 @@ mod tests {
use super::*;

use crate::Reader;
use arrow_array::types::*;
use std::io::{Cursor, Read, Seek};
use std::sync::Arc;

fn invalid_cast_error(dt: &str, col_idx: usize, row_idx: usize) -> ArrowError {
ArrowError::CastError(format!(
"Cannot cast to {dt} at col index: {col_idx} row index: {row_idx}"
))
}

#[test]
fn test_write_csv() {
let schema = Schema::new(vec![
Expand Down Expand Up @@ -654,15 +552,8 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
let batches = vec![&batch, &batch];

for batch in batches {
writer
.write(batch)
.map_err(|e| {
dbg!(e.to_string());
assert!(e.to_string().ends_with(
invalid_cast_error("Date64", 1, 1).to_string().as_str()
))
})
.unwrap_err();
let err = writer.write(batch).unwrap_err().to_string();
assert_eq!(err, "Csv error: Error formatting row 2 and column 2: Cast error: Failed to convert 1926632005177685347 to temporal for Date64")
}
drop(writer);
}
Expand Down Expand Up @@ -700,7 +591,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo

let mut file = tempfile::tempfile().unwrap();

let builder = WriterBuilder::new().with_rfc3339(true);
let builder = WriterBuilder::new().with_rfc3339();
let mut writer = builder.build(&mut file);
let batches = vec![&batch];
for batch in batches {
Expand Down
Loading