[FEAT] Refactors and agg improvements for new local execution model (#2497)

Make the source trait async
Connect to the new local physical plan
Add incremental aggregations

---------

Co-authored-by: Colin Ho <colinho@Colins-MBP.localdomain>
colin-ho and Colin Ho authored Jul 10, 2024
1 parent 26254cb commit a5badb4
Showing 25 changed files with 358 additions and 189 deletions.
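Of the three changes in the commit message, the async source trait is the one not visible in the hunks shown here (none of them touch the sources/ files), but the new async-trait dependency added to Cargo.toml below points at its shape. A minimal sketch of what an async source trait can look like; Source and get_data are illustrative assumptions, not Daft's actual definitions:

use std::sync::Arc;

use async_trait::async_trait;

// Stand-in for daft_micropartition::MicroPartition.
pub struct MicroPartition;

// Assumed shape: `async fn` in a trait requires the async-trait crate (added
// to Cargo.toml in this commit), letting sources await I/O (scan tasks,
// object-store reads) instead of blocking an executor thread.
#[async_trait]
pub trait Source: Send + Sync {
    async fn get_data(&self) -> Vec<Arc<MicroPartition>>;
}

// An in-memory source satisfies the contract trivially: nothing to await.
pub struct InMemorySource {
    partitions: Vec<Arc<MicroPartition>>,
}

#[async_trait]
impl Source for InMemorySource {
    async fn get_data(&self) -> Vec<Arc<MicroPartition>> {
        self.partitions.clone()
    }
}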
2 changes: 2 additions & 0 deletions Cargo.lock


7 changes: 7 additions & 0 deletions daft/daft.pyi
@@ -1638,6 +1638,13 @@ class LogicalPlanBuilder:
     def to_adaptive_physical_plan_scheduler(self, cfg: PyDaftExecutionConfig) -> AdaptivePhysicalPlanScheduler: ...
     def repr_ascii(self, simple: bool) -> str: ...

+class NativeExecutor:
+    @staticmethod
+    def from_logical_plan_builder(
+        logical_plan_builder: LogicalPlanBuilder,
+    ) -> NativeExecutor: ...
+    def run(self, psets: dict[str, list[PartitionT]]) -> Iterator[PyMicroPartition]: ...
+
 class PyDaftExecutionConfig:
     @staticmethod
     def from_env() -> PyDaftExecutionConfig: ...
34 changes: 34 additions & 0 deletions daft/execution/native_executor.py
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Iterator

from daft.daft import (
    NativeExecutor as _NativeExecutor,
)
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.partitioning import (
    MaterializedResult,
    PartitionT,
)
from daft.table import MicroPartition

if TYPE_CHECKING:
    from daft.runners.pyrunner import PyMaterializedResult


class NativeExecutor:
    def __init__(self, executor: _NativeExecutor):
        self._executor = executor

    @classmethod
    def from_logical_plan_builder(cls, builder: LogicalPlanBuilder) -> NativeExecutor:
        executor = _NativeExecutor.from_logical_plan_builder(builder._builder)
        return cls(executor)

    def run(self, psets: dict[str, list[MaterializedResult[PartitionT]]]) -> Iterator[PyMaterializedResult]:
        from daft.runners.pyrunner import PyMaterializedResult

        psets_mp = {part_id: [part.vpartition()._micropartition for part in parts] for part_id, parts in psets.items()}
        return (
            PyMaterializedResult(MicroPartition._from_pymicropartition(part)) for part in self._executor.run(psets_mp)
        )
6 changes: 4 additions & 2 deletions daft/runners/pyrunner.py
@@ -9,6 +9,7 @@
 from daft.daft import FileFormatConfig, FileInfos, IOConfig, ResourceRequest, SystemInfo
 from daft.execution import physical_plan
 from daft.execution.execution_step import Instruction, PartitionTask
+from daft.execution.native_executor import NativeExecutor
 from daft.filesystem import glob_path_with_stats
 from daft.internal.gpu import cuda_device_count
 from daft.logical.builder import LogicalPlanBuilder
@@ -183,14 +184,15 @@ def run_iter(
         else:
             # Finalize the logical plan and get a physical plan scheduler for translating the
             # physical plan to executable tasks.
-            plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
             if daft_execution_config.enable_native_executor:
                 logger.info("Using new executor")
-                results_gen = plan_scheduler.run(
+                executor = NativeExecutor.from_logical_plan_builder(builder)
+                results_gen = executor.run(
                     {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
                 )
                 yield from results_gen
             else:
+                plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config)
                 psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
                 # Get executable tasks from planner.
                 tasks = plan_scheduler.to_partition_tasks(psets)
2 changes: 2 additions & 0 deletions src/daft-local-execution/Cargo.toml
@@ -1,9 +1,11 @@
 [dependencies]
+async-trait = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
 daft-io = {path = "../daft-io", default-features = false}
 daft-micropartition = {path = "../daft-micropartition", default-features = false}
+daft-physical-plan = {path = "../daft-physical-plan", default-features = false}
 daft-plan = {path = "../daft-plan", default-features = false}
 daft-scan = {path = "../daft-scan", default-features = false}
 dyn-clone = {workspace = true}
76 changes: 49 additions & 27 deletions src/daft-local-execution/src/create_pipeline.rs
@@ -2,67 +2,89 @@ use std::{collections::HashMap, sync::Arc};

 use daft_dsl::Expr;
 use daft_micropartition::MicroPartition;
-use daft_plan::{
-    physical_ops::{Aggregate, Filter, InMemoryScan, Limit, Project, TabularScan},
-    PhysicalPlan,
+use daft_physical_plan::{
+    Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project, UnGroupedAggregate,
 };
+use daft_plan::populate_aggregation_stages;

 use crate::{
-    intermediate_ops::{filter::FilterOperator, project::ProjectOperator},
+    intermediate_ops::{
+        aggregate::AggregateOperator, filter::FilterOperator, project::ProjectOperator,
+    },
     pipeline::Pipeline,
     sinks::{aggregate::AggregateSink, limit::LimitSink},
     sources::{in_memory::InMemorySource, scan_task::ScanTaskSource},
 };

 pub fn physical_plan_to_pipeline(
-    physical_plan: &Arc<PhysicalPlan>,
+    physical_plan: &LocalPhysicalPlan,
     psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
 ) -> Pipeline {
-    match physical_plan.as_ref() {
-        PhysicalPlan::InMemoryScan(InMemoryScan { in_memory_info, .. }) => {
-            let partitions = psets
-                .get(&in_memory_info.cache_key)
-                .expect("Cache key not found");
-            Pipeline::new(Box::new(InMemorySource::new(partitions.clone())))
-        }
-        PhysicalPlan::TabularScan(TabularScan { scan_tasks, .. }) => {
+    match physical_plan {
+        LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
             Pipeline::new(Box::new(ScanTaskSource::new(scan_tasks.clone())))
         }
-        PhysicalPlan::Project(Project {
+        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
+            let partitions = psets.get(&info.cache_key).expect("Cache key not found");
+            Pipeline::new(Box::new(InMemorySource::new(partitions.clone())))
+        }
+        LocalPhysicalPlan::Project(Project {
             input, projection, ..
         }) => {
             let current_pipeline = physical_plan_to_pipeline(input, psets);
             let proj_op = ProjectOperator::new(projection.clone());
             current_pipeline.with_intermediate_operator(Box::new(proj_op))
         }
-        PhysicalPlan::Filter(Filter { input, predicate }) => {
+        LocalPhysicalPlan::Filter(Filter {
+            input, predicate, ..
+        }) => {
             let current_pipeline = physical_plan_to_pipeline(input, psets);
             let filter_op = FilterOperator::new(predicate.clone());
             current_pipeline.with_intermediate_operator(Box::new(filter_op))
         }
-        PhysicalPlan::Limit(Limit { limit, input, .. }) => {
+        LocalPhysicalPlan::Limit(Limit {
+            input, num_rows, ..
+        }) => {
             let current_pipeline = physical_plan_to_pipeline(input, psets);
-            let sink = LimitSink::new(*limit as usize);
+            let sink = LimitSink::new(*num_rows as usize);
             let current_pipeline = current_pipeline.with_sink(Box::new(sink));

             Pipeline::new(Box::new(current_pipeline))
         }
-        PhysicalPlan::Aggregate(Aggregate {
+        LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
             input,
             aggregations,
-            groupby,
+            schema,
+            ..
         }) => {
             let current_pipeline = physical_plan_to_pipeline(input, psets);
-            let sink = AggregateSink::new(
-                aggregations
-                    .iter()
-                    .map(|agg| Arc::new(Expr::Agg(agg.clone())))
-                    .collect::<Vec<_>>(),
-                groupby.clone(),
+            let (first_stage_aggs, second_stage_aggs, final_exprs) =
+                populate_aggregation_stages(aggregations, schema, &[]);
+
+            let first_stage_agg_op = AggregateOperator::new(
+                first_stage_aggs
+                    .values()
+                    .cloned()
+                    .map(|e| Arc::new(Expr::Agg(e.clone())))
+                    .collect(),
+                vec![],
             );
-            let current_pipeline = current_pipeline.with_sink(Box::new(sink));
+            let current_pipeline =
+                current_pipeline.with_intermediate_operator(Box::new(first_stage_agg_op));

-            Pipeline::new(Box::new(current_pipeline))
+            let second_stage_agg_sink = AggregateSink::new(
+                second_stage_aggs
+                    .values()
+                    .cloned()
+                    .map(|e| Arc::new(Expr::Agg(e.clone())))
+                    .collect(),
+                vec![],
+            );
+            let current_pipeline = current_pipeline.with_sink(Box::new(second_stage_agg_sink));
+
+            let final_stage_project = ProjectOperator::new(final_exprs);
+            let new_pipeline = Pipeline::new(Box::new(current_pipeline));
+            new_pipeline.with_intermediate_operator(Box::new(final_stage_project))
         }
         _ => {
             unimplemented!("Physical plan not supported: {}", physical_plan.name());
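The UnGroupedAggregate arm above is where the incremental aggregation lands: populate_aggregation_stages splits each aggregation into a per-partition first stage (wired in as a streaming AggregateOperator), a combining second stage (an AggregateSink, which must see every partial before emitting), and a final projection (a ProjectOperator). A self-contained sketch of why this staging returns the same answer as one global pass, using mean as the worked example (plain Rust with no Daft types; the staging mirrors what the pipeline above wires together):

// mean(x) decomposes as:
//   first stage, per partition:  sum(x), count(x)
//   second stage, global sink:   sum(partial sums), sum(partial counts)
//   final projection:            total_sum / total_count
fn main() {
    let partitions: Vec<Vec<f64>> = vec![vec![1.0, 2.0], vec![3.0, 4.0, 5.0]];

    // First stage: each micropartition reduces independently, so it can run
    // incrementally as batches stream through the pipeline.
    let partials: Vec<(f64, usize)> = partitions
        .iter()
        .map(|p| (p.iter().sum(), p.len()))
        .collect();

    // Second stage: fold all partial states into one; this needs every
    // partial, which is why it is a sink rather than an intermediate operator.
    let (total, count) = partials
        .iter()
        .fold((0.0, 0), |(s, c), (ps, pc)| (s + ps, c + pc));

    // Final projection: evaluate the user-visible expression.
    let mean = total / count as f64;
    assert_eq!(mean, 3.0); // (1 + 2 + 3 + 4 + 5) / 5
}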
34 changes: 34 additions & 0 deletions src/daft-local-execution/src/intermediate_ops/aggregate.rs
@@ -0,0 +1,34 @@
use std::sync::Arc;

use common_error::DaftResult;
use daft_dsl::ExprRef;
use daft_micropartition::MicroPartition;

use super::intermediate_op::IntermediateOperator;

#[derive(Clone)]
pub struct AggregateOperator {
    agg_exprs: Vec<ExprRef>,
    group_by: Vec<ExprRef>,
}

impl AggregateOperator {
    pub fn new(agg_exprs: Vec<ExprRef>, group_by: Vec<ExprRef>) -> Self {
        Self {
            agg_exprs,
            group_by,
        }
    }
}

impl IntermediateOperator for AggregateOperator {
    fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>> {
        log::debug!("AggregateOperator::execute");
        let out = input.agg(&self.agg_exprs, &self.group_by)?;
        Ok(Arc::new(out))
    }

    fn name(&self) -> &'static str {
        "AggregateOperator"
    }
}
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/filter.rs
@@ -24,7 +24,7 @@ impl IntermediateOperator for FilterOperator {
         Ok(Arc::new(out))
     }

-    fn name(&self) -> String {
-        "FilterOperator".to_string()
+    fn name(&self) -> &'static str {
+        "FilterOperator"
     }
 }
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
@@ -5,7 +5,7 @@ use daft_micropartition::MicroPartition;

 pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync {
     fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
-    fn name(&self) -> String;
+    fn name(&self) -> &'static str;
 }

 dyn_clone::clone_trait_object!(IntermediateOperator);
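A rough model of how these two trait families divide the work (toy types; Daft's real Pipeline threads Arc<MicroPartition> batches through channels): intermediate operators map batch to batch and can stream, while sinks accumulate state across batches and only emit once the input is exhausted. This is exactly the split the aggregation change exploits: partial aggregation as an operator, the final combine as a sink.

// Toy sketch of the operator/sink split, not Daft's actual implementation.
trait Op {
    fn execute(&self, batch: Vec<i64>) -> Vec<i64>; // stateless, per batch
}

trait Sink {
    fn sink(&mut self, batch: Vec<i64>); // accumulates across batches
    fn finalize(&mut self) -> Vec<i64>; // emits once input is exhausted
}

struct Double;
impl Op for Double {
    fn execute(&self, batch: Vec<i64>) -> Vec<i64> {
        batch.into_iter().map(|x| x * 2).collect()
    }
}

struct SumSink {
    total: i64,
}
impl Sink for SumSink {
    fn sink(&mut self, batch: Vec<i64>) {
        self.total += batch.iter().sum::<i64>();
    }
    fn finalize(&mut self) -> Vec<i64> {
        vec![self.total]
    }
}

fn main() {
    let batches = vec![vec![1, 2], vec![3]];
    let op = Double;
    let mut sink = SumSink { total: 0 };
    for b in batches {
        sink.sink(op.execute(b)); // operators stream; the sink accumulates
    }
    assert_eq!(sink.finalize(), vec![12]); // (1 + 2 + 3) * 2
}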
1 change: 1 addition & 0 deletions src/daft-local-execution/src/intermediate_ops/mod.rs
@@ -1,3 +1,4 @@
+pub mod aggregate;
 pub mod filter;
 pub mod intermediate_op;
 pub mod project;
4 changes: 2 additions & 2 deletions src/daft-local-execution/src/intermediate_ops/project.rs
@@ -24,7 +24,7 @@ impl IntermediateOperator for ProjectOperator {
         Ok(Arc::new(out))
     }

-    fn name(&self) -> String {
-        "ProjectOperator".to_string()
+    fn name(&self) -> &'static str {
+        "ProjectOperator"
     }
 }
5 changes: 3 additions & 2 deletions src/daft-local-execution/src/lib.rs
@@ -9,7 +9,7 @@ use std::sync::Arc;

 use common_error::{DaftError, DaftResult};
 use daft_micropartition::MicroPartition;
-pub use run::run_streaming;
+pub use run::NativeExecutor;
 use snafu::Snafu;

 type Sender = tokio::sync::mpsc::Sender<DaftResult<Arc<MicroPartition>>>;
@@ -42,6 +42,7 @@ impl From<Error> for DaftError {
 }

 #[cfg(feature = "python")]
-pub fn register_modules(_py: Python, _parent: &PyModule) -> PyResult<()> {
+pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
+    parent.add_class::<NativeExecutor>()?;
     Ok(())
 }
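For context on the registration above: parent.add_class hooks a #[pyclass] into the daft.daft module, which is what backs the NativeExecutor stub added to daft/daft.pyi earlier in this diff. A minimal hypothetical sketch of such a class (the real NativeExecutor lives in run.rs, which this view does not show, and carries more state and methods):

use pyo3::prelude::*;

// Hypothetical stand-in: just enough to be registered with add_class.
#[pyclass]
struct NativeExecutor;

#[pymethods]
impl NativeExecutor {
    // Mirrors the @staticmethod in the daft.pyi stub above.
    #[staticmethod]
    fn from_logical_plan_builder() -> Self {
        NativeExecutor
    }
}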