make csv schema inference more robust; defaulting to utf8-lossy #168

Merged: 1 commit, merged Nov 20, 2020
6 changes: 5 additions & 1 deletion polars/src/frame/ser/csv.rs
@@ -147,6 +147,7 @@ pub enum CsvEncoding {
/// CsvReader::new(file)
/// .infer_schema(None)
/// .has_header(true)
/// .with_one_thread(true) // set this to false to try multi-threaded parsing
/// .finish()
/// }
/// ```
@@ -247,7 +248,10 @@ where
self
}

/// Use slower single threaded CSV parsing.
/// Use single threaded CSV parsing (this is the default).
/// This is recommended when there are not many columns in the csv file.
///
/// Whether multi-threaded parsing is faster depends on your specific use case.
/// This is used internally for Python file handlers.
pub fn with_one_thread(mut self, one_thread: bool) -> Self {
self.one_thread = one_thread;
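For readers skimming only this hunk: the doc comment above already shows the whole user-facing flow on the Rust side. A minimal sketch of that flow, assuming the `Result` alias and re-exports from the crate prelude used in that doc example; the file name and function name here are made up:

```rust
use polars::prelude::*;
use std::fs::File;

// Mirrors the doc-comment example above: infer the schema from the whole file,
// treat the first row as a header, and keep the single-threaded default.
fn read_example() -> Result<DataFrame> {
    let file = File::open("example.csv").expect("csv file should exist");
    CsvReader::new(file)
        .infer_schema(None)     // None means every record is sampled for dtype inference
        .has_header(true)
        .with_one_thread(true)  // pass false to try multi-threaded parsing instead
        .finish()
}
```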
86 changes: 52 additions & 34 deletions polars/src/frame/ser/fork/csv.rs
@@ -20,13 +20,13 @@ use crate::prelude::*;
use crate::utils;
use ahash::RandomState;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use crossbeam::{
channel::{bounded, TryRecvError},
thread,
};
use csv::{ByteRecord, ByteRecordsIntoIter};
use rayon::prelude::*;
use std::borrow::Cow;
use std::collections::HashSet;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
@@ -63,6 +63,16 @@ fn infer_field_schema(string: &str) -> ArrowDataType {
ArrowDataType::Utf8
}

fn parse_bytes_with_encoding(bytes: &[u8], encoding: CsvEncoding) -> Result<Cow<str>> {
let s = match encoding {
CsvEncoding::Utf8 => std::str::from_utf8(bytes)
.map_err(anyhow::Error::from)?
.into(),
CsvEncoding::LossyUtf8 => String::from_utf8_lossy(bytes),
};
Ok(s)
}

/// Infer the schema of a CSV file by reading through the first n records of the file,
/// with `max_read_records` controlling the maximum number of records to read.
///
@@ -74,52 +84,61 @@ fn infer_file_schema<R: Read + Seek>(
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
) -> ArrowResult<(Schema, usize)> {
let mut csv_reader = csv::ReaderBuilder::new()
.delimiter(delimiter)
.from_reader(reader);
) -> Result<(Schema, usize)> {
// We use lossy utf8 decoding here because we don't want schema inference to fail on invalid utf8.
// Parsing the actual data may still fail later.
let encoding = CsvEncoding::LossyUtf8;
// set headers to false, otherwise the csv crate skips them.
let csv_reader = init_csv_reader(reader, false, delimiter);

let mut records = csv_reader.into_byte_records();
let header_length;

// get or create header names
// when has_header is false, creates default column names with column_ prefix
let headers: Vec<String> = if has_header {
let headers = csv_reader.headers()?;
headers.iter().map(|s| s.to_string()).collect()
let headers: Vec<String> = if let Some(byterecord) = records.next() {
let byterecord = byterecord.map_err(anyhow::Error::from)?;
header_length = byterecord.len();
if has_header {
byterecord
.iter()
.map(|slice| {
let s = parse_bytes_with_encoding(slice, encoding)?;
Ok(s.into())
})
.collect::<Result<_>>()?
} else {
(0..header_length)
.map(|i| format!("column_{}", i + 1))
.collect()
}
} else {
let first_record_count = &csv_reader.headers()?.len();
(0..*first_record_count)
.map(|i| format!("column_{}", i + 1))
.collect()
return Err(PolarsError::NoData("empty csv".into()));
};

// save the csv reader position after reading headers
let position = csv_reader.position().clone();

let header_length = headers.len();
// keep track of inferred field types
let mut column_types: Vec<HashSet<ArrowDataType, RandomState>> =
vec![HashSet::with_hasher(RandomState::new()); header_length];
// keep track of columns with nulls
let mut nulls: Vec<bool> = vec![false; header_length];

// return csv reader position to after headers
csv_reader.seek(position)?;

let mut records_count = 0;
let mut fields = vec![];
let mut fields = Vec::with_capacity(header_length);

for result in csv_reader
.records()
.take(max_read_records.unwrap_or(std::usize::MAX))
{
let record = result?;
// needed to prevent ownership going into the iterator loop
let records_ref = &mut records;

for result in records_ref.take(max_read_records.unwrap_or(std::usize::MAX)) {
let record = result.map_err(anyhow::Error::from)?;
records_count += 1;

for i in 0..header_length {
if let Some(string) = record.get(i) {
if string.is_empty() {
if let Some(slice) = record.get(i) {
if slice.is_empty() {
nulls[i] = true;
} else {
column_types[i].insert(infer_field_schema(string));
let s = parse_bytes_with_encoding(slice, encoding)?;
column_types[i].insert(infer_field_schema(&s));
}
}
}
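The loop above lets every non-empty cell vote on its column's type and marks columns containing empty cells as nullable; the collected sets are then resolved into fields. A deliberately reduced, dependency-free sketch of that idea (the `infer_cell` rules and the utf8 fallback below are simplifications, not the crate's exact resolution logic):

```rust
use std::collections::HashSet;

// Reduced stand-in for `infer_field_schema`: the real function recognises more
// types than the two shown here.
fn infer_cell(cell: &str) -> &'static str {
    if cell.parse::<i64>().is_ok() {
        "int64"
    } else if cell.parse::<f64>().is_ok() {
        "float64"
    } else {
        "utf8"
    }
}

fn main() {
    let rows = vec![vec!["1", "x", ""], vec!["2", "y", "3.5"]];
    let width = rows[0].len();
    let mut column_types: Vec<HashSet<&str>> = vec![HashSet::new(); width];
    let mut nulls = vec![false; width];

    // Every non-empty cell votes for a type; empty cells mark the column as nullable.
    for row in &rows {
        for (i, cell) in row.iter().enumerate() {
            if cell.is_empty() {
                nulls[i] = true;
            } else {
                column_types[i].insert(infer_cell(cell));
            }
        }
    }

    // Simplified resolution: a single candidate wins, anything ambiguous falls back to utf8.
    for (i, types) in column_types.iter().enumerate() {
        let dtype = if types.len() == 1 {
            *types.iter().next().unwrap()
        } else {
            "utf8"
        };
        println!("column_{}: {} (has_nulls: {})", i + 1, dtype, nulls[i]);
    }
}
```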
@@ -153,6 +172,7 @@ fn infer_file_schema<R: Read + Seek>(
_ => fields.push(Field::new(&field_name, ArrowDataType::Utf8, has_nulls)),
}
}
let csv_reader = records.into_reader();

// return the reader seek back to the start
csv_reader.into_inner().seek(SeekFrom::Start(0))?;
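Stepping back to the top of this function: the first byte record now fixes the column count, the names either come from that record (`has_header == true`) or default `column_N` names are generated, and an empty file produces an explicit `NoData` error. A dependency-free sketch of that decision (the `String` error here is a stand-in for `PolarsError::NoData`):

```rust
// `String` as the error type is a stand-in for PolarsError::NoData.
fn header_names(first_record: Option<Vec<String>>, has_header: bool) -> Result<Vec<String>, String> {
    match first_record {
        None => Err("empty csv".to_string()),
        Some(record) if has_header => Ok(record),
        Some(record) => Ok((0..record.len())
            .map(|i| format!("column_{}", i + 1))
            .collect()),
    }
}

fn main() {
    let first = Some(vec!["a".to_string(), "b".to_string()]);
    assert_eq!(header_names(first, false).unwrap(), vec!["column_1", "column_2"]);
    assert!(header_names(None, true).is_err());
}
```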
@@ -256,12 +276,10 @@ fn add_to_utf8_builder(
let v = row.get(col_idx);
match v {
None => builder.append_null(),
Some(bytes) => match encoding {
CsvEncoding::Utf8 => {
builder.append_value(std::str::from_utf8(bytes).map_err(anyhow::Error::from)?)
}
CsvEncoding::LossyUtf8 => builder.append_value(String::from_utf8_lossy(bytes)),
},
Some(bytes) => {
let s = parse_bytes_with_encoding(bytes, encoding)?;
builder.append_value(&s);
}
}
}
Ok(())
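Both `infer_file_schema` and `add_to_utf8_builder` now route raw cell bytes through `parse_bytes_with_encoding`. A self-contained sketch of the strict-versus-lossy distinction that helper encodes, using only the standard library (the enum and error type are stand-ins; the real helper returns the crate's `Result` and converts errors through `anyhow`):

```rust
use std::borrow::Cow;

#[derive(Clone, Copy)]
enum Encoding {
    Utf8,      // stand-in for CsvEncoding::Utf8
    LossyUtf8, // stand-in for CsvEncoding::LossyUtf8
}

// Strict mode propagates the utf8 error; lossy mode replaces invalid bytes with U+FFFD.
fn decode(bytes: &[u8], encoding: Encoding) -> Result<Cow<'_, str>, std::str::Utf8Error> {
    match encoding {
        Encoding::Utf8 => std::str::from_utf8(bytes).map(Cow::from),
        Encoding::LossyUtf8 => Ok(String::from_utf8_lossy(bytes)),
    }
}

fn main() {
    let bad = b"caf\xff"; // last byte is not valid utf8
    assert!(decode(bad, Encoding::Utf8).is_err());
    assert_eq!(decode(bad, Encoding::LossyUtf8).unwrap(), "caf\u{FFFD}");
}
```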
38 changes: 0 additions & 38 deletions py-polars/pypolars/frame.py
@@ -91,44 +91,6 @@ def read_csv(
encoding: str = "utf8",
one_thread: bool = True,
) -> "DataFrame":
"""
Read into a DataFrame from a csv file.

Parameters
----------
file
Path to a file or a file like object.
infer_schema_length
Maximum number of lines to read to infer schema.
batch_size
Number of lines to read into the buffer at once. Modify this to change performance.
has_headers
If the CSV file has headers or not.
ignore_errors
Try to keep reading lines if some lines yield errors.
stop_after_n_rows
After n rows are read from the CSV, stop reading. Reading will probably not stop exactly at `n_rows`; it depends
on the batch size.
skip_rows
Start reading after `skip_rows`.
projection
Indexes of columns to select
sep
Delimiter / value separator
columns
Columns to project / select
rechunk
Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
encoding
- "utf8"
- "utf8-lossy"
one_thread
Use a single thread for the csv parsing.

Returns
-------
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.read_csv(
file,
38 changes: 38 additions & 0 deletions py-polars/pypolars/pandas.py
@@ -23,6 +23,44 @@ def read_csv(
encoding: str = "utf8",
one_thread: bool = True,
) -> "DataFrame":
"""
Read into a DataFrame from a csv file.

Parameters
----------
file
Path to a file or a file like object.
infer_schema_length
Maximum number of lines to read to infer schema.
batch_size
Number of lines to read into the buffer at once. Modify this to change performance.
has_headers
If the CSV file has headers or not.
ignore_errors
Try to keep reading lines if some lines yield errors.
stop_after_n_rows
After n rows are read from the CSV, stop reading. Reading will probably not stop exactly at `n_rows`; it depends
on the batch size.
skip_rows
Start reading after `skip_rows`.
projection
Indexes of columns to select
sep
Delimiter / value separator
columns
Columns to project / select
rechunk
Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
encoding
- "utf8"
- "utf8-lossy"
one_thread
Use a single thread for the csv parsing. Set this to False to try multi-threaded parsing.

Returns
-------
DataFrame
"""
return DataFrame.read_csv(
file=file,
infer_schema_length=infer_schema_length,