-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(rust): Add new streaming CSV source #19694
Conversation
3fb0430
to
868d5a8
Compare
@@ -14,7 +14,7 @@ use super::row_group_decode::RowGroupDecoder; | |||
use super::{AsyncTaskData, ParquetSourceNode}; | |||
use crate::async_executor; | |||
use crate::async_primitives::connector::connector; | |||
use crate::async_primitives::wait_group::{WaitGroup, WaitToken}; | |||
use crate::async_primitives::wait_group::IndexedWaitGroup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some minor drive-by refactors of parquet source
line_batch_receivers, | ||
// TODO: Refactor so that we don't unwrap, it's currently hard because | ||
// `ComputeNode::{initialize, spawn}` doesn't return a `PolarsResult` | ||
Arc::new(self.try_init_chunk_reader().unwrap()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap()
is not ideal, but I don't currently have any good ideas on how to avoid this
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #19694 +/- ##
==========================================
- Coverage 79.55% 79.37% -0.19%
==========================================
Files 1544 1545 +1
Lines 213240 213735 +495
Branches 2441 2445 +4
==========================================
+ Hits 169643 169647 +4
- Misses 43048 43536 +488
- Partials 549 552 +3 ☔ View full report in Codecov by Sentry. |
208d346
to
79be2c9
Compare
What do you think causes the difference? |
Adds a simple CSV source - we have one thread scanning for line batches which get sent to other threads for parsing
The performance is mostly equivalent with the in-memory-engine, except for one very specific type of query -