Skip to content

Commit

Permalink
refactor: introduce a newtype for vnode (#6513)
Browse files Browse the repository at this point in the history
* set vnode to 2048

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* step one

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* constants

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add docs

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* move to vnode dir

* fix compiling

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix compiling

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* extract dispatch data

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix unit test

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* add debug assertions

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* refactor dynamic filter

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
BugenZhao and mergify[bot] authored Nov 23, 2022
1 parent bfa8334 commit 75f4caf
Show file tree
Hide file tree
Showing 34 changed files with 304 additions and 211 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use itertools::Itertools;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::hash::{
HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, VnodeMapping,
};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Datum, ParallelUnitId, VirtualNode, VnodeMapping};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
Expand Down Expand Up @@ -102,7 +104,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {

list.iter().for_each(|(scan_range, vnode)| {
scan_ranges.push(scan_range.to_protobuf());
vnode_bitmap.set(*vnode as usize, true);
vnode_bitmap.set(vnode.to_index(), true);
});

let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
Expand Down Expand Up @@ -188,7 +190,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode as usize];
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn generate_hash_values(
hasher_builder,
)
.iter_mut()
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode() as usize] as usize)
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode().to_index()] as usize)
.collect::<Vec<_>>();
Ok(hash_values)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ serde_derive = "1"
serde_json = "1"
smallvec = "1"
spin = "0.9"
static_assertions = "1"
strum = "0.24"
strum_macros = "0.24"
sysinfo = "0.26"
Expand Down
11 changes: 6 additions & 5 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@ use crate::array::{
StructRef,
};
use crate::collection::estimate_size::EstimateSize;
use crate::hash::vnode::VirtualNode;
use crate::row::Row;
use crate::types::{
DataType, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper,
OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum, VirtualNode, VIRTUAL_NODE_COUNT,
OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum,
};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::{deserialize_datum, serialize_datum};

/// A wrapper for u64 hash result.
#[derive(Default, Clone, Debug, PartialEq)]
#[derive(Default, Clone, Copy, Debug, PartialEq)]
pub struct HashCode(pub u64);

