Skip to content

Commit

Permalink
feat: Expand NDJson glob into one SCAN (#17063)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jun 19, 2024
1 parent d265f35 commit df14d67
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
38 changes: 38 additions & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,44 @@ impl LazyJsonLineReader {
}

impl LazyFileListReader for LazyJsonLineReader {
fn finish(self) -> PolarsResult<LazyFrame> {
if !self.glob() {
return self.finish_no_glob();
}

let paths = self.expand_paths()?.0;

let file_options = FileScanOptions {
n_rows: self.n_rows,
with_columns: None,
cache: false,
row_index: self.row_index,
rechunk: self.rechunk,
file_counter: 0,
hive_options: Default::default(),
};

let options = NDJsonReadOptions {
n_threads: None,
infer_schema_length: self.infer_schema_length,
chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
low_memory: self.low_memory,
ignore_errors: self.ignore_errors,
schema: self.schema,
};

let scan_type = FileScan::NDJson { options };

Ok(LazyFrame::from(DslPlan::Scan {
paths,
file_info: None,
hive_parts: None,
predicate: None,
file_options,
scan_type,
}))
}

fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let file_options = FileScanOptions {
n_rows: self.n_rows,
Expand Down
42 changes: 32 additions & 10 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,50 @@ impl JsonExec {
.as_ref()
.unwrap_right();

let mut n_rows = self.file_scan_options.n_rows;

let dfs = self
.paths
.iter()
.map(|p| {
let df = JsonLineReader::from_path(p)?
.map_while(|p| {
if n_rows == Some(0) {
return None;
}

let reader = match JsonLineReader::from_path(p) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
};

let df = reader
.with_schema(schema.clone())
.with_rechunk(self.file_scan_options.rechunk)
.with_chunk_size(Some(self.options.chunk_size))
.low_memory(self.options.low_memory)
.with_n_rows(self.file_scan_options.n_rows)
.with_n_rows(n_rows)
.with_ignore_errors(self.options.ignore_errors)
.finish()?;
.finish();

if let Some(row_index) = &mut self.file_scan_options.row_index {
let offset = row_index.offset;
row_index.offset += df.height() as IdxSize;
df.with_row_index(row_index.name.as_ref(), Some(offset))
} else {
Ok(df)
let df = match df {
Ok(df) => df,
Err(e) => return Some(Err(e)),
};

if let Some(ref mut n_rows) = n_rows {
*n_rows -= df.height();
}

Some(match self.file_scan_options.row_index {
Some(ref mut row_index) => {
let offset = row_index.offset;
row_index.offset += df.height() as IdxSize;
df.with_row_index(row_index.name.as_ref(), Some(offset))
},
None => Ok(df),
})
})
.collect::<PolarsResult<Vec<_>>>()?;

accumulate_dataframes_vertical(dfs)
}
}
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/unit/io/test_lazy_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,13 @@ def test_ndjson_list_arg(io_files_path: Path) -> None:
assert df.shape == (54, 4)
assert df.row(-1) == ("seafood", 194, 12.0, 1)
assert df.row(0) == ("vegetables", 45, 0.5, 2)


def test_glob_single_scan(io_files_path: Path) -> None:
file_path = io_files_path / "foods*.ndjson"
df = pl.scan_ndjson(file_path, n_rows=40)

explain = df.explain()

assert explain.count("SCAN") == 1
assert "UNION" not in explain

0 comments on commit df14d67

Please sign in to comment.