Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce a newtype for vnode #6513

Merged
merged 13 commits into from
Nov 23, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ use itertools::Itertools;
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::error::{internal_error, Result};
use risingwave_common::hash::{HashKey, HashKeyDispatcher};
use risingwave_common::hash::{
HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, VnodeMapping,
};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, Datum, ParallelUnitId, VirtualNode, VnodeMapping};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::scan_range::ScanRange;
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
Expand Down Expand Up @@ -102,7 +104,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {

list.iter().for_each(|(scan_range, vnode)| {
scan_ranges.push(scan_range.to_protobuf());
vnode_bitmap.set(*vnode as usize, true);
vnode_bitmap.set(vnode.to_index(), true);
});

let row_seq_scan_node = NodeBody::RowSeqScan(RowSeqScanNode {
Expand Down Expand Up @@ -188,7 +190,7 @@ impl<C: BatchTaskContext> LookupExecutorBuilder for InnerSideExecutorBuilder<C>
}

let vnode = self.get_virtual_node(&scan_range)?;
let parallel_unit_id = self.vnode_mapping[vnode as usize];
let parallel_unit_id = self.vnode_mapping[vnode.to_index()];

let list = self
.pu_to_scan_range_mapping
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/task/consistent_hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn generate_hash_values(
hasher_builder,
)
.iter_mut()
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode() as usize] as usize)
.map(|hash_value| consistent_hash_info.vmap[hash_value.to_vnode().to_index()] as usize)
.collect::<Vec<_>>();
Ok(hash_values)
}
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ serde_derive = "1"
serde_json = "1"
smallvec = "1"
spin = "0.9"
static_assertions = "1"
strum = "0.24"
strum_macros = "0.24"
sysinfo = "0.26"
Expand Down
11 changes: 6 additions & 5 deletions src/common/src/hash/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@ use crate::array::{
StructRef,
};
use crate::collection::estimate_size::EstimateSize;
use crate::hash::vnode::VirtualNode;
use crate::row::Row;
use crate::types::{
DataType, Decimal, IntervalUnit, NaiveDateTimeWrapper, NaiveDateWrapper, NaiveTimeWrapper,
OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum, VirtualNode, VIRTUAL_NODE_COUNT,
OrderedF32, OrderedF64, ScalarRef, ToOwnedDatum,
};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::{deserialize_datum, serialize_datum};

/// A wrapper for u64 hash result.
#[derive(Default, Clone, Debug, PartialEq)]
#[derive(Default, Clone, Copy, Debug, PartialEq)]
pub struct HashCode(pub u64);

