Skip to content

Commit

Permalink
Merge commit '70db5eab8996af4816958f798f6ee887dffb69ed' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-3
  • Loading branch information
appletreeisyellow committed Apr 26, 2024
2 parents 49bccd2 + 70db5ea commit ed17749
Show file tree
Hide file tree
Showing 15 changed files with 494 additions and 182 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ parquet = { version = "51.0.0", default-features = false, features = ["arrow", "
rand = "0.8"
rstest = "0.19.0"
serde_json = "1"
sqlparser = { version = "0.44.0", features = ["visitor"] }
sqlparser = { version = "0.45.0", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 30 additions & 14 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,48 @@ impl Constraints {
let constraints = constraints
.iter()
.map(|c: &TableConstraint| match c {
TableConstraint::Unique {
columns,
is_primary,
..
} => {
TableConstraint::Unique { name, columns, .. } => {
let field_names = df_schema.field_names();
// Get primary key and/or unique indices in the schema:
// Get unique constraint indices in the schema:
let indices = columns
.iter()
.map(|pk| {
.map(|u| {
let idx = field_names
.iter()
.position(|item| *item == pk.value)
.position(|item| *item == u.value)
.ok_or_else(|| {
let name = name
.as_ref()
.map(|name| format!("with name '{name}' "))
.unwrap_or("".to_string());
DataFusionError::Execution(
"Primary key doesn't exist".to_string(),
format!("Column for unique constraint {}not found in schema: {}", name,u.value)
)
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(if *is_primary {
Constraint::PrimaryKey(indices)
} else {
Constraint::Unique(indices)
})
Ok(Constraint::Unique(indices))
}
TableConstraint::PrimaryKey { columns, .. } => {
let field_names = df_schema.field_names();
// Get primary key indices in the schema:
let indices = columns
.iter()
.map(|pk| {
let idx = field_names
.iter()
.position(|item| *item == pk.value)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Column for primary key not found in schema: {}",
pk.value
))
})?;
Ok(idx)
})
.collect::<Result<Vec<_>>>()?;
Ok(Constraint::PrimaryKey(indices))
}
TableConstraint::ForeignKey { .. } => {
_plan_err!("Foreign key constraints are not currently supported")
Expand Down
19 changes: 16 additions & 3 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,18 @@ impl ScalarValue {
tz
)
}
DataType::Duration(TimeUnit::Second) => {
build_array_primitive!(DurationSecondArray, DurationSecond)
}
DataType::Duration(TimeUnit::Millisecond) => {
build_array_primitive!(DurationMillisecondArray, DurationMillisecond)
}
DataType::Duration(TimeUnit::Microsecond) => {
build_array_primitive!(DurationMicrosecondArray, DurationMicrosecond)
}
DataType::Duration(TimeUnit::Nanosecond) => {
build_array_primitive!(DurationNanosecondArray, DurationNanosecond)
}
DataType::Interval(IntervalUnit::DayTime) => {
build_array_primitive!(IntervalDayTimeArray, IntervalDayTime)
}
Expand Down Expand Up @@ -1605,7 +1617,10 @@ impl ScalarValue {
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
arrow::compute::concat(arrays.as_slice())?
}
DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
DataType::List(_)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _) => {
let arrays = scalars.map(|s| s.to_array()).collect::<Result<Vec<_>>>()?;
let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
arrow::compute::concat(arrays.as_slice())?
Expand Down Expand Up @@ -1673,8 +1688,6 @@ impl ScalarValue {
| DataType::Time32(TimeUnit::Nanosecond)
| DataType::Time64(TimeUnit::Second)
| DataType::Time64(TimeUnit::Millisecond)
| DataType::Duration(_)
| DataType::Union(_, _)
| DataType::Map(_, _)
| DataType::RunEndEncoded(_, _)
| DataType::Utf8View
Expand Down
73 changes: 18 additions & 55 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,6 @@ impl PhysicalOptimizerRule for JoinSelection {
/// `CollectLeft` mode is applicable. Otherwise, it will try to swap the join sides.
/// When the `ignore_threshold` is false, this function will also check left
/// and right sizes in bytes or rows.
///
/// For [`JoinType::Full`], it can not use `CollectLeft` mode and will return `None`.
/// For [`JoinType::Left`] and [`JoinType::LeftAnti`], it can not run `CollectLeft`
/// mode as is, but it can do so by changing the join type to [`JoinType::Right`]
/// and [`JoinType::RightAnti`], respectively.
fn try_collect_left(
hash_join: &HashJoinExec,
ignore_threshold: bool,
Expand All @@ -318,38 +313,20 @@ fn try_collect_left(
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let left = hash_join.left();
let right = hash_join.right();
let join_type = hash_join.join_type();

let left_can_collect = match join_type {
JoinType::Left | JoinType::Full | JoinType::LeftAnti => false,
JoinType::Inner
| JoinType::LeftSemi
| JoinType::Right
| JoinType::RightSemi
| JoinType::RightAnti => {
ignore_threshold
|| supports_collect_by_thresholds(
&**left,
threshold_byte_size,
threshold_num_rows,
)
}
};
let right_can_collect = match join_type {
JoinType::Right | JoinType::Full | JoinType::RightAnti => false,
JoinType::Inner
| JoinType::RightSemi
| JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti => {
ignore_threshold
|| supports_collect_by_thresholds(
&**right,
threshold_byte_size,
threshold_num_rows,
)
}
};
let left_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**left,
threshold_byte_size,
threshold_num_rows,
);
let right_can_collect = ignore_threshold
|| supports_collect_by_thresholds(
&**right,
threshold_byte_size,
threshold_num_rows,
);

match (left_can_collect, right_can_collect) {
(true, true) => {
if should_swap_join_order(&**left, &**right)?
Expand Down Expand Up @@ -916,9 +893,9 @@ mod tests_statistical {
}

#[tokio::test]
async fn test_left_join_with_swap() {
async fn test_left_join_no_swap() {
let (big, small) = create_big_and_small();
// Left outer join should always swap when the mode is PartitionMode::CollectLeft, even if the left side is small and the right side is large

let join = Arc::new(
HashJoinExec::try_new(
Arc::clone(&small),
Expand All @@ -942,32 +919,18 @@ mod tests_statistical {
.optimize(join.clone(), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
.as_any()
.downcast_ref::<ProjectionExec>()
.expect("A proj is required to swap columns back to their original order");

assert_eq!(swapping_projection.expr().len(), 2);
let (col, name) = &swapping_projection.expr()[0];
assert_eq!(name, "small_col");
assert_col_expr(col, "small_col", 1);
let (col, name) = &swapping_projection.expr()[1];
assert_eq!(name, "big_col");
assert_col_expr(col, "big_col", 0);

let swapped_join = swapping_projection
.input()
let swapped_join = optimized_join
.as_any()
.downcast_ref::<HashJoinExec>()
.expect("The type of the plan should not be changed");

assert_eq!(
swapped_join.left().statistics().unwrap().total_byte_size,
Precision::Inexact(2097152)
Precision::Inexact(8192)
);
assert_eq!(
swapped_join.right().statistics().unwrap().total_byte_size,
Precision::Inexact(8192)
Precision::Inexact(2097152)
);
crosscheck_plans(join.clone()).unwrap();
}
Expand Down
Loading

0 comments on commit ed17749

Please sign in to comment.