diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 9e3004e9cd224..4c5afd47893d3 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -82,3 +82,7 @@ tempfile = "3" [[bench]] name = "bench_encoding" harness = false + +[[bench]] +name = "bitmap" +harness = false diff --git a/src/common/benches/bitmap.rs b/src/common/benches/bitmap.rs new file mode 100644 index 0000000000000..02f2810557ea3 --- /dev/null +++ b/src/common/benches/bitmap.rs @@ -0,0 +1,32 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use criterion::{criterion_group, criterion_main, Criterion}; +use risingwave_common::buffer::Bitmap; + +fn bench_bitmap(c: &mut Criterion) { + const CHUNK_SIZE: usize = 1024; + let x = Bitmap::zeros(CHUNK_SIZE); + let y = Bitmap::ones(CHUNK_SIZE); + let i = 0x123; + c.bench_function("zeros", |b| b.iter(|| Bitmap::zeros(CHUNK_SIZE))); + c.bench_function("ones", |b| b.iter(|| Bitmap::ones(CHUNK_SIZE))); + c.bench_function("get", |b| b.iter(|| x.is_set(i))); + c.bench_function("and", |b| b.iter(|| &x & &y)); + c.bench_function("or", |b| b.iter(|| &x | &y)); + c.bench_function("not", |b| b.iter(|| !&x)); +} + +criterion_group!(benches, bench_bitmap); +criterion_main!(benches); diff --git a/src/common/src/array/bool_array.rs b/src/common/src/array/bool_array.rs index 537bc527cf605..b6684fb0e146e 100644 --- a/src/common/src/array/bool_array.rs +++ b/src/common/src/array/bool_array.rs @@ -56,7 +56,7 @@ impl FromIterator for BoolArray { fn from_iter>(iter: I) -> Self { let data: Bitmap = iter.into_iter().collect(); BoolArray { - bitmap: Bitmap::all_high_bits(data.len()), + bitmap: Bitmap::ones(data.len()), data, } } diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs index 8eca5478e17a3..b9405e4053f43 100644 --- a/src/common/src/array/data_chunk.rs +++ b/src/common/src/array/data_chunk.rs @@ -109,7 +109,7 @@ impl DataChunk { /// `cardinality` returns the number of visible tuples pub fn cardinality(&self) -> usize { match &self.vis2 { - Vis::Bitmap(b) => b.num_high_bits(), + Vis::Bitmap(b) => b.count_ones(), Vis::Compact(len) => *len, } } diff --git a/src/common/src/array/primitive_array.rs b/src/common/src/array/primitive_array.rs index 1035d6c8b2ba1..761d9f49b87e6 100644 --- a/src/common/src/array/primitive_array.rs +++ b/src/common/src/array/primitive_array.rs @@ -153,7 +153,7 @@ impl FromIterator for PrimitiveArray { fn from_iter>(iter: I) -> Self { let data: Vec = iter.into_iter().collect(); PrimitiveArray { - bitmap: 
Bitmap::all_high_bits(data.len()), + bitmap: Bitmap::ones(data.len()), data, } } diff --git a/src/common/src/array/vis.rs b/src/common/src/array/vis.rs index 709def2a7133a..041164f1e51c5 100644 --- a/src/common/src/array/vis.rs +++ b/src/common/src/array/vis.rs @@ -57,8 +57,8 @@ impl Vis { self.as_ref().iter() } - pub fn ones(&self) -> impl Iterator + '_ { - self.as_ref().ones() + pub fn iter_ones(&self) -> impl Iterator + '_ { + self.as_ref().iter_ones() } #[inline(always)] @@ -145,9 +145,9 @@ impl<'a> VisRef<'a> { } #[auto_enum(Iterator)] - pub fn ones(self) -> impl Iterator + 'a { + pub fn iter_ones(self) -> impl Iterator + 'a { match self { - VisRef::Bitmap(b) => b.ones(), + VisRef::Bitmap(b) => b.iter_ones(), VisRef::Compact(c) => 0..c, } } diff --git a/src/common/src/buffer/bitmap.rs b/src/common/src/buffer/bitmap.rs index b3573a25a955e..c401e1894e190 100644 --- a/src/common/src/buffer/bitmap.rs +++ b/src/common/src/buffer/bitmap.rs @@ -33,111 +33,122 @@ //! This is called a "validity bitmap" in the Arrow documentation. //! This file is adapted from [arrow-rs](https://github.com/apache/arrow-rs) -use std::iter; +// allow `zip` for performance reasons +#![allow(clippy::disallowed_methods)] + +use std::iter::{self, TrustedLen}; use std::ops::{BitAnd, BitOr, Not, RangeInclusive}; -use bytes::Bytes; -use itertools::Itertools; use risingwave_pb::common::buffer::CompressionType; use risingwave_pb::common::Buffer as ProstBuffer; -use crate::util::bit_util; - #[derive(Default, Debug)] pub struct BitmapBuilder { len: usize, - data: Vec, - num_high_bits: usize, + data: Vec, + count_ones: usize, } +const BITS: usize = usize::BITS as usize; + impl BitmapBuilder { + /// Creates a new empty bitmap with at least the specified capacity. 
pub fn with_capacity(capacity: usize) -> BitmapBuilder { BitmapBuilder { len: 0, - data: Vec::with_capacity((capacity + 7) / 8), - num_high_bits: 0, + data: Vec::with_capacity(Bitmap::vec_len(capacity)), + count_ones: 0, } } + /// Creates a new bitmap with all bits set to 0. pub fn zeroed(len: usize) -> BitmapBuilder { BitmapBuilder { len, - data: vec![0; (len + 7) / 8], - num_high_bits: 0, + data: vec![0; Bitmap::vec_len(len)], + count_ones: 0, } } + /// Writes a new value into a single bit. pub fn set(&mut self, n: usize, val: bool) { assert!(n < self.len); - let byte = &mut self.data[n / 8]; - let mask = 1 << (n % 8); - match (*byte & mask > 0, val) { + let byte = &mut self.data[n / BITS]; + let mask = 1 << (n % BITS); + match (*byte & mask != 0, val) { (true, false) => { *byte &= !mask; - self.num_high_bits -= 1; + self.count_ones -= 1; } (false, true) => { *byte |= mask; - self.num_high_bits += 1; + self.count_ones += 1; } _ => {} } } + /// Tests a single bit. pub fn is_set(&self, n: usize) -> bool { assert!(n < self.len); - let byte = &self.data[n / 8]; - let mask = 1 << (n % 8); + let byte = &self.data[n / BITS]; + let mask = 1 << (n % BITS); *byte & mask != 0 } + /// Appends a single bit to the back. pub fn append(&mut self, bit_set: bool) -> &mut Self { - if self.len % 8 == 0 { + if self.len % BITS == 0 { self.data.push(0); } - self.data[self.len / 8] |= (bit_set as u8) << (self.len % 8); - self.num_high_bits += bit_set as usize; + self.data[self.len / BITS] |= (bit_set as usize) << (self.len % BITS); + self.count_ones += bit_set as usize; self.len += 1; self } + /// Appends `n` bits to the back. 
pub fn append_n(&mut self, mut n: usize, bit_set: bool) -> &mut Self { - while n != 0 && self.len % 8 != 0 { + while n != 0 && self.len % BITS != 0 { self.append(bit_set); n -= 1; } self.len += n; - self.data - .resize((self.len + 7) / 8, if bit_set { 0xFF } else { 0x00 }); - if bit_set && self.len % 8 != 0 { + self.data.resize( + Bitmap::vec_len(self.len), + if bit_set { usize::MAX } else { 0 }, + ); + if bit_set && self.len % BITS != 0 { // remove tailing 1s - *self.data.last_mut().unwrap() &= (1 << (self.len % 8)) - 1; + *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1; } if bit_set { - self.num_high_bits += n; + self.count_ones += n; } self } + /// Removes the last bit. pub fn pop(&mut self) -> Option<()> { if self.len == 0 { return None; } self.len -= 1; - self.data.truncate((self.len + 7) / 8); - if self.len % 8 != 0 { - *self.data.last_mut().unwrap() &= (1 << (self.len % 8)) - 1; + self.data.truncate(Bitmap::vec_len(self.len)); + if self.len % BITS != 0 { + *self.data.last_mut().unwrap() &= (1 << (self.len % BITS)) - 1; } Some(()) } + /// Appends a bitmap to the back. pub fn append_bitmap(&mut self, other: &Bitmap) -> &mut Self { - if self.len % 8 == 0 { + if self.len % BITS == 0 { // self is aligned, so just append the bytes self.len += other.len(); self.data.extend_from_slice(&other.bits); - self.num_high_bits += other.num_high_bits; + self.count_ones += other.count_ones; } else { for bit in other.iter() { self.append(bit); @@ -150,7 +161,7 @@ impl BitmapBuilder { Bitmap { num_bits: self.len(), bits: self.data.into(), - num_high_bits: self.num_high_bits, + count_ones: self.count_ones, } } @@ -160,69 +171,70 @@ impl BitmapBuilder { } /// An immutable bitmap. Use [`BitmapBuilder`] to build it. -#[derive(Clone)] +#[derive(Clone, PartialEq, Eq)] pub struct Bitmap { - bits: Bytes, - // The useful bits in the bitmap. The total number of bits will usually // be larger than the useful bits due to byte-padding. 
num_bits: usize, // The number of high bits in the bitmap. - num_high_bits: usize, + count_ones: usize, + + bits: Box<[usize]>, } impl std::fmt::Debug for Bitmap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "[")?; - let mut is_first = true; for data in self.iter() { - if is_first { - write!(f, "{}", data)?; - } else { - write!(f, ", {}", data)?; - } - is_first = false; + write!(f, "{}", if data { '1' } else { '0' })?; } - write!(f, "]") + Ok(()) } } impl Bitmap { - pub fn all_high_bits(num_bits: usize) -> Self { - let len = Self::num_bytes(num_bits); + /// Creates a new bitmap with all bits set to 0. + pub fn zeros(len: usize) -> Self { + BitmapBuilder::zeroed(len).finish() + } + + /// Creates a new bitmap with all bits set to 1. + pub fn ones(num_bits: usize) -> Self { + let len = Self::vec_len(num_bits); + let mut bits = vec![usize::MAX; len]; + if num_bits % BITS != 0 { + bits[len - 1] &= (1 << (num_bits % BITS)) - 1; + } Self { - bits: vec![0xff; len].into(), + bits: bits.into(), num_bits, - num_high_bits: num_bits, + count_ones: num_bits, } } - fn from_bytes_with_num_bits(buf: Bytes, num_bits: usize) -> Self { - debug_assert!(num_bits <= buf.len() << 3); - - let rem = num_bits % 8; - - let num_high_bits = if rem == 0 { - buf.iter().map(|&x| x.count_ones()).sum::() as usize - } else { - let (last, prefix) = buf.split_last().unwrap(); - prefix.iter().map(|&x| x.count_ones()).sum::() as usize - + (last & ((1u8 << rem) - 1)).count_ones() as usize - }; - - debug_assert!(num_high_bits <= num_bits); - + /// Creates a new bitmap from vector. 
+ fn from_vec_with_len(buf: Vec, num_bits: usize) -> Self { + debug_assert_eq!(buf.len(), Self::vec_len(num_bits)); + let count_ones = buf.iter().map(|&x| x.count_ones()).sum::() as usize; + debug_assert!(count_ones <= num_bits); Self { num_bits, - bits: buf, - num_high_bits, + bits: buf.into(), + count_ones, } } - pub fn from_bytes(buf: Bytes) -> Self { - let num_bits = buf.len() << 3; - Self::from_bytes_with_num_bits(buf, num_bits) + /// Creates a new bitmap from bytes. + pub fn from_bytes(buf: &[u8]) -> Self { + let num_bits = buf.len() * 8; + let mut bits = Vec::with_capacity(Self::vec_len(num_bits)); + let slice = unsafe { + bits.set_len(bits.capacity()); + std::slice::from_raw_parts_mut(bits.as_ptr() as *mut u8, bits.len() * (BITS / 8)) + }; + slice[..buf.len()].copy_from_slice(buf); + slice[buf.len()..].fill(0); + Self::from_vec_with_len(bits, num_bits) } /// Return the next set bit index on or after `bit_idx`. @@ -230,12 +242,14 @@ impl Bitmap { (bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) }) } - pub fn num_high_bits(&self) -> usize { - self.num_high_bits + /// Counts the number of bits set to 1. + pub fn count_ones(&self) -> usize { + self.count_ones } - fn num_bytes(num_bits: usize) -> usize { - num_bits / 8 + usize::from(num_bits % 8 > 0) + /// Returns the length of vector to store `num_bits` bits. + fn vec_len(num_bits: usize) -> usize { + (num_bits + BITS - 1) / BITS } /// Returns the number of valid bits in the bitmap, @@ -247,26 +261,30 @@ impl Bitmap { /// Returns true if the `Bitmap` has a length of 0. pub fn is_empty(&self) -> bool { - self.bits.is_empty() + self.num_bits == 0 } + /// Returns true if the bit at `idx` is set, without doing bounds checking. + /// /// # Safety /// - /// Makes clippy happy. + /// Index must be in range. 
pub unsafe fn is_set_unchecked(&self, idx: usize) -> bool { - bit_util::get_bit_raw(self.bits.as_ptr(), idx) + self.bits.get_unchecked(idx / BITS) & (1 << (idx % BITS)) != 0 } + /// Returns true if the bit at `idx` is set. pub fn is_set(&self, idx: usize) -> bool { assert!(idx < self.len()); unsafe { self.is_set_unchecked(idx) } } - /// Check if the bitmap is all set to 1. - pub fn is_all_set(&self) -> bool { - self.num_high_bits == self.len() + /// Tests if every bit is set to 1. + pub fn all(&self) -> bool { + self.count_ones == self.len() } + /// Produces an iterator over each bit. pub fn iter(&self) -> BitmapIter<'_> { BitmapIter { bits: &self.bits, @@ -275,36 +293,21 @@ impl Bitmap { } } - /// Returns an iterator which starts from `offset`. - /// - /// # Panics - /// Panics if `offset > len`. - pub fn iter_from(&self, offset: usize) -> BitmapIter<'_> { - assert!(offset < self.len()); - BitmapIter { - bits: &self.bits, - idx: offset, - num_bits: self.num_bits, - } - } - /// Performs bitwise saturate subtract on two equal-length bitmaps. /// /// For example, lhs = [01110] and rhs = [00111], then /// `bit_saturate_subtract(lhs, rhs)` results in [01000] - pub fn bit_saturate_subtract(lhs: &Bitmap, rhs: &Bitmap) -> Bitmap { - assert_eq!(lhs.num_bits, rhs.num_bits); - let bits = lhs - .bits - .iter() - .zip_eq(rhs.bits.iter()) + pub fn bit_saturate_subtract(&self, rhs: &Bitmap) -> Bitmap { + assert_eq!(self.num_bits, rhs.num_bits); + let bits = (self.bits.iter()) + .zip(rhs.bits.iter()) .map(|(&a, &b)| (!(a & b)) & a) .collect(); - Bitmap::from_bytes_with_num_bits(bits, lhs.num_bits) + Bitmap::from_vec_with_len(bits, self.num_bits) } - /// Returns an iterator which yields the positions of high bits. - pub fn ones(&self) -> impl Iterator + '_ { + /// Enumerates the index of each bit set to 1. 
+ pub fn iter_ones(&self) -> impl Iterator + '_ { self.iter() .enumerate() .filter(|(_, bit)| *bit) @@ -337,7 +340,7 @@ impl Bitmap { fn assert_valid(&self) { assert_eq!( self.iter().map(|x| x as usize).sum::(), - self.num_high_bits + self.count_ones ) } } @@ -347,13 +350,11 @@ impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap { fn bitand(self, rhs: &'b Bitmap) -> Bitmap { assert_eq!(self.num_bits, rhs.num_bits); - let bits = self - .bits - .iter() - .zip_eq(rhs.bits.iter()) + let bits = (self.bits.iter()) + .zip(rhs.bits.iter()) .map(|(&a, &b)| a & b) .collect(); - Bitmap::from_bytes_with_num_bits(bits, self.num_bits) + Bitmap::from_vec_with_len(bits, self.num_bits) } } @@ -386,13 +387,11 @@ impl<'a, 'b> BitOr<&'b Bitmap> for &'a Bitmap { fn bitor(self, rhs: &'b Bitmap) -> Bitmap { assert_eq!(self.num_bits, rhs.num_bits); - let bits = self - .bits - .iter() - .zip_eq(rhs.bits.iter()) + let bits = (self.bits.iter()) + .zip(rhs.bits.iter()) .map(|(&a, &b)| a | b) .collect(); - Bitmap::from_bytes_with_num_bits(bits, self.num_bits) + Bitmap::from_vec_with_len(bits, self.num_bits) } } @@ -424,8 +423,15 @@ impl<'a> Not for &'a Bitmap { type Output = Bitmap; fn not(self) -> Self::Output { - let bits = self.bits.iter().map(|b| !b).collect(); - Bitmap::from_bytes_with_num_bits(bits, self.num_bits) + let mut bits: Vec = self.bits.iter().map(|b| !b).collect(); + if self.num_bits % BITS != 0 { + bits[self.num_bits / BITS] &= (1 << (self.num_bits % BITS)) - 1; + } + Bitmap { + num_bits: self.num_bits, + count_ones: self.num_bits - self.count_ones, + bits: bits.into(), + } } } @@ -461,12 +467,11 @@ impl FromIterator> for Bitmap { impl Bitmap { pub fn to_protobuf(&self) -> ProstBuffer { - let last_byte_num_bits = ((self.num_bits % 8) as u8).to_be_bytes(); - let body = last_byte_num_bits - .into_iter() - .chain(self.bits.iter().copied()) - .collect(); - + let mut body = Vec::with_capacity((self.num_bits + 7) % 8 + 1); + body.push((self.num_bits % 8) as u8); + 
body.extend_from_slice(unsafe { + std::slice::from_raw_parts(self.bits.as_ptr() as *const u8, (self.num_bits + 7) / 8) + }); ProstBuffer { body, compression: CompressionType::None as i32, @@ -476,31 +481,18 @@ impl Bitmap { impl From<&ProstBuffer> for Bitmap { fn from(buf: &ProstBuffer) -> Self { - let last_byte_num_bits = u8::from_be_bytes(buf.body[..1].try_into().unwrap()); - let bits = Bytes::copy_from_slice(&buf.body[1..]); // TODO: avoid this allocation - let num_bits = (bits.len() << 3) - ((8 - last_byte_num_bits) % 8) as usize; + let last_byte_num_bits = buf.body[0]; + let num_bits = ((buf.body.len() - 1) * 8) - ((8 - last_byte_num_bits) % 8) as usize; - Self::from_bytes_with_num_bits(bits, num_bits) - } -} - -impl PartialEq for Bitmap { - fn eq(&self, other: &Self) -> bool { - // buffer equality considers capacity, but here we want to only compare - // actual data contents - if self.num_bits != other.num_bits { - return false; - } - // assume unset bits are always 0, and num_bits is always consistent with bits length. - // Note: If you new a Buffer without init, the PartialEq may have UB due to uninit mem cuz - // we are comparing bytes by bytes instead of bits by bits. - let length = (self.num_bits + 7) / 8; - self.bits[..length] == other.bits[..length] + let mut bitmap = Self::from_bytes(&buf.body[1..]); + bitmap.num_bits = num_bits; + bitmap } } +/// Bitmap iterator. 
pub struct BitmapIter<'a> { - bits: &'a Bytes, + bits: &'a [usize], idx: usize, num_bits: usize, } @@ -512,16 +504,26 @@ impl<'a> iter::Iterator for BitmapIter<'a> { if self.idx >= self.num_bits { return None; } - let b = unsafe { bit_util::get_bit_raw(self.bits.as_ptr(), self.idx) }; + let b = unsafe { self.bits.get_unchecked(self.idx / BITS) } & (1 << (self.idx % BITS)) != 0; self.idx += 1; Some(b) } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.num_bits - self.idx; + (remaining, Some(remaining)) + } + + fn nth(&mut self, n: usize) -> Option { + self.idx += n; + self.next() + } } +unsafe impl TrustedLen for BitmapIter<'_> {} + #[cfg(test)] mod tests { - use itertools::Itertools; - use super::*; #[test] @@ -542,10 +544,10 @@ mod tests { }; let byte1 = 0b0101_0110_u8; let byte2 = 0b1010_1101_u8; - let expected = Bitmap::from_bytes(Bytes::copy_from_slice(&[byte1, byte2])); + let expected = Bitmap::from_bytes(&[byte1, byte2]); assert_eq!(bitmap1, expected); assert_eq!( - bitmap1.num_high_bits(), + bitmap1.count_ones(), (byte1.count_ones() + byte2.count_ones()) as usize ); @@ -561,16 +563,16 @@ mod tests { builder.finish() }; let byte1 = 0b0101_0110_u8; - let expected = Bitmap::from_bytes(Bytes::copy_from_slice(&[byte1])); + let expected = Bitmap::from_bytes(&[byte1]); assert_eq!(bitmap2, expected); } #[test] fn test_bitmap_all_high() { let num_bits = 3; - let bitmap = Bitmap::all_high_bits(num_bits); + let bitmap = Bitmap::ones(num_bits); assert_eq!(bitmap.len(), num_bits); - assert!(bitmap.is_all_set()); + assert!(bitmap.all()); for i in 0..num_bits { assert!(bitmap.is_set(i)); } @@ -631,27 +633,24 @@ mod tests { #[test] fn test_bitwise_or() { - let bitmap1 = Bitmap::from_bytes(Bytes::from_static(&[0b01101010])); - let bitmap2 = Bitmap::from_bytes(Bytes::from_static(&[0b01001110])); - assert_eq!( - Bitmap::from_bytes(Bytes::from_static(&[0b01101110])), - (bitmap1 | bitmap2) - ); + let bitmap1 = Bitmap::from_bytes(&[0b01101010]); + let bitmap2 = 
Bitmap::from_bytes(&[0b01001110]); + assert_eq!(Bitmap::from_bytes(&[0b01101110]), (bitmap1 | bitmap2)); } #[test] fn test_bitwise_saturate_subtract() { - let bitmap1 = Bitmap::from_bytes(Bytes::from_static(&[0b01101010])); - let bitmap2 = Bitmap::from_bytes(Bytes::from_static(&[0b01001110])); + let bitmap1 = Bitmap::from_bytes(&[0b01101010]); + let bitmap2 = Bitmap::from_bytes(&[0b01001110]); assert_eq!( - Bitmap::from_bytes(Bytes::from_static(&[0b00100000])), + Bitmap::from_bytes(&[0b00100000]), Bitmap::bit_saturate_subtract(&bitmap1, &bitmap2) ); } #[test] fn test_bitmap_is_set() { - let bitmap = Bitmap::from_bytes(Bytes::from_static(&[0b01001010])); + let bitmap = Bitmap::from_bytes(&[0b01001010]); assert!(!bitmap.is_set(0)); assert!(bitmap.is_set(1)); assert!(!bitmap.is_set(2)); @@ -665,7 +664,7 @@ mod tests { #[test] fn test_bitmap_iter() { { - let bitmap = Bitmap::from_bytes(Bytes::from_static(&[0b01001010])); + let bitmap = Bitmap::from_bytes(&[0b01001010]); let mut booleans = vec![]; for b in bitmap.iter() { booleans.push(b as u8); @@ -700,7 +699,7 @@ mod tests { #[test] fn test_bitmap_from_protobuf() { - let bitmap_bytes = vec![3u8 /* len % 8 */, 0b0101_0010, 0b110]; + let bitmap_bytes = vec![3u8 /* len % BITS */, 0b0101_0010, 0b110]; let buf = ProstBuffer { body: bitmap_bytes, compression: CompressionType::None as _, @@ -709,70 +708,56 @@ mod tests { let actual_bytes: Vec = bitmap.iter().map(|b| b as u8).collect(); assert_eq!(actual_bytes, vec![0, 1, 0, 0, 1, 0, 1, 0, /* */ 0, 1, 1]); // in reverse order - assert_eq!(bitmap.num_high_bits(), 5); + assert_eq!(bitmap.count_ones(), 5); } #[test] fn test_bitmap_from_buffer() { let byte1 = 0b0110_1010_u8; let byte2 = 0b1011_0101_u8; - let bitmap = Bitmap::from_bytes(Bytes::from_static(&[0b0110_1010, 0b1011_0101])); + let bitmap = Bitmap::from_bytes(&[0b0110_1010, 0b1011_0101]); let expected = Bitmap::from_iter(vec![ false, true, false, true, false, true, true, false, true, false, true, false, true, true, 
false, true, ]); - let num_high_bits = (byte1.count_ones() + byte2.count_ones()) as usize; + let count_ones = (byte1.count_ones() + byte2.count_ones()) as usize; assert_eq!(expected, bitmap); - assert_eq!(bitmap.num_high_bits(), num_high_bits); - assert_eq!(expected.num_high_bits(), num_high_bits); + assert_eq!(bitmap.count_ones(), count_ones); + assert_eq!(expected.count_ones(), count_ones); } #[test] fn test_bitmap_eq() { - let b1: Bitmap = (vec![false; 3]).into_iter().collect(); - let b2: Bitmap = (vec![false; 5]).into_iter().collect(); + let b1: Bitmap = Bitmap::zeros(3); + let b2: Bitmap = Bitmap::zeros(5); assert_ne!(b1, b2); - let b1: Bitmap = [true, false] - .iter() - .cycle() - .cloned() - .take(10000) - .collect_vec() - .into_iter() - .collect(); - let b2: Bitmap = [true, false] - .iter() - .cycle() - .cloned() - .take(10000) - .collect_vec() - .into_iter() - .collect(); + let b1: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect(); + let b2: Bitmap = [true, false].iter().cycle().cloned().take(10000).collect(); assert_eq!(b1, b2); } #[test] fn test_bitmap_set() { let mut b = BitmapBuilder::zeroed(10); - assert_eq!(b.num_high_bits, 0); + assert_eq!(b.count_ones, 0); b.set(0, true); b.set(7, true); b.set(8, true); b.set(9, true); - assert_eq!(b.num_high_bits, 4); + assert_eq!(b.count_ones, 4); b.set(7, false); b.set(8, false); - assert_eq!(b.num_high_bits, 2); + assert_eq!(b.count_ones, 2); b.append(true); assert_eq!(b.len, 11); - assert_eq!(b.num_high_bits, 3); + assert_eq!(b.count_ones, 3); let b = b.finish(); - assert_eq!(b.bits.to_vec(), &[0b0000_0001, 0b0000_0110]); + assert_eq!(&b.bits[..], &[0b0110_0000_0001]); } #[test] diff --git a/src/common/src/util/bit_util.rs b/src/common/src/util/bit_util.rs deleted file mode 100644 index 56488ea9f2e71..0000000000000 --- a/src/common/src/util/bit_util.rs +++ /dev/null @@ -1,338 +0,0 @@ -// Copyright 2022 Singularity Data -// -// Licensed under the Apache License, Version 2.0 (the "License"); 
-// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utils for working with bits -//! Adapted from [arrow-rs]([arrow-rs](https://github.com/apache/arrow-rs/blob/master/parquet/src/util/bit_util.rs) (commit `75432ed`).) - -#[cfg(feature = "simd")] -use packed_simd::u8x64; - -const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; -const UNSET_BIT_MASK: [u8; 8] = [ - 255 - 1, - 255 - 2, - 255 - 4, - 255 - 8, - 255 - 16, - 255 - 32, - 255 - 64, - 255 - 128, -]; - -/// Returns the nearest number that is `>=` than `num` and is a multiple of 64 -#[inline] -pub fn round_upto_multiple_of_64(num: usize) -> usize { - round_upto_power_of_2(num, 64) -} - -/// Returns the nearest multiple of `factor` that is `>=` than `num`. 
Here `factor` must -/// be a power of 2. -pub fn round_upto_power_of_2(num: usize, factor: usize) -> usize { - debug_assert!(factor > 0 && (factor & (factor - 1)) == 0); - (num + (factor - 1)) & !(factor - 1) -} - -/// Returns whether bit at position `i` in `data` is set or not -#[inline] -pub fn get_bit(data: &[u8], i: usize) -> bool { - (data[i >> 3] & BIT_MASK[i & 7]) != 0 -} - -/// Returns whether bit at position `i` in `data` is set or not. -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn get_bit_raw(data: *const u8, i: usize) -> bool { - (*data.add(i >> 3) & BIT_MASK[i & 7]) != 0 -} - -/// Sets bit at position `i` for `data` to 1 -#[inline] -pub fn set_bit(data: &mut [u8], i: usize) { - data[i >> 3] |= BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn set_bit_raw(data: *mut u8, i: usize) { - *data.add(i >> 3) |= BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` to 0 -#[inline] -pub fn unset_bit(data: &mut [u8], i: usize) { - data[i >> 3] &= UNSET_BIT_MASK[i & 7]; -} - -/// Sets bit at position `i` for `data` to 0 -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that `i` is within bounds. -#[inline] -pub unsafe fn unset_bit_raw(data: *mut u8, i: usize) { - *data.add(i >> 3) &= UNSET_BIT_MASK[i & 7]; -} - -/// Returns the ceil of `value`/`divisor` -#[inline] -pub fn ceil(value: usize, divisor: usize) -> usize { - let (quot, rem) = (value / divisor, value % divisor); - if rem > 0 && divisor > 0 { - quot + 1 - } else { - quot - } -} - -/// Performs SIMD bitwise binary operations. 
-/// -/// # Safety -/// -/// Note that each slice should be 64 bytes and it is the callers responsibility to ensure -/// that this is the case. If passed slices larger than 64 bytes the operation will only -/// be performed on the first 64 bytes. Slices less than 64 bytes will panic. -#[cfg(feature = "simd")] -pub unsafe fn bitwise_bin_op_simd(left: &[u8], right: &[u8], result: &mut [u8], op: F) -where - F: Fn(u8x64, u8x64) -> u8x64, -{ - let left_simd = u8x64::from_slice_unaligned_unchecked(left); - let right_simd = u8x64::from_slice_unaligned_unchecked(right); - let simd_result = op(left_simd, right_simd); - simd_result.write_to_slice_unaligned_unchecked(result); -} - -#[cfg(all(test, feature = "test_utils"))] -mod tests { - use std::collections::HashSet; - - use rand::Rng; - - use super::*; - use crate::util::test_util::seedable_rng; - - #[test] - fn test_round_upto_multiple_of_64() { - assert_eq!(0, round_upto_multiple_of_64(0)); - assert_eq!(64, round_upto_multiple_of_64(1)); - assert_eq!(64, round_upto_multiple_of_64(63)); - assert_eq!(64, round_upto_multiple_of_64(64)); - assert_eq!(128, round_upto_multiple_of_64(65)); - assert_eq!(192, round_upto_multiple_of_64(129)); - } - - #[test] - fn test_get_bit() { - // 00001101 - assert!(get_bit(&[0b00001101], 0)); - assert!(!get_bit(&[0b00001101], 1)); - assert!(get_bit(&[0b00001101], 2)); - assert!(get_bit(&[0b00001101], 3)); - - // 01001001 01010010 - assert!(get_bit(&[0b01001001, 0b01010010], 0)); - assert!(!get_bit(&[0b01001001, 0b01010010], 1)); - assert!(!get_bit(&[0b01001001, 0b01010010], 2)); - assert!(get_bit(&[0b01001001, 0b01010010], 3)); - assert!(!get_bit(&[0b01001001, 0b01010010], 4)); - assert!(!get_bit(&[0b01001001, 0b01010010], 5)); - assert!(get_bit(&[0b01001001, 0b01010010], 6)); - assert!(!get_bit(&[0b01001001, 0b01010010], 7)); - assert!(!get_bit(&[0b01001001, 0b01010010], 8)); - assert!(get_bit(&[0b01001001, 0b01010010], 9)); - assert!(!get_bit(&[0b01001001, 0b01010010], 10)); - 
assert!(!get_bit(&[0b01001001, 0b01010010], 11)); - assert!(get_bit(&[0b01001001, 0b01010010], 12)); - assert!(!get_bit(&[0b01001001, 0b01010010], 13)); - assert!(get_bit(&[0b01001001, 0b01010010], 14)); - assert!(!get_bit(&[0b01001001, 0b01010010], 15)); - } - - #[test] - fn test_get_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![0; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if b { - set_bit(&mut buf[..], i) - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_set_bit() { - let mut b = [0b00000010]; - set_bit(&mut b, 0); - assert_eq!([0b00000011], b); - set_bit(&mut b, 1); - assert_eq!([0b00000011], b); - set_bit(&mut b, 7); - assert_eq!([0b10000011], b); - } - - #[test] - fn test_unset_bit() { - let mut b = [0b11111101]; - unset_bit(&mut b, 0); - assert_eq!([0b11111100], b); - unset_bit(&mut b, 1); - assert_eq!([0b11111100], b); - unset_bit(&mut b, 7); - assert_eq!([0b01111100], b); - } - - #[test] - fn test_set_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![0; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if b { - unsafe { - set_bit_raw(buf.as_mut_ptr(), i); - } - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_unset_bit_raw() { - const NUM_BYTE: usize = 10; - let mut buf = vec![255; NUM_BYTE]; - let mut expected = vec![]; - let mut rng = seedable_rng(); - for i in 0..8 * NUM_BYTE { - let b = rng.gen_bool(0.5); - expected.push(b); - if !b { - unsafe { - unset_bit_raw(buf.as_mut_ptr(), i); - } - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - 
unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - - #[test] - fn test_get_set_bit_roundtrip() { - const NUM_BYTES: usize = 10; - const NUM_SETS: usize = 10; - - let mut buffer: [u8; NUM_BYTES * 8] = [0; NUM_BYTES * 8]; - let mut v = HashSet::new(); - let mut rng = seedable_rng(); - for _ in 0..NUM_SETS { - let offset = rng.gen_range(0..8 * NUM_BYTES); - v.insert(offset); - set_bit(&mut buffer[..], offset); - } - for i in 0..NUM_BYTES * 8 { - assert_eq!(v.contains(&i), get_bit(&buffer[..], i)); - } - } - - #[test] - #[cfg(all(any(target_arch = "x86", target_arch = "x86_64")))] - fn test_ceil() { - assert_eq!(ceil(0, 1), 0); - assert_eq!(ceil(1, 1), 1); - assert_eq!(ceil(1, 2), 1); - assert_eq!(ceil(1, 8), 1); - assert_eq!(ceil(7, 8), 1); - assert_eq!(ceil(8, 8), 1); - assert_eq!(ceil(9, 8), 2); - assert_eq!(ceil(9, 9), 1); - assert_eq!(ceil(10000000000, 10), 1000000000); - assert_eq!(ceil(10, 10000000000), 1); - assert_eq!(ceil(10000000000, 1000000000), 10); - } - - #[test] - #[cfg(feature = "simd")] - fn test_bitwise_and_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a & b) }; - for i in buf3.iter() { - assert_eq!(&0b00110000u8, i); - } - } - - #[test] - #[cfg(feature = "simd")] - fn test_bitwise_or_simd() { - let buf1 = [0b00110011u8; 64]; - let buf2 = [0b11110000u8; 64]; - let mut buf3 = [0b00000000; 64]; - unsafe { bitwise_bin_op_simd(&buf1, &buf2, &mut buf3, |a, b| a | b) }; - for i in buf3.iter() { - assert_eq!(&0b11110011u8, i); - } - } -} diff --git a/src/common/src/util/chunk_coalesce.rs b/src/common/src/util/chunk_coalesce.rs index 1bd358ed67ce3..e06e6a6ab4d70 100644 --- a/src/common/src/util/chunk_coalesce.rs +++ b/src/common/src/util/chunk_coalesce.rs @@ -80,7 +80,7 @@ impl DataChunkBuilder { let mut new_return_offset = input_chunk.offset; match input_chunk.data_chunk.visibility() { Some(vis) => { - for vis in 
vis.iter_from(input_chunk.offset) { + for vis in vis.iter().skip(input_chunk.offset) { new_return_offset += 1; if !vis { continue; diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index d69b3507e8dc1..f64a4e9942e7d 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -20,7 +20,6 @@ use crate::error::ErrorCode::InternalError; use crate::error::{Result, RwError}; pub mod addr; -pub mod bit_util; pub mod chunk_coalesce; pub mod compress; pub mod encoding_for_comparison; diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 65f80f435b35f..46024f05124ef 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -17,7 +17,6 @@ use std::sync::Arc; -use bytes::Bytes; use futures::stream::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; @@ -131,7 +130,7 @@ async fn test_table_materialize() -> StreamResult<()> { let all_column_ids = vec![ColumnId::from(0), ColumnId::from(1)]; let all_schema = get_schema(&all_column_ids); let (barrier_tx, barrier_rx) = unbounded_channel(); - let vnodes = Bitmap::from_bytes(Bytes::from_static(&[0b11111111])); + let vnodes = Bitmap::from_bytes(&[0b11111111]); let state_table = SourceStateTableHandler::from_table_catalog( &default_source_internal_table(0x2333), MemoryStateStore::new(), diff --git a/src/expr/src/expr/expr_case.rs b/src/expr/src/expr/expr_case.rs index 00c60bf81701f..ad46d2eb5d3b1 100644 --- a/src/expr/src/expr/expr_case.rs +++ b/src/expr/src/expr/expr_case.rs @@ -74,7 +74,7 @@ impl Expression for CaseExpression { input.set_vis(calc_then_vis.clone()); let then_res = then.eval_checked(&input)?; calc_then_vis - .ones() + .iter_ones() .for_each(|pos| selection[pos] = Some(when_idx)); input.set_vis(&input_vis & (!&calc_then_vis)); result_array.push(then_res); @@ -83,7 +83,7 @@ impl Expression for CaseExpression { let else_res = else_expr.eval_checked(&input)?; input .vis() - .ones() 
+ .iter_ones() .for_each(|pos| selection[pos] = Some(when_len)); result_array.push(else_res); } diff --git a/src/expr/src/expr/expr_coalesce.rs b/src/expr/src/expr/expr_coalesce.rs index 629ee822d490e..4e3ff919b2005 100644 --- a/src/expr/src/expr/expr_coalesce.rs +++ b/src/expr/src/expr/expr_coalesce.rs @@ -50,7 +50,7 @@ impl Expression for CoalesceExpression { orig_vis .as_ref() .bitand(res_bitmap_ref) - .ones() + .iter_ones() .for_each(|pos| { selection[pos] = Some(child_idx); }); diff --git a/src/expr/src/expr/expr_is_null.rs b/src/expr/src/expr/expr_is_null.rs index 3cb84cffd9c1d..227bb4d3fdc0e 100644 --- a/src/expr/src/expr/expr_is_null.rs +++ b/src/expr/src/expr/expr_is_null.rs @@ -59,10 +59,7 @@ impl Expression for IsNullExpression { fn eval(&self, input: &DataChunk) -> Result { let child_arr = self.child.eval_checked(input)?; - let arr = BoolArray::new( - Bitmap::all_high_bits(input.capacity()), - !child_arr.null_bitmap(), - ); + let arr = BoolArray::new(Bitmap::ones(input.capacity()), !child_arr.null_bitmap()); Ok(Arc::new(ArrayImpl::Bool(arr))) } @@ -85,7 +82,7 @@ impl Expression for IsNotNullExpression { Ok(child_arr) => child_arr.into_null_bitmap(), Err(child_arr) => child_arr.null_bitmap().clone(), }; - let arr = BoolArray::new(Bitmap::all_high_bits(input.capacity()), null_bitmap); + let arr = BoolArray::new(Bitmap::ones(input.capacity()), null_bitmap); Ok(Arc::new(ArrayImpl::Bool(arr))) } diff --git a/src/expr/src/vector_op/agg/filter.rs b/src/expr/src/vector_op/agg/filter.rs index 03aa320a78302..ec7d722e85140 100644 --- a/src/expr/src/vector_op/agg/filter.rs +++ b/src/expr/src/vector_op/agg/filter.rs @@ -79,7 +79,7 @@ impl Aggregator for Filter { }) .try_collect::()? 
}; - if bitmap.is_all_set() { + if bitmap.all() { // if the bitmap is all set, meaning all rows satisfy the filter, // call `update_multi` for potential optimization self.inner.update_multi(input, start_row_id, end_row_id) diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 01a68082cabec..722da50211e0e 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -512,7 +512,7 @@ impl BatchPlanFragmenter { .take(1) .update(|(_, info)| { info.vnode_bitmap = - Bitmap::all_high_bits(VirtualNode::COUNT).to_protobuf(); + Bitmap::ones(VirtualNode::COUNT).to_protobuf(); }) .collect(); } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 9716f86c45885..56f85bdfb24d1 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -167,10 +167,7 @@ pub(crate) fn rebalance_actor_vnode( let order_by_bitmap_desc = |(_, bitmap_a): &(ActorId, Bitmap), (_, bitmap_b): &(ActorId, Bitmap)| -> Ordering { - bitmap_a - .num_high_bits() - .cmp(&bitmap_b.num_high_bits()) - .reverse() + bitmap_a.count_ones().cmp(&bitmap_b.count_ones()).reverse() }; let builder_from_bitmap = |bitmap: &Bitmap| -> BitmapBuilder { @@ -184,8 +181,8 @@ pub(crate) fn rebalance_actor_vnode( let prev_remain = removed .iter() .map(|(_, bitmap)| { - assert!(bitmap.num_high_bits() >= prev_expected); - bitmap.num_high_bits() - prev_expected + assert!(bitmap.count_ones() >= prev_expected); + bitmap.count_ones() - prev_expected }) .sum::(); @@ -194,7 +191,7 @@ pub(crate) fn rebalance_actor_vnode( let removed_balances = removed.into_iter().map(|(actor_id, bitmap)| Balance { actor_id, - balance: bitmap.num_high_bits() as i32, + balance: bitmap.count_ones() as i32, builder: builder_from_bitmap(&bitmap), }); @@ -202,7 +199,7 @@ pub(crate) fn rebalance_actor_vnode( .into_iter() .map(|(actor_id, bitmap)| Balance { actor_id, - balance: bitmap.num_high_bits() as i32 - 
expected as i32, + balance: bitmap.count_ones() as i32 - expected as i32, builder: builder_from_bitmap(&bitmap), }) .collect_vec(); diff --git a/src/meta/src/stream/scheduler.rs b/src/meta/src/stream/scheduler.rs index 6fa2718200e5d..2fc96031fb4f2 100644 --- a/src/meta/src/stream/scheduler.rs +++ b/src/meta/src/stream/scheduler.rs @@ -400,7 +400,7 @@ mod test { assert_ne!(fragment.vnode_mapping, None,); let mut vnode_sum = 0; for actor in fragment.actors { - vnode_sum += Bitmap::from(actor.get_vnode_bitmap()?).num_high_bits(); + vnode_sum += Bitmap::from(actor.get_vnode_bitmap()?).count_ones(); } assert_eq!(vnode_sum, VirtualNode::COUNT); } diff --git a/src/meta/src/stream/test_scale.rs b/src/meta/src/stream/test_scale.rs index 69226e6455c28..550a595c54a3c 100644 --- a/src/meta/src/stream/test_scale.rs +++ b/src/meta/src/stream/test_scale.rs @@ -98,7 +98,7 @@ mod tests { assert!(*b, "vnode {} should be set", idx); } - let vnodes = bitmaps.values().map(|bitmap| bitmap.num_high_bits()); + let vnodes = bitmaps.values().map(|bitmap| bitmap.count_ones()); let (min, max) = vnodes.minmax().into_option().unwrap(); assert!((max - min) <= 1, "min {} max {}", min, max); @@ -249,7 +249,7 @@ mod tests { check_bitmaps(&result); let (_, bitmap) = result.iter().exactly_one().unwrap(); - assert!(bitmap.is_all_set()); + assert!(bitmap.all()); } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index e80e9b83cf820..050ad64e4cfdd 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -321,7 +321,7 @@ impl StorageTable { // If `vnode_hint` is set, we can only access this single vnode. Some(vnode) => Either::Left(std::iter::once(vnode)), // Otherwise, we need to access all vnodes of this table. 
- None => Either::Right(self.vnodes.ones().map(VirtualNode::from_index)), + None => Either::Right(self.vnodes.iter_ones().map(VirtualNode::from_index)), }; Either::Right( vnodes.map(|vnode| prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes())), diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 372cdceb973de..19e9cc0414075 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -58,7 +58,7 @@ impl Distribution { pub fn all_vnodes(dist_key_indices: Vec) -> Self { /// A bitmap that all vnodes are set. static ALL_VNODES: LazyLock> = - LazyLock::new(|| Bitmap::all_high_bits(VirtualNode::COUNT).into()); + LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into()); Self { dist_key_indices, vnodes: ALL_VNODES.clone(), diff --git a/src/stream/src/cache/mod.rs b/src/stream/src/cache/mod.rs index 111cdec77c555..5cff3695ebef7 100644 --- a/src/stream/src/cache/mod.rs +++ b/src/stream/src/cache/mod.rs @@ -129,16 +129,14 @@ pub(super) fn cache_may_stale( #[cfg(test)] mod tests { - use bytes::Bytes; - use super::*; #[expect(clippy::bool_assert_comparison)] #[test] fn test_cache_may_stale() { - let p123 = Bitmap::from_bytes(Bytes::from_static(&[0b_0000_0111_u8])); - let p1234 = Bitmap::from_bytes(Bytes::from_static(&[0b_0000_1111_u8])); - let p1245 = Bitmap::from_bytes(Bytes::from_static(&[0b_0001_1011_u8])); + let p123 = Bitmap::from_bytes(&[0b_0000_0111]); + let p1234 = Bitmap::from_bytes(&[0b_0000_1111]); + let p1245 = Bitmap::from_bytes(&[0b_0001_1011]); assert_eq!(cache_may_stale(&p123, &p123), false); // unchanged assert_eq!(cache_may_stale(&p1234, &p123), false); // scale-out diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index e78d061a54824..7332fe533be70 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -733,7 +733,7 @@ impl StateTable { } else { vec![] }; - for vnode in self.vnodes.ones() { + for 
vnode in self.vnodes.iter_ones() { let mut range_begin = vnode.to_be_bytes().to_vec(); let mut range_end = range_begin.clone(); range_begin.extend(&range_begin_suffix); diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 3e604c9fec48b..bf162b03b3ac2 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -322,7 +322,7 @@ impl DynamicFilterExecutor { let (columns, _) = data_chunk.into_parts(); - if new_visibility.num_high_bits() > 0 { + if new_visibility.count_ones() > 0 { let new_chunk = StreamChunk::new(new_ops, columns, Some(new_visibility)); yield Message::Chunk(new_chunk) } diff --git a/src/stream/src/executor/filter.rs b/src/stream/src/executor/filter.rs index 0c6418ad8f123..b6a60d4333236 100644 --- a/src/stream/src/executor/filter.rs +++ b/src/stream/src/executor/filter.rs @@ -146,7 +146,7 @@ impl SimpleFilterExecutor { let new_visibility = new_visibility.finish(); - Ok(if new_visibility.num_high_bits() > 0 { + Ok(if new_visibility.count_ones() > 0 { let new_chunk = StreamChunk::new(new_ops, columns, Some(new_visibility)); Some(new_chunk) } else { diff --git a/src/stream/src/executor/managed_state/dynamic_filter.rs b/src/stream/src/executor/managed_state/dynamic_filter.rs index 3395a63c5f139..f5194c2d84fc4 100644 --- a/src/stream/src/executor/managed_state/dynamic_filter.rs +++ b/src/stream/src/executor/managed_state/dynamic_filter.rs @@ -153,7 +153,7 @@ impl RangeCache { for pk_range in missing_ranges { let init_maps = self .vnodes - .ones() + .iter_ones() .map(|vnode| { self.cache .get_mut(&VirtualNode::from_index(vnode)) @@ -163,7 +163,7 @@ impl RangeCache { .collect_vec(); let futures = self.vnodes - .ones() + .iter_ones() .zip_eq(init_maps.into_iter()) .map(|(vnode, init_map)| { self.fetch_vnode_range(VirtualNode::from_index(vnode), &pk_range, init_map) @@ -231,7 +231,7 @@ impl RangeCache { ); let newly_owned_vnodes = 
Bitmap::bit_saturate_subtract(&new_vnodes, &old_vnodes); - let futures = newly_owned_vnodes.ones().map(|vnode| { + let futures = newly_owned_vnodes.iter_ones().map(|vnode| { self.fetch_vnode_range( VirtualNode::from_index(vnode), &current_range, @@ -676,7 +676,7 @@ mod tests { }) .collect::>(); let range = (Unbounded, Unbounded); - let vnodes = Bitmap::all_high_bits(VirtualNode::COUNT).into(); // set all the bits + let vnodes = Bitmap::ones(VirtualNode::COUNT).into(); // set all the bits let mut iter = UnorderedRangeCacheIter::new(&cache, range, vnodes); for i in VirtualNode::all() { assert_eq!( diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index cdb0f8843a16e..04d5eb1a1ec2e 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -146,7 +146,7 @@ mod tests { ]); let pk_indices = vec![0]; let row_id_index = 0; - let row_id_generator = Bitmap::all_high_bits(VirtualNode::COUNT); + let row_id_generator = Bitmap::ones(VirtualNode::COUNT); let (mut tx, upstream) = MockSource::channel(schema.clone(), pk_indices.clone()); let row_id_gen_executor = Box::new(RowIdGenExecutor::new( Box::new(upstream), diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 13007ec540283..0a3a7d4aabf0b 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -259,7 +259,7 @@ impl SortExecutor { curr_vnode_bitmap.to_owned() }; let mut values_per_vnode = Vec::new(); - for owned_vnode in newly_owned_vnodes.ones() { + for owned_vnode in newly_owned_vnodes.iter_ones() { let value_iter = self .state_table .iter_with_pk_range( diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d394199c10d4f..c151416aa6138 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -521,7 +521,6 @@ mod tests { use std::sync::Arc; use 
std::time::Duration; - use bytes::Bytes; use futures::StreamExt; use maplit::{convert_args, hashmap}; use risingwave_common::array::stream_chunk::StreamChunkTestExt; @@ -593,7 +592,7 @@ mod tests { MemoryStateStore::new(), ) .await; - let vnodes = Bitmap::from_bytes(Bytes::from_static(&[0b11111111])); + let vnodes = Bitmap::from_bytes(&[0b11111111]); let executor = SourceExecutor::new( ActorContext::create(0x3f3f3f), @@ -698,7 +697,7 @@ mod tests { ) .await; - let vnodes = Bitmap::from_bytes(Bytes::from_static(&[0b11111111])); + let vnodes = Bitmap::from_bytes(&[0b11111111]); let executor = SourceExecutor::new( ActorContext::create(0x3f3f3f), source_builder, @@ -822,7 +821,7 @@ mod tests { let schema = get_schema(&column_ids, &source_desc); let pk_indices = vec![0_usize]; let (barrier_tx, barrier_rx) = unbounded_channel::(); - let vnodes = Bitmap::from_bytes(Bytes::from_static(&[0b11111111])); + let vnodes = Bitmap::from_bytes(&[0b11111111]); let source_state_handler = SourceStateTableHandler::from_table_catalog( &default_source_internal_table(0x2333), mem_state_store.clone(), diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index c8205d5cccc0f..9f24f61138f04 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -214,7 +214,7 @@ impl WatermarkFilterExecutor { last_checkpoint_watermark = current_watermark.clone(); // Persist the watermark when checkpoint arrives. let vnodes = table.get_vnodes(); - for vnode in vnodes.ones() { + for vnode in vnodes.iter_ones() { let pk = Some(ScalarImpl::Int16(vnode as _)); let row = [pk, Some(current_watermark.clone())]; // FIXME(yuhao): use upsert.