impl From<u64> for HashCode {
Expand All @@ -55,12 +56,12 @@ impl From<u64> for HashCode {
}

impl HashCode {
pub fn hash_code(&self) -> u64 {
pub fn hash_code(self) -> u64 {
self.0
}

pub fn to_vnode(&self) -> VirtualNode {
(self.0 % VIRTUAL_NODE_COUNT as u64) as VirtualNode
pub fn to_vnode(self) -> VirtualNode {
VirtualNode::from(self)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/src/hash/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ mod key;
pub use key::*;
mod dispatcher;
pub use dispatcher::HashKeyDispatcher;
mod vnode;
pub use vnode::*;
98 changes: 98 additions & 0 deletions src/common/src/hash/vnode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use parse_display::Display;

use crate::hash::HashCode;

/// Parallel unit is the minimal scheduling unit.
// TODO: make it a newtype
pub type ParallelUnitId = u32;
pub type VnodeMapping = Vec<ParallelUnitId>;

/// `VirtualNode` (a.k.a. VNode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Display)]
pub struct VirtualNode(VirtualNodeInner);

type VirtualNodeInner = u8;
static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);

impl From<HashCode> for VirtualNode {
fn from(hash_code: HashCode) -> Self {
// Take the least significant bits of the hash code.
// TODO: should we use the most significant bits?
let inner = (hash_code.0 % Self::COUNT as u64) as VirtualNodeInner;
VirtualNode(inner)
}
}

impl VirtualNode {
/// The number of bits used to represent a virtual node.
pub const BITS: usize = 8;
/// The total count of virtual nodes.
pub const COUNT: usize = 1 << Self::BITS;
/// The size of a virtual node in bytes, in memory or serialized representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
}

/// An iterator over all virtual nodes.
pub type AllVirtualNodeIter = impl Iterator<Item = VirtualNode>;

impl VirtualNode {
/// The maximum value of the virtual node.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// The minimum (zero) value of the virtual node.
pub const ZERO: VirtualNode = VirtualNode::from_index(0);

/// Creates a virtual node from the `usize` index.
pub const fn from_index(index: usize) -> Self {
debug_assert!(index < Self::COUNT);
Self(index as _)
}

/// Returns the `usize` the virtual node used for indexing.
pub const fn to_index(self) -> usize {
self.0 as _
}

/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
pub const fn from_scalar(scalar: i16) -> Self {
debug_assert!((scalar as usize) < Self::COUNT);
Self(scalar as _)
}

/// Returns the scalar representation of the virtual node. Used by `VNODE` expression.
pub const fn to_scalar(self) -> i16 {
self.0 as _
}

/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
debug_assert!((inner as usize) < Self::COUNT);
Self(inner)
}

/// Returns the big-endian bytes representation of the virtual node.
pub const fn to_be_bytes(self) -> [u8; Self::SIZE] {
self.0.to_be_bytes()
}

/// Iterates over all virtual nodes.
pub fn all() -> AllVirtualNodeIter {
(0..Self::COUNT).map(Self::from_index)
}
}
11 changes: 0 additions & 11 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,6 @@ use crate::array::{
StructValue,
};

/// Parallel unit is the minimal scheduling unit.
pub type ParallelUnitId = u32;
pub type VnodeMapping = Vec<ParallelUnitId>;

/// `VirtualNode` (a.k.a. VNode) is a minimal partition that a set of keys belong to. It is used for
/// consistent hashing.
pub type VirtualNode = u8;
pub const VIRTUAL_NODE_SIZE: usize = std::mem::size_of::<VirtualNode>();
pub const VNODE_BITS: usize = 8;
pub const VIRTUAL_NODE_COUNT: usize = 1 << VNODE_BITS;

pub type OrderedF32 = ordered_float::OrderedFloat<f32>;
pub type OrderedF64 = ordered_float::OrderedFloat<f64>;

Expand Down
3 changes: 2 additions & 1 deletion src/common/src/util/scan_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use risingwave_pb::batch_plan::scan_range::Bound as BoundProst;
use risingwave_pb::batch_plan::ScanRange as ScanRangeProst;

use super::value_encoding::serialize_datum_to_bytes;
use crate::hash::VirtualNode;
use crate::row::{Row2, RowExt};
use crate::types::{Datum, ScalarImpl, VirtualNode};
use crate::types::{Datum, ScalarImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::value_encoding::serialize_datum;

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/worker_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

use risingwave_pb::common::WorkerNode;

use crate::types::ParallelUnitId;
use crate::hash::ParallelUnitId;

pub fn get_pu_to_worker_mapping(nodes: &[WorkerNode]) -> HashMap<ParallelUnitId, WorkerNode> {
let mut pu_to_worker = HashMap::new();
Expand Down
13 changes: 8 additions & 5 deletions src/expr/src/expr/expr_vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,26 @@ impl Expression for VnodeExpression {
let mut builder = I16ArrayBuilder::new(input.capacity());
hash_values
.into_iter()
.for_each(|h| builder.append(Some(h.to_vnode() as i16)));
.for_each(|h| builder.append(Some(h.to_vnode().to_scalar())));
Ok(Arc::new(ArrayImpl::from(builder.finish())))
}

fn eval_row(&self, input: &Row) -> Result<Datum> {
let dist_key_row = input.by_indices(&self.dist_key_indices);
// FIXME: currently the implementation of the hash function in Row::hash_row differs from
// Array::hash_at, so their result might be different. #3457
let vnode = dist_key_row.hash_row(&Crc32FastBuilder {}).to_vnode() as i16;
let vnode = dist_key_row
.hash_row(&Crc32FastBuilder {})
.to_vnode()
.to_scalar();
Ok(Some(vnode.into()))
}
}

#[cfg(test)]
mod tests {
use risingwave_common::array::{DataChunk, DataChunkTestExt};
use risingwave_common::types::VIRTUAL_NODE_COUNT;
use risingwave_common::hash::VirtualNode;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as ProstDataType;
use risingwave_pb::expr::expr_node::RexNode;
Expand Down Expand Up @@ -131,7 +134,7 @@ mod tests {
actual.iter().for_each(|vnode| {
let vnode = vnode.unwrap().into_int16();
assert!(vnode >= 0);
assert!((vnode as usize) < VIRTUAL_NODE_COUNT);
assert!((vnode as usize) < VirtualNode::COUNT);
});
}

Expand All @@ -157,7 +160,7 @@ mod tests {
let actual = vnode_expr.eval_row(&row).unwrap();
let vnode = actual.unwrap().into_int16();
assert!(vnode >= 0);
assert!((vnode as usize) < VIRTUAL_NODE_COUNT);
assert!((vnode as usize) < VirtualNode::COUNT);
}
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/property/distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::catalog::{FieldDisplay, Schema, TableId};
use risingwave_common::error::Result;
use risingwave_common::types::{ParallelUnitId, VnodeMapping};
use risingwave_common::hash::{ParallelUnitId, VnodeMapping};
use risingwave_pb::batch_plan::exchange_info::{
ConsistentHashInfo, Distribution as DistributionProst, DistributionMode, HashInfo,
};
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use rand::seq::SliceRandom;
use risingwave_batch::executor::ExecutorBuilder;
use risingwave_batch::task::TaskId as TaskIdBatch;
use risingwave_common::array::DataChunk;
use risingwave_common::types::VnodeMapping;
use risingwave_common::hash::VnodeMapping;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::select_all;
use risingwave_connector::source::SplitMetaData;
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/scheduler/plan_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use itertools::Itertools;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::TableDesc;
use risingwave_common::error::RwError;
use risingwave_common::types::{ParallelUnitId, VnodeMapping, VIRTUAL_NODE_COUNT};
use risingwave_common::hash::{ParallelUnitId, VirtualNode, VnodeMapping};
use risingwave_common::util::scan_range::ScanRange;
use risingwave_connector::source::{ConnectorProperties, SplitEnumeratorImpl, SplitImpl};
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand Down Expand Up @@ -492,7 +492,7 @@ impl BatchPlanFragmenter {
.take(1)
.update(|(_, info)| {
info.vnode_bitmap =
Bitmap::all_high_bits(VIRTUAL_NODE_COUNT).to_protobuf();
Bitmap::all_high_bits(VirtualNode::COUNT).to_protobuf();
})
.collect();
}
Expand Down Expand Up @@ -758,11 +758,11 @@ fn derive_partitions(
}
// scan a single partition
Some(vnode) => {
let parallel_unit_id = vnode_mapping[vnode as usize];
let parallel_unit_id = vnode_mapping[vnode.to_index()];
let (bitmap, scan_ranges) = partitions
.entry(parallel_unit_id)
.or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![]));
bitmap.set(vnode as usize, true);
bitmap.set(vnode.to_index(), true);
scan_ranges.push(scan_range.to_protobuf());
}
}
Expand All @@ -786,7 +786,7 @@ fn derive_partitions(
mod tests {
use std::collections::{HashMap, HashSet};

use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::ParallelUnit;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/worker_node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};

use rand::seq::SliceRandom;
use risingwave_common::bail;
use risingwave_common::types::{ParallelUnitId, VnodeMapping};
use risingwave_common::hash::{ParallelUnitId, VnodeMapping};
use risingwave_common::util::worker_util::get_pu_to_worker_mapping;
use risingwave_pb::common::WorkerNode;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_common::{bail, try_match_expand};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping, WorkerNode};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use itertools::Itertools;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType};
use risingwave_pb::meta::heartbeat_request;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/model/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::types::ParallelUnitId;
use risingwave_common::hash::ParallelUnitId;
use risingwave_common::util::is_stream_source;
use risingwave_connector::source::SplitImpl;
use risingwave_pb::common::{Buffer, ParallelUnit, ParallelUnitMapping};
Expand Down
Loading