diff --git a/polars/src/frame/ser/csv.rs b/polars/src/frame/ser/csv.rs
index dc00baf40226..5beabbdbb4c9 100644
--- a/polars/src/frame/ser/csv.rs
+++ b/polars/src/frame/ser/csv.rs
@@ -147,6 +147,7 @@ pub enum CsvEncoding {
 ///     CsvReader::new(file)
 ///             .infer_schema(None)
 ///             .has_header(true)
+///             .with_one_thread(true) // set this to false to try multi-threaded parsing
 ///             .finish()
 /// }
 /// ```
@@ -247,7 +248,10 @@ where
         self
     }

-    /// Use slower single threaded CSV parsing.
+    /// Use single-threaded CSV parsing (this is the default).
+    /// This is recommended when the CSV file does not have many columns.
+    ///
+    /// Whether multi-threaded parsing is faster depends on your specific use case.
     /// This is internally used for Python file handlers
     pub fn with_one_thread(mut self, one_thread: bool) -> Self {
         self.one_thread = one_thread;
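A minimal usage sketch of the option documented above, mirroring the rustdoc example that hunk extends; `foo.csv` is a placeholder path and error handling is simplified.

```rust
use polars::prelude::*;
use std::fs::File;

// Sketch only: mirrors the rustdoc example above; "foo.csv" is a placeholder path.
fn example() -> Result<DataFrame> {
    let file = File::open("foo.csv").expect("could not open file");
    CsvReader::new(file)
        .infer_schema(None)
        .has_header(true)
        // The default stays single-threaded; pass `false` to try the multi-threaded parser.
        .with_one_thread(false)
        .finish()
}
```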
diff --git a/polars/src/frame/ser/fork/csv.rs b/polars/src/frame/ser/fork/csv.rs
index 25a566afcf94..5735843c0c56 100644
--- a/polars/src/frame/ser/fork/csv.rs
+++ b/polars/src/frame/ser/fork/csv.rs
@@ -20,13 +20,13 @@ use crate::prelude::*;
 use crate::utils;
 use ahash::RandomState;
 use arrow::datatypes::SchemaRef;
-use arrow::error::Result as ArrowResult;
 use crossbeam::{
     channel::{bounded, TryRecvError},
     thread,
 };
 use csv::{ByteRecord, ByteRecordsIntoIter};
 use rayon::prelude::*;
+use std::borrow::Cow;
 use std::collections::HashSet;
 use std::fmt;
 use std::io::{Read, Seek, SeekFrom};
@@ -63,6 +63,16 @@ fn infer_field_schema(string: &str) -> ArrowDataType {
     ArrowDataType::Utf8
 }

+fn parse_bytes_with_encoding(bytes: &[u8], encoding: CsvEncoding) -> Result<Cow<str>> {
+    let s = match encoding {
+        CsvEncoding::Utf8 => std::str::from_utf8(bytes)
+            .map_err(anyhow::Error::from)?
+            .into(),
+        CsvEncoding::LossyUtf8 => String::from_utf8_lossy(bytes),
+    };
+    Ok(s)
+}
+
 /// Infer the schema of a CSV file by reading through the first n records of the file,
 /// with `max_read_records` controlling the maximum number of records to read.
 ///
@@ -74,52 +84,61 @@ fn infer_file_schema(
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> ArrowResult<(Schema, usize)> {
-    let mut csv_reader = csv::ReaderBuilder::new()
-        .delimiter(delimiter)
-        .from_reader(reader);
+) -> Result<(Schema, usize)> {
+    // We use lossy utf8 here because we don't want schema inference to fail on invalid utf8.
+    // Parsing may still fail on it later.
+    let encoding = CsvEncoding::LossyUtf8;
+    // set headers to false, otherwise the csv crate skips them.
+    let csv_reader = init_csv_reader(reader, false, delimiter);
+
+    let mut records = csv_reader.into_byte_records();
+    let header_length;

     // get or create header names
     // when has_header is false, creates default column names with column_ prefix
-    let headers: Vec<String> = if has_header {
-        let headers = csv_reader.headers()?;
-        headers.iter().map(|s| s.to_string()).collect()
+    let headers: Vec<String> = if let Some(byterecord) = records.next() {
+        let byterecord = byterecord.map_err(anyhow::Error::from)?;
+        header_length = byterecord.len();
+        if has_header {
+            byterecord
+                .iter()
+                .map(|slice| {
+                    let s = parse_bytes_with_encoding(slice, encoding)?;
+                    Ok(s.into())
+                })
+                .collect::<Result<_>>()?
+        } else {
+            (0..header_length)
+                .map(|i| format!("column_{}", i + 1))
+                .collect()
+        }
     } else {
-        let first_record_count = &csv_reader.headers()?.len();
-        (0..*first_record_count)
-            .map(|i| format!("column_{}", i + 1))
-            .collect()
+        return Err(PolarsError::NoData("empty csv".into()));
     };

-    // save the csv reader position after reading headers
-    let position = csv_reader.position().clone();
-
-    let header_length = headers.len();
     // keep track of inferred field types
     let mut column_types: Vec<HashSet<ArrowDataType, RandomState>> =
         vec![HashSet::with_hasher(RandomState::new()); header_length];
     // keep track of columns with nulls
     let mut nulls: Vec<bool> = vec![false; header_length];

-    // return csv reader position to after headers
-    csv_reader.seek(position)?;
-
     let mut records_count = 0;
-    let mut fields = vec![];
+    let mut fields = Vec::with_capacity(header_length);

-    for result in csv_reader
-        .records()
-        .take(max_read_records.unwrap_or(std::usize::MAX))
-    {
-        let record = result?;
+    // needed to prevent ownership from moving into the iterator loop
+    let records_ref = &mut records;
+
+    for result in records_ref.take(max_read_records.unwrap_or(std::usize::MAX)) {
+        let record = result.map_err(anyhow::Error::from)?;
         records_count += 1;

         for i in 0..header_length {
-            if let Some(string) = record.get(i) {
-                if string.is_empty() {
+            if let Some(slice) = record.get(i) {
+                if slice.is_empty() {
                     nulls[i] = true;
                 } else {
-                    column_types[i].insert(infer_field_schema(string));
+                    let s = parse_bytes_with_encoding(slice, encoding)?;
+                    column_types[i].insert(infer_field_schema(&s));
                 }
             }
         }
@@ -153,6 +172,7 @@ fn infer_file_schema(
             _ => fields.push(Field::new(&field_name, ArrowDataType::Utf8, has_nulls)),
         }
     }
+    let csv_reader = records.into_reader();
     // return the reader seek back to the start
     csv_reader.into_inner().seek(SeekFrom::Start(0))?;
@@ -256,12 +276,10 @@ fn add_to_utf8_builder(
         let v = row.get(col_idx);
         match v {
             None => builder.append_null(),
-            Some(bytes) => match encoding {
-                CsvEncoding::Utf8 => {
-                    builder.append_value(std::str::from_utf8(bytes).map_err(anyhow::Error::from)?)
-                }
-                CsvEncoding::LossyUtf8 => builder.append_value(String::from_utf8_lossy(bytes)),
-            },
+            Some(bytes) => {
+                let s = parse_bytes_with_encoding(bytes, encoding)?;
+                builder.append_value(&s);
+            }
         }
     }
     Ok(())
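The new `parse_bytes_with_encoding` helper centralizes the strict vs. lossy UTF-8 handling behind a `Cow<str>`, so valid input is borrowed without copying and only lossy repairs of invalid bytes allocate. A minimal standalone sketch of that pattern follows; it is not the polars API: `decode` and the `lossy` flag are illustrative names, and the error path is simplified to `Option` instead of polars' `Result`.

```rust
use std::borrow::Cow;

// Illustration of the strict-vs-lossy decoding pattern; names are made up for this sketch.
fn decode(bytes: &[u8], lossy: bool) -> Option<Cow<'_, str>> {
    if lossy {
        // Lossy mode never fails: invalid UTF-8 sequences become U+FFFD replacement characters.
        Some(String::from_utf8_lossy(bytes))
    } else {
        // Strict mode rejects invalid UTF-8 (the diff returns an error here instead of None).
        std::str::from_utf8(bytes).ok().map(Cow::Borrowed)
    }
}

fn main() {
    let valid = b"polars";
    let invalid: &[u8] = &[0xff, 0xfe, b'a'];

    assert_eq!(decode(valid, false).as_deref(), Some("polars"));
    assert_eq!(decode(invalid, false), None);
    // The lossy path still yields a string; the bad bytes are replaced.
    assert_eq!(decode(invalid, true).as_deref(), Some("\u{FFFD}\u{FFFD}a"));
}
```

Borrowing through `Cow` matters because most CSV fields are valid UTF-8, so the common case costs no allocation; only repairing invalid bytes in lossy mode produces an owned `String`.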
diff --git a/py-polars/pypolars/frame.py b/py-polars/pypolars/frame.py
index 575825828a9f..81ee643faf66 100644
--- a/py-polars/pypolars/frame.py
+++ b/py-polars/pypolars/frame.py
@@ -91,44 +91,6 @@ def read_csv(
         encoding: str = "utf8",
         one_thread: bool = True,
     ) -> "DataFrame":
-        """
-        Read into a DataFrame from a csv file.
-
-        Parameters
-        ----------
-        file
-            Path to a file or a file like object.
-        infer_schema_length
-            Maximum number of lines to read to infer schema.
-        batch_size
-            Number of lines to read into the buffer at once. Modify this to change performance.
-        has_headers
-            If the CSV file has headers or not.
-        ignore_errors
-            Try to keep reading lines if some lines yield errors.
-        stop_after_n_rows
-            After n rows are read from the CSV stop reading. This probably not stops exactly at `n_rows` it is dependent
-            on the batch size.
-        skip_rows
-            Start reading after `skip_rows`.
-        projection
-            Indexes of columns to select
-        sep
-            Delimiter/ value seperator
-        columns
-            Columns to project/ select
-        rechunk
-            Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
-        encoding
-            - "utf8"
-            _ "utf8-lossy"
-        one_thread
-            Use a single thread for the csv parsing.
-
-        Returns
-        -------
-        DataFrame
-        """
         self = DataFrame.__new__(DataFrame)
         self._df = PyDataFrame.read_csv(
             file,
diff --git a/py-polars/pypolars/pandas.py b/py-polars/pypolars/pandas.py
index 014a20d8a550..43b94690fafb 100644
--- a/py-polars/pypolars/pandas.py
+++ b/py-polars/pypolars/pandas.py
@@ -23,6 +23,44 @@ def read_csv(
     encoding: str = "utf8",
     one_thread: bool = True,
 ) -> "DataFrame":
+    """
+    Read a CSV file into a DataFrame.
+
+    Parameters
+    ----------
+    file
+        Path to a file or a file-like object.
+    infer_schema_length
+        Maximum number of lines to read to infer schema.
+    batch_size
+        Number of lines to read into the buffer at once. Modify this to change performance.
+    has_headers
+        Whether the CSV file has headers.
+    ignore_errors
+        Try to keep reading lines if some lines yield errors.
+    stop_after_n_rows
+        After n rows are read from the CSV, stop reading. This probably does not stop exactly at `n_rows`;
+        it depends on the batch size.
+    skip_rows
+        Start reading after `skip_rows`.
+    projection
+        Indices of columns to select.
+    sep
+        Delimiter/value separator.
+    columns
+        Columns to project/select.
+    rechunk
+        Make sure that all columns are contiguous in memory by aggregating the chunks into a single array.
+    encoding
+        - "utf8"
+        - "utf8-lossy"
+    one_thread
+        Use a single thread for the csv parsing. Set this to False to try multi-threaded parsing.
+
+    Returns
+    -------
+    DataFrame
+    """
     return DataFrame.read_csv(
         file=file,
         infer_schema_length=infer_schema_length,