fix: Update line-splitting logic in batched CSV reader (pola-rs#19508)
nameexhaustion authored and tylerriccio33 committed Nov 8, 2024
1 parent eb4019c commit 22f8c0b
Showing 4 changed files with 69 additions and 66 deletions.
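
The core of the fix replaces the old `next_line_position` probing with a `CountLines`-based loop that keeps doubling its search window until at least one complete row fits, so rows longer than the initial chunk size no longer break the splitter. Below is a minimal, self-contained sketch of that strategy; the simplified `count_lines` helper is only a stand-in for polars' internal `CountLines`, and quoted fields containing the eol character are ignored for brevity.

```rust
// Standalone sketch of the new splitting strategy. Assumptions: `count_lines`
// is a simplified stand-in for polars' internal `CountLines`, and quoted
// fields that contain the eol character are not handled here.
use std::collections::VecDeque;

/// Count complete lines in `b`, returning (line_count, offset just past the last eol).
fn count_lines(b: &[u8], eol_char: u8) -> (usize, usize) {
    let mut count = 0;
    let mut end = 0;
    for (i, &c) in b.iter().enumerate() {
        if c == eol_char {
            count += 1;
            end = i + 1;
        }
    }
    (count, end)
}

/// Push up to `n_chunks` (start, end) byte ranges onto `offsets`, doubling
/// `chunk_size` whenever the current window holds no complete line.
fn split_chunks(
    offsets: &mut VecDeque<(usize, usize)>,
    last_pos: &mut usize,
    n_chunks: usize,
    chunk_size: &mut usize,
    bytes: &[u8],
    eol_char: u8,
) {
    for _ in 0..n_chunks {
        let rest = &bytes[*last_pos..];
        if rest.is_empty() {
            break;
        }
        let position = loop {
            let window = &rest[..(*chunk_size).min(rest.len())];
            if window.len() == rest.len() {
                // Final window: take everything, trailing newline or not.
                break window.len();
            }
            let (count, end) = count_lines(window, eol_char);
            if count == 0 {
                // No complete row fits; grow the window and retry.
                *chunk_size *= 2;
                continue;
            }
            break end;
        };
        offsets.push_back((*last_pos, *last_pos + position));
        *last_pos += position;
    }
}

fn main() {
    let bytes = b"a,1\nb,2\nc,3\nd,4\n";
    let (mut offsets, mut last_pos, mut chunk_size) = (VecDeque::new(), 0usize, 4usize);
    split_chunks(&mut offsets, &mut last_pos, 4, &mut chunk_size, bytes, b'\n');
    println!("{offsets:?}"); // [(0, 4), (4, 8), (8, 12), (12, 16)]
}
```
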
98 changes: 56 additions & 42 deletions crates/polars-io/src/csv/read/read_impl/batched.rs
@@ -9,9 +9,8 @@ use polars_error::PolarsResult;
use polars_utils::IdxSize;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{cast_columns, read_chunk, CoreReader};
use super::{cast_columns, read_chunk, CoreReader, CountLines};
use crate::csv::read::options::{CommentPrefix, CsvEncoding, NullValuesCompiled};
use crate::csv::read::parser::next_line_position;
use crate::csv::read::CsvReader;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::update_row_counts2;
@@ -22,34 +21,51 @@ pub(crate) fn get_file_chunks_iterator(
offsets: &mut VecDeque<(usize, usize)>,
last_pos: &mut usize,
n_chunks: usize,
chunk_size: usize,
chunk_size: &mut usize,
bytes: &[u8],
expected_fields: usize,
separator: u8,
quote_char: Option<u8>,
eol_char: u8,
) {
let cl = CountLines::new(quote_char, eol_char);

for _ in 0..n_chunks {
let search_pos = *last_pos + chunk_size;
let bytes = &bytes[*last_pos..];

if search_pos >= bytes.len() {
if bytes.is_empty() {
break;
}

let end_pos = match next_line_position(
&bytes[search_pos..],
Some(expected_fields),
separator,
quote_char,
eol_char,
) {
Some(pos) => search_pos + pos,
None => {
break;
},
};
offsets.push_back((*last_pos, end_pos));
*last_pos = end_pos;
let position;

loop {
let b = &bytes[..(*chunk_size).min(bytes.len())];
let (count, position_) = cl.count(b);

let (count, position_) = if b.len() == bytes.len() {
(if count != 0 { count } else { 1 }, b.len())
} else {
(
count,
if position_ < b.len() {
// 1+ for the '\n'
1 + position_
} else {
position_
},
)
};

if count == 0 {
*chunk_size *= 2;
continue;
}

position = position_;
break;
}

offsets.push_back((*last_pos, *last_pos + position));
*last_pos += position;
}
}

@@ -58,10 +74,10 @@ struct ChunkOffsetIter<'a> {
offsets: VecDeque<(usize, usize)>,
last_offset: usize,
n_chunks: usize,
chunk_size: usize,
// not a promise, but something we want
#[allow(unused)]
rows_per_batch: usize,
expected_fields: usize,
separator: u8,
quote_char: Option<u8>,
eol_char: u8,
}
@@ -76,27 +92,12 @@ impl Iterator for ChunkOffsetIter<'_> {
if self.last_offset == self.bytes.len() {
return None;
}
let bytes_first_row = if self.rows_per_batch > 1 {
let bytes_first_row = next_line_position(
&self.bytes[self.last_offset + 2..],
Some(self.expected_fields),
self.separator,
self.quote_char,
self.eol_char,
)
.unwrap_or(1);
bytes_first_row + 2
} else {
1
};
get_file_chunks_iterator(
&mut self.offsets,
&mut self.last_offset,
self.n_chunks,
self.rows_per_batch * bytes_first_row,
&mut self.chunk_size,
self.bytes,
self.expected_fields,
self.separator,
self.quote_char,
self.eol_char,
);
@@ -122,6 +123,20 @@ impl<'a> CoreReader<'a> {
let (bytes, starting_point_offset) =
self.find_starting_point(bytes, self.quote_char, self.eol_char)?;

let n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());

// Copied from [`Self::parse_csv`]
let n_parts_hint = n_threads * 16;
let chunk_size = std::cmp::min(bytes.len() / n_parts_hint, 16 * 1024 * 1024);

// Use a small min chunk size to catch failures in tests.
#[cfg(debug_assertions)]
let min_chunk_size = 64;
#[cfg(not(debug_assertions))]
let min_chunk_size = 1024 * 4;

let chunk_size = std::cmp::max(chunk_size, min_chunk_size);

// this is arbitrarily chosen.
// we don't want this to depend on the thread pool size
// otherwise the chunks are not deterministic
@@ -134,9 +149,8 @@ impl<'a> CoreReader<'a> {
offsets: VecDeque::with_capacity(offset_batch_size),
last_offset: 0,
n_chunks: offset_batch_size,
chunk_size,
rows_per_batch: self.chunk_size,
expected_fields: self.schema.len(),
separator: self.separator,
quote_char: self.quote_char,
eol_char: self.eol_char,
};
@@ -249,7 +263,7 @@ impl BatchedCsvReader<'_> {
self.null_values.as_ref(),
self.missing_is_null,
self.truncate_ragged_lines,
self.chunk_size,
usize::MAX,
stop_at_nbytes,
self.starting_point_offset,
self.decimal_comma,
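Seeding that search window is now done with the same heuristic as the non-batched path (the diff notes it was copied from `Self::parse_csv`): split the buffer into roughly n_threads * 16 parts, cap a part at 16 MiB, and clamp it up to a small floor so debug builds exercise the doubling path. A hedged sketch of just that arithmetic, with an illustrative function name and an explicit `debug` flag that are not polars API:

```rust
// Sketch of the chunk-size seeding shown above; the function name and the
// explicit `debug` flag are illustrative, not polars API.
fn initial_chunk_size(total_bytes: usize, n_threads: usize, debug: bool) -> usize {
    let n_parts_hint = n_threads * 16;
    let chunk_size = std::cmp::min(total_bytes / n_parts_hint, 16 * 1024 * 1024);
    // Small floor in debug builds so the "double until a row fits" path is hit
    // often in tests; release builds use 4 KiB.
    let min_chunk_size = if debug { 64 } else { 1024 * 4 };
    std::cmp::max(chunk_size, min_chunk_size)
}

fn main() {
    // 1 MiB of input on 8 threads: 1_048_576 / 128 = 8_192 bytes per chunk.
    assert_eq!(initial_chunk_size(1024 * 1024, 8, false), 8_192);
    // Tiny inputs get clamped up to the 4 KiB floor.
    assert_eq!(initial_chunk_size(100, 8, false), 4_096);
}
```
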
5 changes: 1 addition & 4 deletions crates/polars-lazy/src/tests/streaming.rs
@@ -63,10 +63,7 @@ fn test_streaming_csv() -> PolarsResult<()> {

#[test]
fn test_streaming_glob() -> PolarsResult<()> {
let q = get_csv_glob();
let q = q.sort(["sugars_g"], Default::default());

assert_streaming_with_default(q, true, false);
assert_streaming_with_default(get_csv_glob(), true, false);
Ok(())
}

11 changes: 7 additions & 4 deletions crates/polars/tests/it/io/csv.rs
@@ -1389,9 +1389,12 @@ fn test_read_io_reader() {

let mut reader = reader.batched_borrowed().unwrap();
let batches = reader.next_batches(5).unwrap().unwrap();
// TODO: Fix this
// assert_eq!(batches.len(), 5);
assert_eq!(batches.len(), 5);
let df = concat_df(&batches).unwrap();
let expected = CsvReader::new(file).finish().unwrap();
assert!(df.equals(&expected))
assert!(df.height() > 0);
let expected = CsvReader::new(file)
.finish()
.unwrap()
.head(Some(df.height()));
assert_eq!(&df, &expected);
}
21 changes: 5 additions & 16 deletions py-polars/tests/unit/io/test_csv.py
@@ -1497,26 +1497,13 @@ def test_batched_csv_reader(foods_file_path: Path) -> None:

batches = reader.next_batches(5)
assert batches is not None
assert len(batches) == 5
assert batches[0].to_dict(as_series=False) == {
"category": ["vegetables", "seafood", "meat", "fruit", "seafood", "meat"],
"calories": [45, 150, 100, 60, 140, 120],
"fats_g": [0.5, 5.0, 5.0, 0.0, 5.0, 10.0],
"sugars_g": [2, 0, 0, 11, 1, 1],
}
assert batches[-1].to_dict(as_series=False) == {
"category": ["fruit", "meat", "vegetables", "fruit"],
"calories": [130, 100, 30, 50],
"fats_g": [0.0, 7.0, 0.0, 0.0],
"sugars_g": [25, 0, 5, 11],
}
assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))
out = pl.concat(batches)
assert_frame_equal(out, pl.read_csv(foods_file_path).head(out.height))

# the final batch of the low-memory variant is different
reader = pl.read_csv_batched(foods_file_path, batch_size=4, low_memory=True)
batches = reader.next_batches(10)
assert batches is not None
assert len(batches) == 5

assert_frame_equal(pl.concat(batches), pl.read_csv(foods_file_path))

@@ -1563,6 +1550,8 @@ def test_batched_csv_reader_all_batches(foods_file_path: Path) -> None:
batched_dfs.extend(batches)
batches = reader.next_batches(5)

assert all(x.height > 0 for x in batched_dfs)

batched_concat_df = pl.concat(batched_dfs, rechunk=True)
assert_frame_equal(out, batched_concat_df)

@@ -2129,7 +2118,7 @@ def test_read_csv_only_loads_selected_columns(
break
result += next_batch
del result
assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 13_000_000
assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 20_000_000


def test_csv_escape_cf_15349() -> None: