Skip to content

Commit

Permalink
refactor(rust): Add new streaming CSV source (#19694)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Nov 16, 2024
1 parent 87b5bca commit 4f3e828
Show file tree
Hide file tree
Showing 14 changed files with 916 additions and 92 deletions.
24 changes: 24 additions & 0 deletions crates/polars-plan/src/plans/optimizer/slice_pushdown_lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,30 @@ impl SlicePushDown {
Ok(lp)
}
#[cfg(feature = "csv")]
(Scan {
sources,
file_info,
hive_parts,
output_schema,
mut file_options,
predicate,
scan_type: FileScan::Csv { options, cloud_options },
}, Some(state)) if predicate.is_none() && self.new_streaming => {
file_options.slice = Some((state.offset, state.len as usize));

let lp = Scan {
sources,
file_info,
hive_parts,
output_schema,
scan_type: FileScan::Csv { options, cloud_options },
file_options,
predicate,
};

Ok(lp)
},
#[cfg(feature = "csv")]
(Scan {
sources,
file_info,
Expand Down
28 changes: 28 additions & 0 deletions crates/polars-stream/src/async_primitives/wait_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,34 @@ impl WaitGroup {
}
}

// Wait group with an associated index.
pub struct IndexedWaitGroup {
index: usize,
wait_group: WaitGroup,
}

impl IndexedWaitGroup {
pub fn new(index: usize) -> Self {
Self {
index,
wait_group: Default::default(),
}
}

pub fn index(&self) -> usize {
self.index
}

pub fn token(&self) -> WaitToken {
self.wait_group.token()
}

pub async fn wait(self) -> Self {
self.wait_group.wait().await;
self
}
}

struct WaitGroupFuture<'a> {
inner: &'a Arc<WaitGroupInner>,
}
Expand Down
Loading

0 comments on commit 4f3e828

Please sign in to comment.