From 387caf81ad11403f92831f16a5f52c1bcd2c7970 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 16 Mar 2023 17:54:44 +0800 Subject: [PATCH 1/3] remove it from state table --- src/storage/src/table/mod.rs | 12 ++++++--- src/stream/src/common/table/state_table.rs | 30 ++++++++++++++-------- src/stream/src/executor/sort.rs | 2 +- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 5a7ac8627104c..02c6e66384652 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -16,6 +16,7 @@ pub mod batch_table; use std::sync::{Arc, LazyLock}; +use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::Schema; @@ -124,14 +125,19 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu /// Get vnode values with `indices` on the given `chunk`. pub fn compute_chunk_vnode( chunk: &DataChunk, - indices: &[usize], + dist_key_in_pk_indices: &[usize], + pk_indices: &[usize], vnodes: &Bitmap, ) -> Vec { - if indices.is_empty() { + if dist_key_in_pk_indices.is_empty() { vec![DEFAULT_VNODE; chunk.capacity()] } else { + let dist_key_indices = dist_key_in_pk_indices + .iter() + .map(|idx| pk_indices[*idx]) + .collect_vec(); chunk - .get_hash_values(indices, Crc32FastBuilder) + .get_hash_values(&dist_key_indices, Crc32FastBuilder) .into_iter() .zip_eq_fast(chunk.vis().iter()) .map(|(h, vis)| { diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index d61b831f57099..b0320e4933a41 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -84,7 +84,7 @@ pub struct StateTableInner< /// Indices of distribution key for computing vnode. /// Note that the index is based on the all columns of the table, instead of the output ones. // FIXME: revisit constructions and usages. - dist_key_indices: Vec, + // dist_key_indices: Vec, /// Indices of distribution key for computing vnode. /// Note that the index is based on the primary key columns by `pk_indices`. @@ -251,7 +251,6 @@ where pk_serde, row_serde, pk_indices: pk_indices.to_vec(), - dist_key_indices, dist_key_in_pk_indices, prefix_hint_len, vnodes, @@ -452,7 +451,6 @@ where pk_serde, row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())), pk_indices, - dist_key_indices, dist_key_in_pk_indices, prefix_hint_len, vnodes, @@ -475,7 +473,7 @@ where if self.vnode_col_idx_in_pk.is_some() { false } else { - self.dist_key_indices.is_empty() + self.dist_key_in_pk_indices.is_empty() } } @@ -503,8 +501,13 @@ where } /// Get the vnode value of the given row - pub fn compute_vnode(&self, row: impl Row) -> VirtualNode { - compute_vnode(row, &self.dist_key_indices, &self.vnodes) + // pub fn compute_vnode(&self, row: impl Row) -> VirtualNode { + // compute_vnode(row, &self.dist_key_indices, &self.vnodes) + // } + + /// Get the vnode value of the given row + pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode { + compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes) } // TODO: remove, should not be exposed to user @@ -516,9 +519,9 @@ where &self.pk_serde } - pub fn dist_key_indices(&self) -> &[usize] { - &self.dist_key_indices - } + // pub fn dist_key_indices(&self) -> &[usize] { + // &self.dist_key_indices + // } pub fn vnodes(&self) -> &Arc { &self.vnodes @@ -724,7 +727,12 @@ where pub fn write_chunk(&mut self, chunk: StreamChunk) { let (chunk, op) = chunk.into_parts(); - let vnodes = compute_chunk_vnode(&chunk, &self.dist_key_indices, &self.vnodes); + let vnodes = compute_chunk_vnode( + &chunk, + &self.dist_key_in_pk_indices, + &self.pk_indices, + &self.vnodes, + ); let value_chunk = if let Some(ref value_indices) = self.value_indices { chunk.clone().reorder_columns(value_indices) @@ -984,7 +992,7 @@ where trace!( table_id = %self.table_id(), ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, - dist_key_indices = ?self.dist_key_indices, ?pk_prefix_indices, + ?pk_prefix_indices, "storage_iter_with_prefix" ); diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 67926152e5771..cc6b606443671 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -247,7 +247,7 @@ impl SortExecutor { let no_longer_owned_vnodes = Bitmap::bit_saturate_subtract(prev_vnode_bitmap, curr_vnode_bitmap); self.buffer.retain(|(_, pk), _| { - let vnode = self.state_table.compute_vnode(pk); + let vnode = self.state_table.compute_vnode_by_pk(pk); !no_longer_owned_vnodes.is_set(vnode.to_index()) }); } From 9afa1fccdc7c745225048401d72ff227dc694289 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 16 Mar 2023 18:00:33 +0800 Subject: [PATCH 2/3] make clippy happy --- src/storage/src/table/mod.rs | 11 +++++++++++ src/stream/src/common/table/state_table.rs | 13 ++++--------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 02c6e66384652..53baaaf2dc78b 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -54,6 +54,17 @@ impl Distribution { } } + pub fn fallback_vnodes() -> Arc { + /// A bitmap that only the default vnode is set. + static FALLBACK_VNODES: LazyLock> = LazyLock::new(|| { + let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT); + vnodes.set(DEFAULT_VNODE.to_index(), true); + vnodes.finish().into() + }); + + FALLBACK_VNODES.clone() + } + /// Distribution that accesses all vnodes, mainly used for tests. pub fn all_vnodes(dist_key_indices: Vec) -> Self { /// A bitmap that all vnodes are set. diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index b0320e4933a41..ece7293f6ef28 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -198,15 +198,10 @@ where .collect(); let pk_serde = OrderedRowSerde::new(pk_data_types, order_types); - let Distribution { - dist_key_indices, - vnodes, - } = match vnodes { - Some(vnodes) => Distribution { - dist_key_indices, - vnodes, - }, - None => Distribution::fallback(), + let vnodes = match vnodes { + Some(vnodes) => vnodes, + + None => Distribution::fallback_vnodes(), }; let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| { let vnode_col_idx = *idx as usize; From 5cc47b395bb01f5b1f8d611f74e54d3b9e1a1d41 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 16 Mar 2023 18:05:58 +0800 Subject: [PATCH 3/3] remove it from storage table --- src/storage/src/table/batch_table/storage_table.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 7d3c95148818c..4d5dbec793e9f 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -89,11 +89,6 @@ pub struct StorageTableInner { // FIXME: revisit constructions and usages. pk_indices: Vec, - /// Indices of distribution key for computing vnode. - /// Note that the index is based on the all columns of the table, instead of the output ones. - // FIXME: revisit constructions and usages. - dist_key_indices: Vec, - /// Indices of distribution key for computing vnode. /// Note that the index is based on the primary key columns by `pk_indices`. dist_key_in_pk_indices: Vec, @@ -266,7 +261,6 @@ impl StorageTableInner { mapping: Arc::new(mapping), row_serde: Arc::new(row_serde), pk_indices, - dist_key_indices, dist_key_in_pk_indices, vnodes, table_option, @@ -592,23 +586,21 @@ impl StorageTableInner { Some(Bytes::from(encoded_prefix[..prefix_len].to_vec())) } else { trace!( - "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}", + "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}", self.table_id, pk_prefix, - self.dist_key_indices, pk_prefix_indices ); None }; trace!( - "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" , + "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}" , self.table_id, prefix_hint, start_key, end_key, pk_prefix, - self.dist_key_indices, pk_prefix_indices );