From 7bac23939c3c3f27e1fdea51df70c6224538d6f2 Mon Sep 17 00:00:00 2001
From: Bugen Zhao <i@bugenzhao.com>
Date: Thu, 23 Mar 2023 13:57:21 +0800
Subject: [PATCH] feat(meta): support scaling delta join (#8694)

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
---
 src/meta/src/barrier/command.rs               |   2 +-
 src/meta/src/stream/scale.rs                  |  59 ++++-----
 .../src/table/batch_table/storage_table.rs    |   7 +
 src/stream/src/executor/chain.rs              |   6 +-
 src/stream/src/executor/lookup.rs             |   4 +
 src/stream/src/executor/lookup/cache.rs       |   5 +
 src/stream/src/executor/lookup/impl_.rs       |  33 +++--
 src/stream/src/executor/lookup/tests.rs       |   6 +-
 src/stream/src/from_proto/lookup.rs           |   1 +
 src/tests/simulation/src/ctl_ext.rs           |   8 +-
 src/tests/simulation/src/risingwave.toml      |   5 +-
 src/tests/simulation/tests/it/delta_join.rs   | 123 ++++++++++++++++++
 src/tests/simulation/tests/it/main.rs         |   3 +
 13 files changed, 209 insertions(+), 53 deletions(-)
 create mode 100644 src/tests/simulation/tests/it/delta_join.rs

diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 57e4c1a55c219..badba46781735 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -443,7 +443,7 @@ where
                     dropped_actors,
                     actor_splits,
                 });
-                tracing::trace!("update mutation: {mutation:#?}");
+                tracing::debug!("update mutation: {mutation:#?}");
                 Some(mutation)
             }
         };
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 719dad39676a7..cf617fa09819d 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -432,10 +432,6 @@ where
            // treatment because the upstream and downstream of NoShuffle are always 1-1
            // correspondence, so we need to clone the reschedule plan to the downstream of all
            // cascading relations.
-           //
-           // Delta join will introduce a `NoShuffle` edge between index chain node and lookup node
-           // (index_mv --NoShuffle--> index_chain --NoShuffle--> lookup) which will break current
-           // `NoShuffle` scaling assumption. Currently we detect this case and forbid it to scale.
            if no_shuffle_source_fragment_ids.contains(fragment_id) {
                let mut queue: VecDeque<_> = fragment_dispatcher_map
                    .get(fragment_id)
@@ -451,21 +447,12 @@ where
                        if let Some(downstream_fragments) =
                            fragment_dispatcher_map.get(&downstream_id)
                        {
-                           // If `NoShuffle` used by other fragment type rather than `ChainNode`, bail.
-                           for downstream_fragment_id in downstream_fragments.keys() {
-                               let downstream_fragment = fragment_map
-                                   .get(downstream_fragment_id)
-                                   .ok_or_else(|| anyhow!("fragment {fragment_id} does not exist"))?;
-                               if (downstream_fragment.get_fragment_type_mask()
-                                   & (FragmentTypeFlag::ChainNode as u32
-                                       | FragmentTypeFlag::Mview as u32))
-                                   == 0
-                               {
-                                   bail!("Rescheduling NoShuffle edge only supports ChainNode and Mview. Other usage for e.g. delta join is forbidden currently.");
-                               }
-                           }
+                           let no_shuffle_downstreams = downstream_fragments
+                               .iter()
+                               .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+                               .map(|(fragment_id, _)| fragment_id);

-                           queue.extend(downstream_fragments.keys().cloned());
+                           queue.extend(no_shuffle_downstreams.copied());
                        }

                    no_shuffle_reschedule.insert(
@@ -743,7 +730,12 @@ where
            .unwrap();

        if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(fragment_id) {
-           for downstream_fragment_id in downstream_fragments.keys() {
+           let no_shuffle_downstreams = downstream_fragments
+               .iter()
+               .filter(|(_, ty)| **ty == DispatcherType::NoShuffle)
+               .map(|(fragment_id, _)| fragment_id);
+
+           for downstream_fragment_id in no_shuffle_downstreams {
                arrange_no_shuffle_relation(
                    ctx,
                    downstream_fragment_id,
@@ -1014,20 +1006,19 @@ where
            }
        }

-       let downstream_fragment_ids =
-           if let Some(downstream_fragments) = ctx.fragment_dispatcher_map.get(&fragment_id) {
-               // Skip NoShuffle fragments' downstream
-               if ctx
-                   .no_shuffle_source_fragment_ids
-                   .contains(&fragment.fragment_id)
-               {
-                   vec![]
-               } else {
-                   downstream_fragments.keys().copied().collect_vec()
-               }
-           } else {
-               vec![]
-           };
+       let downstream_fragment_ids = if let Some(downstream_fragments) =
+           ctx.fragment_dispatcher_map.get(&fragment_id)
+       {
+           // Skip fragments' no-shuffle downstream, as there's no need to update the merger
+           // (receiver) of a no-shuffle downstream
+           downstream_fragments
+               .iter()
+               .filter(|(_, dispatcher_type)| *dispatcher_type != &DispatcherType::NoShuffle)
+               .map(|(fragment_id, _)| *fragment_id)
+               .collect_vec()
+       } else {
+           vec![]
+       };

        let vnode_bitmap_updates = match fragment.distribution_type() {
            FragmentDistributionType::Hash => {
@@ -1123,7 +1114,7 @@ where

        let _source_pause_guard = self.source_manager.paused.lock().await;

-       tracing::trace!("reschedule plan: {:#?}", reschedule_fragment);
+       tracing::debug!("reschedule plan: {:#?}", reschedule_fragment);

        self.barrier_scheduler
            .run_command_with_paused(Command::RescheduleFragment(reschedule_fragment))
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 56458b2be63cd..3c37728968b7c 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -379,6 +379,13 @@ impl StorageTableInner {
            Ok(None)
        }
    }
+
+   /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
+   #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
+   pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
+       assert_eq!(self.vnodes.len(), new_vnodes.len());
+       std::mem::replace(&mut self.vnodes, new_vnodes)
+   }
}

pub trait PkAndRowStream = Stream<Item = StorageResult<(Vec<u8>, OwnedRow)>> + Send;
diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs
index f4081492928ae..de98292eff448 100644
--- a/src/stream/src/executor/chain.rs
+++ b/src/stream/src/executor/chain.rs
@@ -76,7 +76,9 @@ impl ChainExecutor {
        // Otherwise, it means we've recovered and the snapshot is already consumed.
        let to_consume_snapshot = barrier.is_add_dispatcher(self.actor_id) && !self.upstream_only;

-       if self.upstream_only {
+       // If the barrier is a conf change of creating this mview, and the snapshot is not to be
+       // consumed, we can finish the progress immediately.
+       if barrier.is_add_dispatcher(self.actor_id) && self.upstream_only {
            self.progress.finish(barrier.epoch.curr);
        }

@@ -100,7 +102,7 @@ impl ChainExecutor {
        #[for_await]
        for msg in upstream {
            let msg = msg?;
-           if let Message::Barrier(barrier) = &msg {
+           if to_consume_snapshot && let Message::Barrier(barrier) = &msg {
                self.progress.finish(barrier.epoch.curr);
            }
            yield msg;
diff --git a/src/stream/src/executor/lookup.rs b/src/stream/src/executor/lookup.rs
index db068fefc9b5b..94e9eeab76742 100644
--- a/src/stream/src/executor/lookup.rs
+++ b/src/stream/src/executor/lookup.rs
@@ -28,6 +28,8 @@ mod impl_;

pub use impl_::LookupExecutorParams;

+use super::ActorContextRef;
+
#[cfg(test)]
mod tests;

@@ -38,6 +40,8 @@ mod tests;
/// The output schema is `| stream columns | arrangement columns |`.
/// The input is required to be first stream and then arrangement.
pub struct LookupExecutor<S: StateStore> {
+   ctx: ActorContextRef,
+
    /// the data types of the produced data chunk inside lookup (before reordering)
    chunk_data_types: Vec<DataType>,

diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs
index 498cb164a9620..0922b0b7a3323 100644
--- a/src/stream/src/executor/lookup/cache.rs
+++ b/src/stream/src/executor/lookup/cache.rs
@@ -64,6 +64,11 @@ impl LookupCache {
        self.data.update_epoch(epoch);
    }

+   /// Clear the cache.
+   pub fn clear(&mut self) {
+       self.data.clear();
+   }
+
    pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
        let cache = ExecutorCache::new(new_unbounded(watermark_epoch));
        Self { data: cache }
diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs
index 12b7610f1635a..375c307054724 100644
--- a/src/stream/src/executor/lookup/impl_.rs
+++ b/src/stream/src/executor/lookup/impl_.rs
@@ -28,16 +28,19 @@ use risingwave_storage::table::TableIter;
use risingwave_storage::StateStore;

use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
+use crate::cache::cache_may_stale;
use crate::common::StreamChunkBuilder;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
use crate::executor::lookup::cache::LookupCache;
use crate::executor::lookup::sides::{ArrangeJoinSide, ArrangeMessage, StreamJoinSide};
use crate::executor::lookup::LookupExecutor;
-use crate::executor::{Barrier, Executor, Message, PkIndices};
+use crate::executor::{ActorContextRef, Barrier, Executor, Message, PkIndices};
use crate::task::AtomicU64Ref;

/// Parameters for [`LookupExecutor`].
pub struct LookupExecutorParams<S: StateStore> {
+   pub ctx: ActorContextRef,
+
    /// The side for arrangement. Currently, it should be a
    /// `MaterializeExecutor`.
    pub arrangement: Box<dyn Executor>,
@@ -116,6 +119,7 @@ impl<S: StateStore> LookupExecutor<S> {
    pub fn new(params: LookupExecutorParams<S>) -> Self {
        let LookupExecutorParams {
+           ctx,
            arrangement,
            stream,
            arrangement_col_descs,
@@ -202,6 +206,7 @@ impl<S: StateStore> LookupExecutor<S> {
        );

        Self {
+           ctx,
            chunk_data_types,
            schema: output_schema,
            pk_indices,
@@ -273,10 +278,8 @@ impl<S: StateStore> LookupExecutor<S> {
                    self.lookup_cache.flush();
                }

-               // Use the new stream barrier epoch as new cache epoch
-               self.lookup_cache.update_epoch(barrier.epoch.curr);
+               self.process_barrier(&barrier);

-               self.process_barrier(barrier.clone()).await?;
                if self.arrangement.use_current_epoch {
                    // When lookup this epoch, stream side barrier always come after arrangement
                    // ready, so we can forward barrier now.
@@ -336,11 +339,23 @@ impl<S: StateStore> LookupExecutor<S> {
            }
        }

-   /// Store the barrier.
-   #[expect(clippy::unused_async)]
-   async fn process_barrier(&mut self, barrier: Barrier) -> StreamExecutorResult<()> {
-       self.last_barrier = Some(barrier);
-       Ok(())
+   /// Process the barrier and apply changes if necessary.
+   fn process_barrier(&mut self, barrier: &Barrier) {
+       if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) {
+           let previous_vnode_bitmap = self
+               .arrangement
+               .storage_table
+               .update_vnode_bitmap(vnode_bitmap.clone());
+
+           // Manipulate the cache if necessary.
+           if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
+               self.lookup_cache.clear();
+           }
+       }
+
+       // Use the new stream barrier epoch as new cache epoch
+       self.lookup_cache.update_epoch(barrier.epoch.curr);
+       self.last_barrier = Some(barrier.clone());
    }

    /// Lookup all rows corresponding to a join key in shared buffer.
diff --git a/src/stream/src/executor/lookup/tests.rs b/src/stream/src/executor/lookup/tests.rs
index 66898dbd0d48a..7cbd975fcd50f 100644
--- a/src/stream/src/executor/lookup/tests.rs
+++ b/src/stream/src/executor/lookup/tests.rs
@@ -30,7 +30,7 @@ use crate::executor::lookup::impl_::LookupExecutorParams;
use crate::executor::lookup::LookupExecutor;
use crate::executor::test_utils::*;
use crate::executor::{
-   Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
+   ActorContext, Barrier, BoxedMessageStream, Executor, MaterializeExecutor, Message, PkIndices,
};

fn arrangement_col_descs() -> Vec<ColumnDesc> {
@@ -218,6 +218,7 @@ async fn test_lookup_this_epoch() {
    let arrangement = create_arrangement(table_id, store.clone()).await;
    let stream = create_source();
    let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+       ctx: ActorContext::create(0),
        arrangement,
        stream,
        arrangement_col_descs: arrangement_col_descs(),
@@ -281,14 +282,13 @@ async fn test_lookup_this_epoch() {
}

#[tokio::test]
-#[ignore]
-// Deprecated because the ability to read from prev epoch has been deprecated.
async fn test_lookup_last_epoch() {
    let store = MemoryStateStore::new();
    let table_id = TableId::new(1);
    let arrangement = create_arrangement(table_id, store.clone()).await;
    let stream = create_source();
    let lookup_executor = Box::new(LookupExecutor::new(LookupExecutorParams {
+       ctx: ActorContext::create(0),
        arrangement,
        stream,
        arrangement_col_descs: arrangement_col_descs(),
diff --git a/src/stream/src/from_proto/lookup.rs b/src/stream/src/from_proto/lookup.rs
index 3d0c8bb93923e..df574e5587214 100644
--- a/src/stream/src/from_proto/lookup.rs
+++ b/src/stream/src/from_proto/lookup.rs
@@ -125,6 +125,7 @@ impl ExecutorBuilder for LookupExecutorBuilder {
        );

        Ok(Box::new(LookupExecutor::new(LookupExecutorParams {
+           ctx: params.actor_context,
            schema: params.schema,
            arrangement,
            stream,
diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs
index 1fe319e7a779a..769a331f5f1b6 100644
--- a/src/tests/simulation/src/ctl_ext.rs
+++ b/src/tests/simulation/src/ctl_ext.rs
@@ -96,9 +96,11 @@ pub mod predicate {

    /// The fragment is able to be rescheduled. Used for locating random fragment.
    pub fn can_reschedule() -> BoxedPredicate {
-       // The rescheduling of `Chain` must be derived from the upstream `Materialize`, not
-       // specified by the user.
-       no_identity_contains("StreamTableScan")
+       // The rescheduling of no-shuffle downstreams must be derived from the upstream
+       // `Materialize`, not specified by the user.
+       let p =
+           |f: &PbFragment| no_identity_contains("Chain")(f) && no_identity_contains("Lookup")(f);
+       Box::new(p)
    }

    /// The fragment with the given id.
diff --git a/src/tests/simulation/src/risingwave.toml b/src/tests/simulation/src/risingwave.toml
index 72e6f7bcf2280..b88e8a0644f7c 100644
--- a/src/tests/simulation/src/risingwave.toml
+++ b/src/tests/simulation/src/risingwave.toml
@@ -4,4 +4,7 @@

[system]
barrier_interval_ms = 250
-checkpoint_frequency = 4
\ No newline at end of file
+checkpoint_frequency = 4
+
+[server]
+telemetry_enabled = false
diff --git a/src/tests/simulation/tests/it/delta_join.rs b/src/tests/simulation/tests/it/delta_join.rs
new file mode 100644
index 0000000000000..7f109e7db3126
--- /dev/null
+++ b/src/tests/simulation/tests/it/delta_join.rs
@@ -0,0 +1,123 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![cfg(madsim)]
+
+use anyhow::Result;
+use itertools::Itertools;
+use risingwave_simulation::cluster::{Cluster, Configuration};
+use risingwave_simulation::ctl_ext::predicate::identity_contains;
+use risingwave_simulation::utils::AssertResult;
+
+#[madsim::test]
+async fn test_delta_join() -> Result<()> {
+    let mut cluster = Cluster::start(Configuration::for_scale()).await?;
+    let mut session = cluster.start_session();
+
+    session.run("set rw_implicit_flush = true;").await?;
+    session
+        .run("set rw_streaming_enable_delta_join = true;")
+        .await?;
+
+    session
+        .run("create table a (a1 int primary key, a2 int);")
+        .await?;
+    session
+        .run("create table b (b1 int primary key, b2 int);")
+        .await?;
+    let [t1, t2]: [_; 2] = cluster
+        .locate_fragments([identity_contains("materialize")])
+        .await?
+        .try_into()
+        .unwrap();
+
+    session
+        .run("create materialized view v as select * from a join b on a.a1 = b.b1;")
+        .await?;
+    let lookup_fragments = cluster
+        .locate_fragments([identity_contains("lookup")])
+        .await?;
+    assert_eq!(lookup_fragments.len(), 2, "failed to plan delta join");
+    let union_fragment = cluster
+        .locate_one_fragment([identity_contains("union")])
+        .await?;
+
+    let mut test_times = 0;
+    macro_rules! test_works {
+        () => {
+            let keys = || (0..100).map(|i| test_times * 100 + i);
+
+            for key in keys() {
+                session
+                    .run(format!("insert into a values ({key}, 233)"))
+                    .await?;
+                session
+                    .run(format!("insert into b values ({key}, 666)"))
+                    .await?;
+            }
+            session.run("flush").await?;
+
+            let result = keys()
+                .rev()
+                .map(|key| format!("{key} 233 {key} 666"))
+                .join("\n");
+
+            session
+                .run("select * from v order by a1 desc limit 100;")
+                .await?
+                .assert_result_eq(result);
+
+            #[allow(unused_assignments)]
+            test_times += 1;
+        };
+    }
+
+    test_works!();
+
+    // Scale-in one side
+    cluster.reschedule(format!("{}-[0]", t1.id())).await?;
+    test_works!();
+
+    // Scale-in both sides together
+    cluster
+        .reschedule(format!("{}-[2];{}-[0,2]", t1.id(), t2.id()))
+        .await?;
+    test_works!();
+
+    // Scale-out one side
+    cluster.reschedule(format!("{}+[0]", t2.id())).await?;
+    test_works!();
+
+    // Scale-out both sides together
+    cluster
+        .reschedule(format!("{}+[0,2];{}+[2]", t1.id(), t2.id()))
+        .await?;
+    test_works!();
+
+    // Scale-in join with union
+    cluster
+        .reschedule(format!("{}-[5];{}-[5]", t1.id(), union_fragment.id()))
+        .await?;
+    test_works!();
+
+    let result = cluster
+        .reschedule(format!("{}-[0]", lookup_fragments[0].id()))
+        .await;
+    assert!(
+        result.is_err(),
+        "directly scale-in lookup (downstream) should fail"
+    );
+
+    Ok(())
+}
diff --git a/src/tests/simulation/tests/it/main.rs b/src/tests/simulation/tests/it/main.rs
index 6fed15c83f22c..ee2f7f003b535 100644
--- a/src/tests/simulation/tests/it/main.rs
+++ b/src/tests/simulation/tests/it/main.rs
@@ -17,7 +17,10 @@
//! See [this post](https://matklad.github.io/2021/02/27/delete-cargo-integration-tests.html)
//! for the rationale behind this approach.

+#![feature(stmt_expr_attributes)]
+
mod cascade_materialized_view;
+mod delta_join;
mod dynamic_filter;
mod hello;
mod nexmark_chaos;
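
Below is a minimal, self-contained sketch of the vnode-bitmap handling that
`LookupExecutor::process_barrier` performs in the hunk above. All names here
are stand-ins rather than the real APIs (a `Vec<bool>` in place of `Bitmap`,
a plain `Vec` in place of the lookup cache), and the staleness rule is an
assumption inferred from the usage in this patch (cached entries may be stale
once the actor gains a vnode it did not own before), not the verbatim
`crate::cache::cache_may_stale` implementation:

    // Stand-in for a vnode bitmap: one ownership flag per virtual node.
    type VnodeBitmap = Vec<bool>;

    /// Assumed staleness rule: the cache may be stale only if this actor now
    /// owns a vnode it did not own before, since changes to that vnode were
    /// applied elsewhere while another actor owned it.
    fn cache_may_stale(previous: &VnodeBitmap, current: &VnodeBitmap) -> bool {
        assert_eq!(previous.len(), current.len());
        current
            .iter()
            .zip(previous)
            .any(|(now, before)| *now && !*before)
    }

    /// Mirrors the barrier handling above: swap in the new bitmap, then clear
    /// the cache only when it may have become stale.
    fn update_vnode_bitmap(vnodes: &mut VnodeBitmap, new: VnodeBitmap, cache: &mut Vec<i64>) {
        let previous = std::mem::replace(vnodes, new);
        if cache_may_stale(&previous, vnodes) {
            cache.clear();
        }
    }

    fn main() {
        let mut vnodes = vec![true, true, false, false];
        let mut cache = vec![42];

        // Handing vnode 1 over to another actor keeps the cache: the
        // remaining entries still belong to vnodes this actor owns.
        update_vnode_bitmap(&mut vnodes, vec![true, false, false, false], &mut cache);
        assert_eq!(cache, vec![42]);

        // Gaining vnode 2 clears the cache conservatively.
        update_vnode_bitmap(&mut vnodes, vec![true, false, true, false], &mut cache);
        assert!(cache.is_empty());
    }

This is also why the new `StorageTableInner::update_vnode_bitmap` in this
patch is marked `#[must_use]`: it hands the previous bitmap back so that the
calling executor, not the storage table, decides whether its cache must be
dropped.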