Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable vnode count with env var #18161

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ pub struct MetaConfig {
pub do_not_config_object_storage_lifecycle: bool,

/// Count of partitions in a split group. Meta will assign this value to every new group when it is automatically split from the default group.
/// Each partition contains aligned data of `VirtualNode::COUNT / partition_vnode_count` consecutive virtual-nodes of one state table.
/// Each partition contains aligned data of `VirtualNode::count() / partition_vnode_count` consecutive virtual-nodes of one state table.
#[serde(default = "default::meta::partition_vnode_count")]
pub partition_vnode_count: u32,

Expand Down Expand Up @@ -348,7 +348,7 @@ pub struct MetaConfig {

/// Count of partitions of tables in default group and materialized view group.
/// The meta node will decide according to some strategy whether to cut the boundaries of the file according to the vnode alignment.
/// Each partition contains aligned data of `VirtualNode::COUNT / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
/// Each partition contains aligned data of `VirtualNode::count() / hybrid_partition_vnode_count` consecutive virtual-nodes of one state table.
/// Set it zero to disable this feature.
#[serde(default = "default::meta::hybrid_partition_vnode_count")]
pub hybrid_partition_vnode_count: u32,
Expand Down Expand Up @@ -427,10 +427,10 @@ impl<'de> Deserialize<'de> for DefaultParallelism {
)))
}
}
Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::COUNT {
Parallelism::Int(i) => Ok(DefaultParallelism::Default(if i > VirtualNode::count() {
Err(serde::de::Error::custom(format!(
"default parallelism should be not great than {}",
VirtualNode::COUNT
VirtualNode::count()
)))?
} else {
NonZeroUsize::new(i).ok_or_else(|| {
Expand Down
32 changes: 17 additions & 15 deletions src/common/src/hash/consistent_hash/mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
pub fn new_uniform(items: impl ExactSizeIterator<Item = T::Item>) -> Self {
// If the number of items is greater than the total vnode count, no vnode will be mapped to
// some items and the mapping will be invalid.
assert!(items.len() <= VirtualNode::COUNT);
assert!(items.len() <= VirtualNode::count());

let mut original_indices = Vec::with_capacity(items.len());
let mut data = Vec::with_capacity(items.len());

let hash_shard_size = VirtualNode::COUNT / items.len();
let mut one_more_count = VirtualNode::COUNT % items.len();
let hash_shard_size = VirtualNode::count() / items.len();
let mut one_more_count = VirtualNode::count() % items.len();
let mut init_bound = 0;

for item in items {
Expand Down Expand Up @@ -144,7 +144,7 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
Self::new_uniform(std::iter::once(item))
}

/// The length of the vnode in this mapping, typically [`VirtualNode::COUNT`].
/// The length of the vnode in this mapping, typically [`VirtualNode::count`].
pub fn len(&self) -> usize {
self.original_indices
.last()
Expand Down Expand Up @@ -209,7 +209,7 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
for (vnode, item) in self.iter_with_vnode() {
vnode_bitmaps
.entry(item)
.or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::COUNT))
.or_insert_with(|| BitmapBuilder::zeroed(VirtualNode::count()))
.set(vnode.to_index(), true);
}

Expand All @@ -222,10 +222,10 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
/// Create a vnode mapping from the given mapping from items to bitmaps, where each bitmap
/// represents the vnodes mapped to the item.
pub fn from_bitmaps(bitmaps: &HashMap<T::Item, Bitmap>) -> Self {
let mut items = vec![None; VirtualNode::COUNT];
let mut items = vec![None; VirtualNode::count()];

for (&item, bitmap) in bitmaps {
assert_eq!(bitmap.len(), VirtualNode::COUNT);
assert_eq!(bitmap.len(), VirtualNode::count());
for idx in bitmap.iter_ones() {
if let Some(prev) = items[idx].replace(item) {
panic!("mapping at index `{idx}` is set to both `{prev:?}` and `{item:?}`");
Expand All @@ -241,17 +241,17 @@ impl<T: VnodeMappingItem> VnodeMapping<T> {
Self::from_expanded(&items)
}

/// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::COUNT`].
/// Create a vnode mapping from the expanded slice of items with length [`VirtualNode::count`].
pub fn from_expanded(items: &[T::Item]) -> Self {
assert_eq!(items.len(), VirtualNode::COUNT);
assert_eq!(items.len(), VirtualNode::count());
let (original_indices, data) = compress_data(items);
Self {
original_indices,
data,
}
}

/// Convert this vnode mapping to a expanded vector of items with length [`VirtualNode::COUNT`].
/// Convert this vnode mapping to an expanded vector of items with length [`VirtualNode::count`].
pub fn to_expanded(&self) -> ExpandedMapping<T> {
self.iter().collect()
}
Expand Down Expand Up @@ -403,18 +403,20 @@ mod tests {
type TestMapping = VnodeMapping<Test>;
type Test2Mapping = VnodeMapping<Test2>;

const COUNTS: &[usize] = &[1, 3, 12, 42, VirtualNode::COUNT];
fn counts() -> &[usize] {
&[1, 3, 12, 42, VirtualNode::count()]
}

fn uniforms() -> impl Iterator<Item = TestMapping> {
COUNTS
counts()
.iter()
.map(|&count| TestMapping::new_uniform(0..count as u32))
}

fn randoms() -> impl Iterator<Item = TestMapping> {
COUNTS.iter().map(|&count| {
counts().iter().map(|&count| {
let raw = repeat_with(|| rand::thread_rng().gen_range(0..count as u32))
.take(VirtualNode::COUNT)
.take(VirtualNode::count())
.collect_vec();
TestMapping::from_expanded(&raw)
})
Expand All @@ -427,7 +429,7 @@ mod tests {
#[test]
fn test_uniform() {
for vnode_mapping in uniforms() {
assert_eq!(vnode_mapping.len(), VirtualNode::COUNT);
assert_eq!(vnode_mapping.len(), VirtualNode::count());
let item_count = vnode_mapping.iter_unique().count();

let mut check: HashMap<u32, Vec<_>> = HashMap::new();
Expand Down
89 changes: 65 additions & 24 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZero;
use std::sync::LazyLock;

use itertools::Itertools;
use parse_display::Display;

Expand All @@ -31,44 +34,82 @@ pub struct VirtualNode(VirtualNodeInner);

/// The internal representation of a virtual node id.
type VirtualNodeInner = u16;
static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);
// static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// static_assertions::const_assert!(VirtualNodeInner::BITS >= VirtualNode::BITS as u32);


impl From<Crc32HashCode> for VirtualNode {
fn from(hash_code: Crc32HashCode) -> Self {
// Take the least significant bits of the hash code.
// TODO: should we use the most significant bits?
let inner = (hash_code.value() % Self::COUNT as u64) as VirtualNodeInner;
let inner = (hash_code.value() % Self::count() as u64) as VirtualNodeInner;
VirtualNode(inner)
}
}

impl VirtualNode {
/// The number of bits used to represent a virtual node.
///
/// Note: Not all bits of the inner representation are used. One should rely on this constant
/// to determine the count of virtual nodes.
pub const BITS: usize = 8;
/// The total count of virtual nodes.
pub const COUNT: usize = 1 << Self::BITS;
/// We may use `VirtualNode` as a datum in a stream, or store it as a column.
/// Hence this reifies it as a RW datatype.
pub const RW_TYPE: DataType = DataType::Int16;
/// The size of a virtual node in bytes, in memory or serialized representation.
pub const SIZE: usize = std::mem::size_of::<Self>();
/// The minimum (zero) value of the virtual node.
pub const ZERO: VirtualNode = unsafe { VirtualNode::from_index_unchecked(0) };
}

impl VirtualNode {
/// The default count of virtual nodes.
const DEFAULT_COUNT: usize = 1 << 8;
/// The maximum count of virtual nodes, limited by the size of the inner type [`VirtualNodeInner`].
const MAX_COUNT: usize = 1 << VirtualNodeInner::BITS;

/// The total count of virtual nodes.
///
/// It can be customized by the environment variable `RW_VNODE_COUNT`, or defaults to [`Self::DEFAULT_COUNT`].
pub fn count() -> usize {
// Cache the value to avoid repeated env lookups and parsing.
static COUNT: LazyLock<usize> = LazyLock::new(|| {
if let Ok(count) = std::env::var("RW_VNODE_COUNT") {
Copy link
Member

@fuyufjh fuyufjh Aug 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the RW_VNODE_COUNT is different after restart?

Considering that vnodes are persisted in storage, I think this will certainly cause some problems. If so, I would question whether it's a good idea to pass it via an env var; perhaps making it an MV argument would be better? Or at least it should be a system parameter that always uses the value it was first initialized with, like `state_store_url`.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

I had some discussion about these approaches in the original issue. Basically, this PR is a quick hack to unblock users who want to utilize more resources in their clusters. However, if we find this use case niche and not that urgent, I agree that we may skip merging this PR and directly implement the more polished ideas, such as per-job configuration or a global immutable system parameter.

let count = count
.parse::<NonZero<usize>>()
.expect("`RW_VNODE_COUNT` must be a positive integer")
.get();
assert!(
count <= VirtualNode::MAX_COUNT,
"`RW_VNODE_COUNT` should not exceed maximum value {}",
VirtualNode::MAX_COUNT
);
// TODO(var-vnode): shall we enforce it to be a power of 2?
count
} else {
VirtualNode::DEFAULT_COUNT
}
});

*COUNT
}

/// Returns the virtual node with the highest index, i.e. the one at `Self::count() - 1`.
///
/// Unlike a constant, this is computed from [`Self::count`] on each call.
pub fn max() -> VirtualNode {
    let last_index = Self::count() - 1;
    VirtualNode::from_index(last_index)
}
}

/// An iterator over all virtual nodes.
pub type AllVirtualNodeIter = std::iter::Map<std::ops::Range<usize>, fn(usize) -> VirtualNode>;

impl VirtualNode {
/// The maximum value of the virtual node.
pub const MAX: VirtualNode = VirtualNode::from_index(Self::COUNT - 1);
/// We may use `VirtualNode` as a datum in a stream, or store it as a column.
/// Hence this reifies it as a RW datatype.
pub const RW_TYPE: DataType = DataType::Int16;
/// The minimum (zero) value of the virtual node.
pub const ZERO: VirtualNode = VirtualNode::from_index(0);

/// Creates a virtual node from the `usize` index.
pub const fn from_index(index: usize) -> Self {
debug_assert!(index < Self::COUNT);
pub fn from_index(index: usize) -> Self {
debug_assert!(index < Self::count());
Self(index as _)
}

/// Creates a virtual node from the `usize` index without bounds checking.
///
/// # Safety
///
/// The caller must ensure that the index is within the range of virtual nodes,
/// i.e., less than [`Self::count`].
pub const unsafe fn from_index_unchecked(index: usize) -> Self {
Self(index as _)
}

Expand All @@ -78,8 +119,8 @@ impl VirtualNode {
}

/// Creates a virtual node from the given scalar representation. Used by `VNODE` expression.
pub const fn from_scalar(scalar: i16) -> Self {
debug_assert!((scalar as usize) < Self::COUNT);
pub fn from_scalar(scalar: i16) -> Self {
debug_assert!((scalar as usize) < Self::count());
Self(scalar as _)
}

Expand All @@ -97,9 +138,9 @@ impl VirtualNode {
}

/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
pub fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
debug_assert!((inner as usize) < Self::COUNT);
debug_assert!((inner as usize) < Self::count());
Self(inner)
}

Expand All @@ -110,7 +151,7 @@ impl VirtualNode {

/// Iterates over all virtual nodes.
pub fn all() -> AllVirtualNodeIter {
(0..Self::COUNT).map(Self::from_index)
(0..Self::count()).map(Self::from_index)
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/common/src/hash/table_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl TableDistribution {
pub fn singleton_vnode_bitmap_ref() -> &'static Arc<Bitmap> {
/// A bitmap that only the default vnode is set.
static SINGLETON_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::count());
vnodes.set(SINGLETON_VNODE.to_index(), true);
vnodes.finish().into()
});
Expand All @@ -123,7 +123,7 @@ impl TableDistribution {
pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
/// A bitmap that all vnodes are set.
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
LazyLock::new(|| Bitmap::ones(VirtualNode::count()).into());
&ALL_VNODES
}

Expand Down
15 changes: 9 additions & 6 deletions src/common/src/util/row_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@
use std::cmp::Ordering;
use std::time::SystemTime;

use static_assertions::const_assert;

use super::epoch::UNIX_RISINGWAVE_DATE_EPOCH;
use crate::hash::VirtualNode;

// TODO(var-vnode): should fit vnode count up to 16 bits
const TIMESTAMP_SHIFT_BITS: u8 = 22;
const VNODE_ID_SHIFT_BITS: u8 = 12;
const SEQUENCE_UPPER_BOUND: u16 = 1 << 12;
const VNODE_ID_UPPER_BOUND: u32 = 1 << 10;

const_assert!(VNODE_ID_UPPER_BOUND >= VirtualNode::COUNT as u32);
const SEQUENCE_UPPER_BOUND: u16 = 1 << VNODE_ID_SHIFT_BITS;
const VNODE_ID_UPPER_BOUND: u32 = 1 << (TIMESTAMP_SHIFT_BITS - VNODE_ID_SHIFT_BITS);

/// `RowIdGenerator` generates unique row ids using snowflake algorithm as following format:
///
Expand Down Expand Up @@ -62,6 +59,12 @@ pub fn extract_vnode_id_from_row_id(id: RowId) -> VirtualNode {
impl RowIdGenerator {
/// Create a new `RowIdGenerator` with given virtual nodes.
pub fn new(vnodes: impl IntoIterator<Item = VirtualNode>) -> Self {
assert!(
VirtualNode::count() <= VNODE_ID_UPPER_BOUND as usize,
"vnode count should not exceed {} due to limitation of row id format",
VNODE_ID_UPPER_BOUND
);

let base = *UNIX_RISINGWAVE_DATE_EPOCH;
Self {
base,
Expand Down
Loading
Loading