perf: optimize writing non-null primitive value (GreptimeTeam#5460)
* avoid using arrow builder

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* optimize from_vec

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
waynexia authored Jan 30, 2025
1 parent 5b6279f commit f378d21
Showing 2 changed files with 41 additions and 20 deletions.
10 changes: 6 additions & 4 deletions src/datatypes/src/vectors/primitive.rs
@@ -80,10 +80,12 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
         }
     }
 
-    pub fn from_vec(array: Vec<T::Native>) -> Self {
-        Self {
-            array: PrimitiveArray::from_iter_values(array),
-        }
+    pub fn from_vec(vector: Vec<T::Native>) -> Self {
+        let mutable_buffer = arrow::buffer::MutableBuffer::from(vector);
+        let mut primitive_builder =
+            PrimitiveBuilder::<T::ArrowPrimitive>::new_from_buffer(mutable_buffer, None);
+        let array = primitive_builder.finish();
+        Self { array }
     }
 
     pub fn from_iter_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
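For readers unfamiliar with the arrow-rs API used above: the new from_vec hands the whole Vec to Arrow as a buffer instead of appending values one by one through a builder. Below is a minimal standalone sketch of the same pattern, specialized to i64 (a hypothetical helper, not part of this commit), assuming an arrow-rs version that provides From<Vec<i64>> for MutableBuffer, which the patched code already relies on:

use arrow::array::{Array, Int64Array, PrimitiveBuilder};
use arrow::buffer::MutableBuffer;
use arrow::datatypes::Int64Type;

// Build an Int64Array from an existing Vec without per-element appends:
// the Vec's allocation is reused as an Arrow buffer and the builder only
// finalizes it. All values are non-null, so no null buffer is passed.
fn int64_array_from_vec(values: Vec<i64>) -> Int64Array {
    let buffer = MutableBuffer::from(values);
    let mut builder = PrimitiveBuilder::<Int64Type>::new_from_buffer(buffer, None);
    builder.finish()
}

fn main() {
    let array = int64_array_from_vec(vec![1, 2, 3]);
    assert_eq!(array.len(), 3);
}

The generic version in the diff does the same for any T::ArrowPrimitive; passing None for the null buffer matches the commit title, which targets writing non-null primitive values.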
51 changes: 35 additions & 16 deletions src/mito2/src/memtable/time_series.rs
@@ -26,10 +26,12 @@ use common_time::Timestamp;
 use datatypes::arrow;
 use datatypes::arrow::array::ArrayRef;
 use datatypes::data_type::{ConcreteDataType, DataType};
-use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
+use datatypes::prelude::{MutableVector, Vector, VectorRef};
+use datatypes::types::TimestampType;
 use datatypes::value::{Value, ValueRef};
 use datatypes::vectors::{
-    Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
+    Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
+    TimestampSecondVector, UInt64Vector, UInt8Vector,
 };
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
@@ -691,22 +693,23 @@ impl Series {
 
 /// `ValueBuilder` holds all the vector builders for field columns.
 struct ValueBuilder {
-    timestamp: Box<dyn MutableVector>,
-    sequence: UInt64VectorBuilder,
-    op_type: UInt8VectorBuilder,
+    timestamp: Vec<i64>,
+    timestamp_type: ConcreteDataType,
+    sequence: Vec<u64>,
+    op_type: Vec<u8>,
     fields: Vec<Option<Box<dyn MutableVector>>>,
     field_types: Vec<ConcreteDataType>,
 }
 
 impl ValueBuilder {
     fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self {
-        let timestamp = region_metadata
+        let timestamp_type = region_metadata
             .time_index_column()
             .column_schema
             .data_type
-            .create_mutable_vector(capacity);
-        let sequence = UInt64VectorBuilder::with_capacity(capacity);
-        let op_type = UInt8VectorBuilder::with_capacity(capacity);
+            .clone();
+        let sequence = Vec::with_capacity(capacity);
+        let op_type = Vec::with_capacity(capacity);
 
         let field_types = region_metadata
             .field_columns()
@@ -715,7 +718,8 @@ impl ValueBuilder {
         let fields = (0..field_types.len()).map(|_| None).collect();
 
         Self {
-            timestamp,
+            timestamp: Vec::with_capacity(capacity),
+            timestamp_type,
             sequence,
             op_type,
             fields,
Expand All @@ -727,9 +731,10 @@ impl ValueBuilder {
/// We don't need primary keys since they've already be encoded.
fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec<ValueRef>) {
debug_assert_eq!(fields.len(), self.fields.len());
self.timestamp.push_value_ref(ts);
self.sequence.push_value_ref(ValueRef::UInt64(sequence));
self.op_type.push_value_ref(ValueRef::UInt8(op_type));
self.timestamp
.push(ts.as_timestamp().unwrap().unwrap().value());
self.sequence.push(sequence);
self.op_type.push(op_type);
let num_rows = self.timestamp.len();
for (idx, field_value) in fields.into_iter().enumerate() {
if !field_value.is_null() || self.fields[idx].is_some() {
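As a rough, hypothetical illustration of why this push path is cheaper (simplified types, not GreptimeDB code): the timestamp, sequence, and op_type columns are now appended as plain native values instead of being wrapped in ValueRef and dispatched through a boxed MutableVector, so the hot loop reduces to Vec pushes:

// Hypothetical, simplified sink mirroring the new ValueBuilder fields.
struct RowSink {
    timestamps: Vec<i64>,
    sequences: Vec<u64>,
    op_types: Vec<u8>,
}

impl RowSink {
    fn push(&mut self, ts: i64, sequence: u64, op_type: u8) {
        // Plain Vec pushes: no dynamic dispatch and no per-value enum wrapping;
        // the backing memory can later be handed to Arrow in one shot.
        self.timestamps.push(ts);
        self.sequences.push(sequence);
        self.op_types.push(op_type);
    }
}

fn main() {
    let mut sink = RowSink { timestamps: Vec::new(), sequences: Vec::new(), op_types: Vec::new() };
    sink.push(1_700_000_000_000, 1, 1);
    assert_eq!(sink.timestamps.len(), 1);
}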
@@ -844,9 +849,23 @@ impl From<ValueBuilder> for Values {
                 }
             })
             .collect::<Vec<_>>();
-        let sequence = Arc::new(value.sequence.finish());
-        let op_type = Arc::new(value.op_type.finish());
-        let timestamp = value.timestamp.to_vector();
+        let sequence = Arc::new(UInt64Vector::from_vec(value.sequence));
+        let op_type = Arc::new(UInt8Vector::from_vec(value.op_type));
+        let timestamp: VectorRef = match value.timestamp_type {
+            ConcreteDataType::Timestamp(TimestampType::Second(_)) => {
+                Arc::new(TimestampSecondVector::from_vec(value.timestamp))
+            }
+            ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
+                Arc::new(TimestampMillisecondVector::from_vec(value.timestamp))
+            }
+            ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
+                Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp))
+            }
+            ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
+                Arc::new(TimestampNanosecondVector::from_vec(value.timestamp))
+            }
+            _ => unreachable!(),
+        };
 
         if cfg!(debug_assertions) {
             debug_assert_eq!(timestamp.len(), sequence.len());
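The match above converts the accumulated Vec<i64> into a timestamp vector of the correct unit only once, when the builder is finished. A hedged arrow-only analogue of that dispatch, keyed on arrow's TimeUnit and reusing the same buffer trick (the vector types in the diff are GreptimeDB's own wrappers, so this sketch is illustrative rather than the project's API):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
use arrow::buffer::MutableBuffer;
use arrow::datatypes::{
    TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
    TimestampSecondType,
};

// Turn raw i64 timestamps into a typed Arrow array whose unit matches the
// column definition; the Vec's allocation is reused and no null buffer is built.
fn timestamp_array(unit: TimeUnit, values: Vec<i64>) -> ArrayRef {
    let buffer = MutableBuffer::from(values);
    match unit {
        TimeUnit::Second => Arc::new(
            PrimitiveBuilder::<TimestampSecondType>::new_from_buffer(buffer, None).finish(),
        ),
        TimeUnit::Millisecond => Arc::new(
            PrimitiveBuilder::<TimestampMillisecondType>::new_from_buffer(buffer, None).finish(),
        ),
        TimeUnit::Microsecond => Arc::new(
            PrimitiveBuilder::<TimestampMicrosecondType>::new_from_buffer(buffer, None).finish(),
        ),
        TimeUnit::Nanosecond => Arc::new(
            PrimitiveBuilder::<TimestampNanosecondType>::new_from_buffer(buffer, None).finish(),
        ),
    }
}

fn main() {
    let array = timestamp_array(TimeUnit::Millisecond, vec![1_700_000_000_000]);
    assert_eq!(array.len(), 1);
}

Dispatching once per batch keeps the per-row push path free of type checks; the _ => unreachable!() arm in the real code encodes the invariant that the time index column is always a timestamp type.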
