Skip to content

Commit

Permalink
Support create_physical_expr and ExecutionContextState or `Defaul…
Browse files Browse the repository at this point in the history
…tPhysicalPlanner` for faster speed (#1700)

* Change physical_expr creation API

* Refactor API usage to avoid creating ExecutionContextState

* Fixup ballista

* clippy!
  • Loading branch information
alamb authored Jan 31, 2022
1 parent 2512608 commit 1caf52a
Show file tree
Hide file tree
Showing 6 changed files with 525 additions and 530 deletions.
3 changes: 1 addition & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
Expand All @@ -632,7 +631,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let fun_expr = functions::create_physical_fun(
&(&scalar_function).into(),
&ctx_state,
&ctx_state.execution_props,
)?;

Arc::new(ScalarFunctionExpr::new(
Expand Down
40 changes: 34 additions & 6 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ impl ExecutionContext {
state: Arc::new(Mutex::new(ExecutionContextState {
catalog_list,
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config,
execution_props: ExecutionProps::new(),
Expand Down Expand Up @@ -324,8 +323,8 @@ impl ExecutionContext {
self.state
.lock()
.unwrap()
.var_provider
.insert(variable_type, provider);
.execution_props
.add_var_provider(variable_type, provider);
}

/// Registers a scalar UDF within this context.
Expand Down Expand Up @@ -1115,9 +1114,14 @@ impl ExecutionConfig {
/// An instance of this struct is created each time a [`LogicalPlan`] is prepared for
/// execution (optimized). If the same plan is optimized multiple times, a new
/// `ExecutionProps` is created each time.
///
/// This structure must be cheap to create, because a new instance is
/// created during predicate pruning and expression simplification
#[derive(Clone)]
pub struct ExecutionProps {
/// Timestamp associated with the start of query execution;
/// `new()` initializes it to `chrono::Utc::now()`
pub(crate) query_execution_start_time: DateTime<Utc>,
/// providers for scalar variables, keyed by `VarType`; `None` until a
/// provider is registered via `add_var_provider`
pub var_providers: Option<HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>>,
}

impl Default for ExecutionProps {
Expand All @@ -1131,6 +1135,7 @@ impl ExecutionProps {
pub fn new() -> Self {
ExecutionProps {
query_execution_start_time: chrono::Utc::now(),
var_providers: None,
}
}

Expand All @@ -1139,6 +1144,32 @@ impl ExecutionProps {
self.query_execution_start_time = chrono::Utc::now();
&*self
}

/// Registers `provider` for `var_type`, creating the provider map on
/// first use.
///
/// Returns the provider previously registered for `var_type`, if any.
pub fn add_var_provider(
    &mut self,
    var_type: VarType,
    provider: Arc<dyn VarProvider + Send + Sync>,
) -> Option<Arc<dyn VarProvider + Send + Sync>> {
    // `var_providers` stays `None` until the first registration, so
    // materialize the map in place and insert straight into it.
    self.var_providers
        .get_or_insert_with(HashMap::new)
        .insert(var_type, provider)
}

/// Looks up the provider registered for `var_type`.
///
/// Returns `None` when no providers have been registered at all or when
/// none is registered for this `var_type`; otherwise returns a cloned
/// `Arc` handle to the provider.
pub fn get_var_provider(
    &self,
    var_type: VarType,
) -> Option<Arc<dyn VarProvider + Send + Sync>> {
    let registered = self.var_providers.as_ref()?;
    registered.get(&var_type).cloned()
}
}

/// Execution context for registering data sources and executing queries
Expand All @@ -1148,8 +1179,6 @@ pub struct ExecutionContextState {
pub catalog_list: Arc<dyn CatalogList>,
/// Scalar functions that are registered with the context
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Variable provider that are registered with the context
pub var_provider: HashMap<VarType, Arc<dyn VarProvider + Send + Sync>>,
/// Aggregate functions registered in the context
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Context configuration
Expand All @@ -1174,7 +1203,6 @@ impl ExecutionContextState {
ExecutionContextState {
catalog_list: Arc::new(MemoryCatalogList::new()),
scalar_functions: HashMap::new(),
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
Expand Down
30 changes: 13 additions & 17 deletions datafusion/src/optimizer/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

use crate::error::DataFusionError;
use crate::execution::context::{ExecutionContextState, ExecutionProps};
use crate::execution::context::ExecutionProps;
use crate::logical_plan::{lit, DFSchemaRef, Expr};
use crate::logical_plan::{DFSchema, ExprRewriter, LogicalPlan, RewriteRecursion};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::physical_plan::functions::Volatility;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::planner::create_physical_expr;
use crate::scalar::ScalarValue;
use crate::{error::Result, logical_plan::Operator};

Expand Down Expand Up @@ -223,7 +223,7 @@ impl SimplifyExpressions {
/// let rewritten = expr.rewrite(&mut const_evaluator).unwrap();
/// assert_eq!(rewritten, lit(3) + col("a"));
/// ```
pub struct ConstEvaluator {
pub struct ConstEvaluator<'a> {
/// can_evaluate is used during the depth-first-search of the
/// Expr tree to track if any siblings (or their descendants) were
/// non evaluatable (e.g. had a column reference or volatile
Expand All @@ -238,13 +238,12 @@ pub struct ConstEvaluator {
/// descendants) so this Expr can be evaluated
can_evaluate: Vec<bool>,

ctx_state: ExecutionContextState,
planner: DefaultPhysicalPlanner,
execution_props: &'a ExecutionProps,
input_schema: DFSchema,
input_batch: RecordBatch,
}

impl ExprRewriter for ConstEvaluator {
impl<'a> ExprRewriter for ConstEvaluator<'a> {
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
// Default to being able to evaluate this node
self.can_evaluate.push(true);
Expand Down Expand Up @@ -282,16 +281,11 @@ impl ExprRewriter for ConstEvaluator {
}
}

impl ConstEvaluator {
impl<'a> ConstEvaluator<'a> {
/// Create a new `ConstEvaluator`. Session constants (such as
/// the time for `now()`) are taken from the passed
/// `execution_props`.
pub fn new(execution_props: &ExecutionProps) -> Self {
let planner = DefaultPhysicalPlanner::default();
let ctx_state = ExecutionContextState {
execution_props: execution_props.clone(),
..ExecutionContextState::new()
};
pub fn new(execution_props: &'a ExecutionProps) -> Self {
let input_schema = DFSchema::empty();

// The dummy column name is unused and doesn't matter as only
Expand All @@ -306,8 +300,7 @@ impl ConstEvaluator {

Self {
can_evaluate: vec![],
ctx_state,
planner,
execution_props,
input_schema,
input_batch,
}
Expand Down Expand Up @@ -364,11 +357,11 @@ impl ConstEvaluator {
return Ok(s);
}

let phys_expr = self.planner.create_physical_expr(
let phys_expr = create_physical_expr(
&expr,
&self.input_schema,
&self.input_batch.schema(),
&self.ctx_state,
self.execution_props,
)?;
let col_val = phys_expr.evaluate(&self.input_batch)?;
match col_val {
Expand Down Expand Up @@ -1141,6 +1134,7 @@ mod tests {
) {
let execution_props = ExecutionProps {
query_execution_start_time: *date_time,
var_providers: None,
};

let mut const_evaluator = ConstEvaluator::new(&execution_props);
Expand Down Expand Up @@ -1622,6 +1616,7 @@ mod tests {
let rule = SimplifyExpressions::new();
let execution_props = ExecutionProps {
query_execution_start_time: *date_time,
var_providers: None,
};

let err = rule
Expand All @@ -1638,6 +1633,7 @@ mod tests {
let rule = SimplifyExpressions::new();
let execution_props = ExecutionProps {
query_execution_start_time: *date_time,
var_providers: None,
};

let optimized_plan = rule
Expand Down
13 changes: 8 additions & 5 deletions datafusion/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::execution::context::ExecutionProps;
use crate::physical_plan::planner::create_physical_expr;
use crate::prelude::lit;
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
logical_plan::{Column, DFSchema, Expr, Operator},
optimizer::utils,
physical_plan::{planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr},
physical_plan::{ColumnarValue, PhysicalExpr},
};

/// Interface to pass statistics information to [`PruningPredicate`]
Expand Down Expand Up @@ -129,12 +130,14 @@ impl PruningPredicate {
.collect::<Vec<_>>();
let stat_schema = Schema::new(stat_fields);
let stat_dfschema = DFSchema::try_from(stat_schema.clone())?;
let execution_context_state = ExecutionContextState::new();
let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr(

// TODO allow these properties to be passed in
let execution_props = ExecutionProps::new();
let predicate_expr = create_physical_expr(
&logical_predicate_expr,
&stat_dfschema,
&stat_schema,
&execution_context_state,
&execution_props,
)?;
Ok(Self {
schema,
Expand Down
34 changes: 17 additions & 17 deletions datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use super::{
type_coercion::{coerce, data_types},
ColumnarValue, PhysicalExpr,
};
use crate::execution::context::ExecutionContextState;
use crate::execution::context::ExecutionProps;
use crate::physical_plan::array_expressions;
use crate::physical_plan::datetime_expressions;
use crate::physical_plan::expressions::{
Expand Down Expand Up @@ -723,7 +723,7 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
/// Create a physical scalar function.
pub fn create_physical_fun(
fun: &BuiltinScalarFunction,
ctx_state: &ExecutionContextState,
execution_props: &ExecutionProps,
) -> Result<ScalarFunctionImplementation> {
Ok(match fun {
// math functions
Expand Down Expand Up @@ -820,7 +820,7 @@ pub fn create_physical_fun(
BuiltinScalarFunction::Now => {
// bind value for now at plan time
Arc::new(datetime_expressions::make_now(
ctx_state.execution_props.query_execution_start_time,
execution_props.query_execution_start_time,
))
}
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
Expand Down Expand Up @@ -1157,7 +1157,7 @@ pub fn create_physical_expr(
fun: &BuiltinScalarFunction,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
ctx_state: &ExecutionContextState,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>> {
let coerced_phy_exprs = coerce(input_phy_exprs, input_schema, &signature(fun))?;

Expand Down Expand Up @@ -1254,7 +1254,7 @@ pub fn create_physical_expr(
}
}),
// These don't need args and input schema
_ => create_physical_fun(fun, ctx_state)?,
_ => create_physical_fun(fun, execution_props)?,
};

Ok(Arc::new(ScalarFunctionExpr::new(
Expand Down Expand Up @@ -1720,14 +1720,14 @@ mod tests {
($FUNC:ident, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $DATA_TYPE: ident, $ARRAY_TYPE:ident) => {
// used to provide type annotation
let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();

// any type works here: we evaluate against a literal of `value`
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from_slice(&[1]))];

let expr =
create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &ctx_state)?;
create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, &schema, &execution_props)?;

// type is correct
assert_eq!(expr.data_type(&schema)?, DataType::$DATA_TYPE);
Expand Down Expand Up @@ -3888,7 +3888,7 @@ mod tests {

#[test]
fn test_empty_arguments_error() -> Result<()> {
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

// pick some arbitrary functions to test
Expand All @@ -3900,7 +3900,7 @@ mod tests {
];

for fun in funs.iter() {
let expr = create_physical_expr(fun, &[], &schema, &ctx_state);
let expr = create_physical_expr(fun, &[], &schema, &execution_props);

match expr {
Ok(..) => {
Expand Down Expand Up @@ -3931,13 +3931,13 @@ mod tests {

#[test]
fn test_empty_arguments() -> Result<()> {
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

let funs = [BuiltinScalarFunction::Now, BuiltinScalarFunction::Random];

for fun in funs.iter() {
create_physical_expr(fun, &[], &schema, &ctx_state)?;
create_physical_expr(fun, &[], &schema, &execution_props)?;
}
Ok(())
}
Expand All @@ -3954,13 +3954,13 @@ mod tests {
Field::new("b", value2.data_type().clone(), false),
]);
let columns: Vec<ArrayRef> = vec![value1, value2];
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();

let expr = create_physical_expr(
&BuiltinScalarFunction::Array,
&[col("a", &schema)?, col("b", &schema)?],
&schema,
&ctx_state,
&execution_props,
)?;

// type is correct
Expand Down Expand Up @@ -4017,7 +4017,7 @@ mod tests {
fn test_regexp_match() -> Result<()> {
use arrow::array::ListArray;
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();

let col_value: ArrayRef = Arc::new(StringArray::from_slice(&["aaa-555"]));
let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string())));
Expand All @@ -4026,7 +4026,7 @@ mod tests {
&BuiltinScalarFunction::RegexpMatch,
&[col("a", &schema)?, pattern],
&schema,
&ctx_state,
&execution_props,
)?;

// type is correct
Expand Down Expand Up @@ -4056,7 +4056,7 @@ mod tests {
fn test_regexp_match_all_literals() -> Result<()> {
use arrow::array::ListArray;
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let ctx_state = ExecutionContextState::new();
let execution_props = ExecutionProps::new();

let col_value = lit(ScalarValue::Utf8(Some("aaa-555".to_string())));
let pattern = lit(ScalarValue::Utf8(Some(r".*-(\d*)".to_string())));
Expand All @@ -4065,7 +4065,7 @@ mod tests {
&BuiltinScalarFunction::RegexpMatch,
&[col_value, pattern],
&schema,
&ctx_state,
&execution_props,
)?;

// type is correct
Expand Down
Loading

0 comments on commit 1caf52a

Please sign in to comment.