Skip to content

Commit

Permalink
feat(streaming): plan asof join (#18683)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Sep 30, 2024
1 parent fae0201 commit e82932f
Show file tree
Hide file tree
Showing 27 changed files with 897 additions and 97 deletions.
143 changes: 143 additions & 0 deletions e2e_test/streaming/asof_join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# asof inner join

statement ok
create table t1 (v1 int, v2 int, v3 int primary key);

statement ok
create table t2 (v1 int, v2 int, v3 int primary key);

statement ok
create materialized view mv1 as SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 <= t2.v2;

statement ok
insert into t1 values (1, 2, 3);

statement ok
insert into t2 values (1, 3, 4);

query III
select * from mv1;
----
1 2 3 1 3 4

statement ok
insert into t2 values (1, 2, 3);

query III
select * from mv1;
----
1 2 3 1 2 3

statement ok
delete from t1 where v3 = 3;

query III
select * from mv1;
----


statement ok
insert into t1 values (2, 3, 4);

statement ok
insert into t2 values (2, 3, 6), (2, 3, 7), (2, 3, 5);

query III
select * from mv1;
----
2 3 4 2 3 5

statement ok
insert into t2 values (2, 3, 1), (2, 3, 2);

query III
select * from mv1;
----
2 3 4 2 3 1

statement ok
drop materialized view mv1;

statement ok
drop table t1;

statement ok
drop table t2;


# asof left join

statement ok
create table t1 (v1 int, v2 int, v3 int primary key);

statement ok
create table t2 (v1 int, v2 int, v3 int primary key);

statement ok
create materialized view mv1 as SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 > t2.v2;

statement ok
insert into t1 values (1, 2, 3);

statement ok
insert into t2 values (1, 2, 4);

query III
select * from mv1;
----
1 2 3 NULL NULL NULL

statement ok
insert into t2 values (1, 1, 3);

query III
select * from mv1;
----
1 2 3 1 1 3

statement ok
delete from t1 where v3 = 3;

query III
select * from mv1;
----


statement ok
insert into t1 values (2, 3, 4);

statement ok
insert into t2 values (2, 2, 6), (2, 2, 7), (2, 2, 5);

query III
select * from mv1;
----
2 3 4 2 2 5

statement ok
insert into t2 values (2, 2, 1), (2, 2, 2);

query III
select * from mv1;
----
2 3 4 2 2 1

statement ok
delete from t2 where v1 = 2;

query III
select * from mv1;
----
2 3 4 NULL NULL NULL

statement ok
drop materialized view mv1;

statement ok
drop table t1;

statement ok
drop table t2;
2 changes: 2 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ enum JoinType {
JOIN_TYPE_LEFT_ANTI = 6;
JOIN_TYPE_RIGHT_SEMI = 7;
JOIN_TYPE_RIGHT_ANTI = 8;
JOIN_TYPE_ASOF_INNER = 9;
JOIN_TYPE_ASOF_LEFT_OUTER = 10;
}

enum AsOfJoinType {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ message StreamNode {
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
RowMergeNode row_merge = 146;
AsOfJoinNode as_of_join = 147;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
4 changes: 3 additions & 1 deletion src/batch/src/executor/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ impl JoinType {
PbJoinType::RightSemi => JoinType::RightSemi,
PbJoinType::RightAnti => JoinType::RightAnti,
PbJoinType::FullOuter => JoinType::FullOuter,
PbJoinType::Unspecified => unreachable!(),
PbJoinType::AsofInner | PbJoinType::AsofLeftOuter | PbJoinType::Unspecified => {
unreachable!()
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@ pub fn visit_stream_node_tables_inner<F>(
always!(node.bucket_state_table, "GlobalApproxPercentileBucketState");
always!(node.count_state_table, "GlobalApproxPercentileCountState");
}

// AsOf join
NodeBody::AsOfJoin(node) => {
always!(node.left_table, "AsOfJoinLeft");
always!(node.right_table, "AsOfJoinRight");
}
_ => {}
}
};
Expand Down
35 changes: 35 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/asof_join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
- sql:
CREATE TABLE t1(v1 varchar, v2 int, v3 int);
CREATE TABLE t2(v1 varchar, v2 int, v3 int);
SELECT * FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1;
expected_outputs:
- stream_error

- sql:
CREATE TABLE t1(v1 varchar, v2 int, v3 int);
CREATE TABLE t2(v1 varchar, v2 int, v3 int);
SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 || 'a' and t1.v2 > t2.v2;
expected_outputs:
- batch_error
- stream_plan

- sql:
CREATE TABLE t1(v1 varchar, v2 int, v3 int);
CREATE TABLE t2(v1 varchar, v2 int, v3 int);
SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 *2 < t2.v2;
expected_outputs:
- stream_plan

- sql:
CREATE TABLE t1(v1 varchar, v2 int, v3 int);
CREATE TABLE t2(v1 varchar, v2 int, v3 int);
SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 and t1.v3 < t2.v3;
expected_outputs:
- stream_error

- sql:
CREATE TABLE t1(v1 varchar, v2 int, v3 int);
CREATE TABLE t2(v1 varchar, v2 int, v3 int);
SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v2 < t2.v2;
expected_outputs:
- stream_error
28 changes: 28 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/asof_join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT * FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1;
stream_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition'
- sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 || 'a' and t1.v2 > t2.v2;
stream_plan: |-
StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: [t1._row_id, t2._row_id, t1_v1], pk_conflict: NoCheck }
└─StreamAsOfJoin { type: AsofInner, predicate: t1.v1 = $expr1 AND (t1.v2 > t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [t2.v1, t2.v2, ConcatOp(t2.v1, 'a':Varchar) as $expr1, t2._row_id] }
└─StreamTableScan { table: t2, columns: [t2.v1, t2.v2, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
batch_error: |-
Not supported: AsOf join in batch query
HINT: AsOf join is only supported in streaming query
- sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 *2 < t2.v2;
stream_plan: |-
StreamMaterialize { columns: [t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_v1], pk_columns: [t1._row_id, t2._row_id, t1_v1], pk_conflict: NoCheck }
└─StreamAsOfJoin { type: AsofLeftOuter, predicate: t1.v1 = t2.v1 AND ($expr1 < t2.v2), output: [t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamProject { exprs: [t1.v1, t1.v2, (t1.v2 * 2:Int32) as $expr1, t1._row_id] }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamTableScan { table: t2, columns: [t2.v1, t2.v2, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
- sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 and t1.v3 < t2.v3;
stream_error: 'Invalid input syntax: AsOf join requires exactly 1 ineuquality condition'
- sql: CREATE TABLE t1(v1 varchar, v2 int, v3 int); CREATE TABLE t2(v1 varchar, v2 int, v3 int); SELECT t1.v1 t1_v1, t1.v2 t1_v2, t2.v1 t2_v1, t2.v2 t2_v2 FROM t1 ASOF JOIN t2 ON t1.v2 < t2.v2;
stream_error: 'Invalid input syntax: AsOf join requires at least 1 equal condition'
2 changes: 2 additions & 0 deletions src/frontend/src/binder/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ impl Binder {
JoinOperator::FullOuter(constraint) => (constraint, JoinType::FullOuter),
// Cross join equals to inner join with with no constraint.
JoinOperator::CrossJoin => (JoinConstraint::None, JoinType::Inner),
JoinOperator::AsOfInner(constraint) => (constraint, JoinType::AsofInner),
JoinOperator::AsOfLeft(constraint) => (constraint, JoinType::AsofLeftOuter),
};
let right: Relation;
let cond: ExprImpl;
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ impl BatchHashJoin {
// we can not derive the hash distribution from the side where outer join can generate a
// NULL row
(Distribution::HashShard(_), Distribution::HashShard(_)) => match join.join_type {
JoinType::Unspecified => unreachable!(),
JoinType::AsofInner | JoinType::AsofLeftOuter | JoinType::Unspecified => {
unreachable!()
}
JoinType::FullOuter => Distribution::SomeShard,
JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => {
let l2o = join.l2i_col_mapping().composite(&join.i2o_col_mapping());
Expand Down
34 changes: 25 additions & 9 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
.rewrite_functional_dependency_set(right_fd_set)
};
let fd_set: FunctionalDependencySet = match self.join_type {
JoinType::Inner => {
JoinType::Inner | JoinType::AsofInner => {
let mut fd_set = FunctionalDependencySet::new(full_out_col_num);
for i in &self.on.conjunctions {
if let Some((col, _)) = i.as_eq_const() {
Expand All @@ -300,7 +300,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
.for_each(|fd| fd_set.add_functional_dependency(fd));
fd_set
}
JoinType::LeftOuter => get_new_left_fd_set(left_fd_set),
JoinType::LeftOuter | JoinType::AsofLeftOuter => get_new_left_fd_set(left_fd_set),
JoinType::RightOuter => get_new_right_fd_set(right_fd_set),
JoinType::FullOuter => FunctionalDependencySet::new(full_out_col_num),
JoinType::LeftSemi | JoinType::LeftAnti => left_fd_set,
Expand All @@ -325,9 +325,12 @@ impl<PlanRef> Join<PlanRef> {

pub fn full_out_col_num(left_len: usize, right_len: usize, join_type: JoinType) -> usize {
match join_type {
JoinType::Inner | JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => {
left_len + right_len
}
JoinType::Inner
| JoinType::LeftOuter
| JoinType::RightOuter
| JoinType::FullOuter
| JoinType::AsofInner
| JoinType::AsofLeftOuter => left_len + right_len,
JoinType::LeftSemi | JoinType::LeftAnti => left_len,
JoinType::RightSemi | JoinType::RightAnti => right_len,
JoinType::Unspecified => unreachable!(),
Expand Down Expand Up @@ -371,7 +374,12 @@ impl<PlanRef: GenericPlanRef> Join<PlanRef> {
let right_len = self.right.schema().len();

match self.join_type {
JoinType::Inner | JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => {
JoinType::Inner
| JoinType::LeftOuter
| JoinType::RightOuter
| JoinType::FullOuter
| JoinType::AsofInner
| JoinType::AsofLeftOuter => {
ColIndexMapping::identity_or_none(left_len + right_len, left_len)
}

Expand All @@ -389,7 +397,12 @@ impl<PlanRef: GenericPlanRef> Join<PlanRef> {
let right_len = self.right.schema().len();

match self.join_type {
JoinType::Inner | JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => {
JoinType::Inner
| JoinType::LeftOuter
| JoinType::RightOuter
| JoinType::FullOuter
| JoinType::AsofInner
| JoinType::AsofLeftOuter => {
ColIndexMapping::with_shift_offset(left_len + right_len, -(left_len as isize))
}
JoinType::LeftSemi | JoinType::LeftAnti => ColIndexMapping::empty(left_len, right_len),
Expand Down Expand Up @@ -445,13 +458,16 @@ impl<PlanRef: GenericPlanRef> Join<PlanRef> {

pub fn add_which_join_key_to_pk(&self) -> EitherOrBoth<(), ()> {
match self.join_type {
JoinType::Inner => {
JoinType::Inner | JoinType::AsofInner => {
// Theoretically adding either side is ok, but the distribution key of the inner
// join derived based on the left side by default, so we choose the left side here
// to ensure the pk comprises the distribution key.
EitherOrBoth::Left(())
}
JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti => EitherOrBoth::Left(()),
JoinType::LeftOuter
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::AsofLeftOuter => EitherOrBoth::Left(()),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightOuter => {
EitherOrBoth::Right(())
}
Expand Down
Loading

0 comments on commit e82932f

Please sign in to comment.