impl From<u64> for HashCode {
Expand All @@ -55,12 +56,12 @@ impl From<u64> for HashCode {
}

impl HashCode {
pub fn hash_code(&self) -> u64 {
pub fn hash_code(self) -> u64 {
self.0
}

pub fn to_vnode(&self) -> VirtualNode {
(self.0 % VIRTUAL_NODE_COUNT as u64) as VirtualNode
pub fn to_vnode(self) -> VirtualNode {
VirtualNode::from(self)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ mod key;
pub use key::*;
mod dispatcher;
pub use dispatcher::HashKeyDispatcher;
mod vnode;
pub use vnode::*;
98 changes: 98 additions & 0 deletions src/common/src/hash/vnode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use parse_display::Display;

use crate::hash::HashCode;

/// Parallel unit is the minimal scheduling unit.
// TODO: make it a newtype
pub type ParallelUnitId = u32;
pub type VnodeMapping = Vec<ParallelUnitId>;

/// `VirtualNode` (a.k.a. VNode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct VirtualNode(VirtualNodeInner);

type VirtualNodeInner = u8;
static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);

impl From<HashCode> for VirtualNode {
fn from(hash_code: HashCode) -> Self {
// Take the least significant bits of the hash code.
// TODO: should we use the most significant bits?
let inner = (hash_code.0 % Self::COUNT as u64) as VirtualNodeInner;
VirtualNode(inner)
}
}

impl VirtualNode {
/// The number of bits used to represent a virtual node.
pub const BITS: usize = 8;
/// The total count of virtual nodes.
pub const COUNT: usize = 1 << Self::BITS;
/// The size of a virtual node in bytes, in memory or serialized representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
}

/// An iterator over all virtual nodes.
pub type AllVirtualNodeIter = impl Iterator<Item = VirtualNode>;

impl VirtualNode {
/// The maximum value of the virtual node.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// The minimum (zero) value of the virtual node.
pub const ZERO: VirtualNode = VirtualNode::from_index(0);

/// Creates a virtual node from the `usize` index.
pub const fn from_index(index: usize) -> Self {
debug_assert!(index < Self::COUNT);
Self(index as _)
}

/// Returns the `usize` the virtual node used for indexing.
pub const fn to_index(self) -> usize {
self.0 as _
}

/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
pub const fn from_scalar(scalar: i16) -> Self {
debug_assert!((scalar as usize) < Self::COUNT);
Self(scalar as _)
}

/// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
pub const fn to_scalar(self) -> i16 {
self.0 as _
}

/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
debug_assert!((inner as usize) < Self::COUNT);
Self(inner)
}

/// Returns the big-endian bytes representation of the virtual node.
pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
self.0.to_be_bytes()
}

/// Iterates over all virtual nodes.
pub fn all() -> AllVirtualNodeIter {
(0..Self::COUNT).map(Self::from_index)
}
}
11 changes: 0 additions & 11 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,6 @@ use crate::array::{
StructValue,
};

/// Parallel unit is the minimal scheduling unit.
pub type ParallelUnitId = u32;
pub type VnodeMapping = Vec<ParallelUnitId>;

/// `VirtualNode` (a.k.a. VNode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
pub type VirtualNode = u8;
pub const VIRTUAL_NODE_SIZE: usize = std::mem::size_of::<VirtualNode>();
pub const VNODE_BITS: usize = 8;
pub const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS;

pub type OrderedF32 = ordered_float::OrderedFloat<f32>;
pub type OrderedF64 = ordered_float::OrderedFloat<f64>;

Expand Down
3 changes: 2 additions & 1 deletion src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use risingwave_pb::batch_plan::scan_range::Bound as BoundProst;
use risingwave_pb::batch_plan::ScanRange as ScanRangeProst;

use super::value_encoding::serialize_datum_to_bytes;
use crate::hash::VirtualNode;
use crate::row::{Row2, RowExt};
use crate::types::{Datum, ScalarImpl, VirtualNode};
use crate::types::{Datum, ScalarImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum;

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use risingwave_pb::common::WorkerNode;

use crate::types::ParallelUnitId;
use crate::hash::ParallelUnitId;

pub fn get_pu_to_worker_mapping(nodes: &[WorkerNode]) -> HashMap<ParallelUnitId, WorkerNode> {
let mut pu_to_worker = HashMap::new();
Expand Down
13 changes: 8 additions & 5 deletions src/expr/src/expr/expr_vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,26 @@ impl Expression for VnodeExpression {
let mut builder = I16ArrayBuilder::new(input.capacity());
hash_values
.into_iter()
.for_each(|h| builder.append(Some(h.to_vnode() as i16)));
.for_each(|h| builder.append(Some(h.to_vnode().to_scalar())));
Ok(Arc::new(ArrayImpl::from(builder.finish())))
}

fn eval_row(&self, input: &Row) -> Result<Datum> {
let dist_key_row = input.by_indices(&self.dist_key_indices);
// FIXME: currently the implementation of the hash function in Row::hash_row differs from
// Array::hash_at, so their result might be different. #3457
let vnode = dist_key_row.hash_row(&Crc32FastBuilder {}).to_vnode() as i16;
let vnode = dist_key_row
.hash_row(&Crc32FastBuilder {})
.to_vnode()
.to_scalar();
Ok(Some(vnode.into()))
}
}

#[cfg(test)]
mod tests {
use risingwave_common::array::{DataChunk, DataChunkTestExt};
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as ProstDataType;
use risingwave_pb::expr::expr_node::RexNode;
Expand Down Expand Up @@ -131,7 +134,7 @@ mod tests {
actual.iter().for_each(|vnode| {
let vnode = vnode.unwrap().into_int16();
assert!(vnode >= 0);
assert!((vnode as usize) < VIRTUAL_NODE_COUNT);
assert!((vnode as usize) < VirtualNode::COUNT);
});
}

Expand All @@ -157,7 +160,7 @@ mod tests {
let actual = vnode_expr.eval_row(&row).unwrap();
let vnode = actual.unwrap().into_int16();
assert!(vnode >= 0);
assert!((vnode as usize) < VIRTUAL_NODE_COUNT);
assert!((vnode as usize) < VirtualNode::COUNT);
}
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/property/distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
use risingwave_common::error::Result;
use risingwave_common::types::{ParallelUnitId, VnodeMapping};
use risingwave_common::hash::{ParallelUnitId, VnodeMapping};
use risingwave_pb::batch_plan::exchange_info::{
ConsistentHashInfo, Distribution as DistributionProst, DistributionMode, HashInfo,
};
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rand::seq::SliceRandom;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::TaskId as TaskIdBatch;
use risingwave_common::array::DataChunk;
use risingwave_common::types::VnodeMapping;
use risingwave_common::hash::VnodeMapping;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::select_all;
use risingwave_connector::source::SplitMetaData;
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use itertools::Itertools;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableDesc;
use risingwave_common::error::RwError;
use risingwave_common::types::{ParallelUnitId, VnodeMapping, VIRTUAL_NODE_COUNT};
use risingwave_common::hash::{ParallelUnitId, VirtualNode, VnodeMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::{ConnectorProperties, SplitEnumeratorImpl, SplitImpl};
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -492,7 +492,7 @@ impl BatchPlanFragmenter {
.take(1)
.update(|(_, info)| {
info.vnode_bitmap =
Bitmap::all_high_bits(VIRTUAL_NODE_COUNT).to_protobuf();
Bitmap::all_high_bits(VirtualNode::COUNT).to_protobuf();
})
.collect();
}
Expand Down Expand Up @@ -758,11 +758,11 @@ fn derive_partitions(
}
// scan a single partition
Some(vnode) => {
let parallel_unit_id = vnode_mapping[vnode as usize];
let parallel_unit_id = vnode_mapping[vnode.to_index()];
let (bitmap, scan_ranges) = partitions
.entry(parallel_unit_id)
.or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![]));
bitmap.set(vnode as usize, true);
bitmap.set(vnode.to_index(), true);
scan_ranges.push(scan_range.to_protobuf());
}
}
Expand All @@ -786,7 +786,7 @@ fn derive_partitions(
mod tests {
use std::collections::{HashMap, HashSet};

use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::ParallelUnit;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};

use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::types::{ParallelUnitId, VnodeMapping};
use risingwave_common::hash::{ParallelUnitId, VnodeMapping};
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_pb::common::WorkerNode;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_common::{bail, try_match_expand};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType};
use risingwave_pb::meta::heartbeat_request;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_common::util::is_stream_source;
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping};
Expand Down
Loading

0 comments on commit 75f4caf

Please sign in to comment.