Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Refactors and agg improvements for new local execution model #2497

Merged
merged 3 commits into from
Jul 10, 2024

Conversation

colin-ho
Copy link
Contributor

Makes source trait async
Connect to new local physical plan
Add incremental aggregations

@github-actions github-actions bot added the enhancement New feature or request label Jul 10, 2024
pub trait Source: Send + Sync {
async fn get_data(
&self,
) -> Box<dyn Stream<Item = DaftResult<Arc<MicroPartition>>> + Send + Unpin>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use boxstream as we talked about

@@ -32,6 +35,6 @@ impl Source for ScanTaskSource {
// TODO: Implement dynamic splitting / merging of MicroPartition from scan task
Ok(Arc::new(out))
}));
Box::pin(stream)
Box::new(stream)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream.boxed() instead

Ok(Arc::new(out))
}

fn name(&self) -> String {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to block here but this trait should return a &'static str so we don't need to perform a heap allocation each time we ask for the name

}
}

impl IntermediateOperator for AggregateOperator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future work: this should instead just be a blocking sink

fn get_data(&self) -> Pin<Box<dyn Stream<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
async fn get_data(
&self,
) -> Box<dyn Stream<Item = DaftResult<Arc<MicroPartition>>> + Send + Unpin> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

favor typedef-ed BoxStream<DaftResult<Arc<MicroPartition>>>


Box::pin(ReceiverStream::new(rx))
Box::new(ReceiverStream::new(rx))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.boxed()

@colin-ho colin-ho merged commit a5badb4 into main Jul 10, 2024
43 checks passed
@colin-ho colin-ho deleted the colin/execution2 branch July 10, 2024 23:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants