diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 6212276dd0ec..7b059e0d0731 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -80,10 +80,12 @@ impl PrimitiveVector { } } - pub fn from_vec(array: Vec) -> Self { - Self { - array: PrimitiveArray::from_iter_values(array), - } + pub fn from_vec(vector: Vec) -> Self { + let mutable_buffer = arrow::buffer::MutableBuffer::from(vector); + let mut primitive_builder = + PrimitiveBuilder::::new_from_buffer(mutable_buffer, None); + let array = primitive_builder.finish(); + Self { array } } pub fn from_iter_values>(iter: I) -> Self { diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 88fa058c6b59..f1b693a8b83b 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -26,10 +26,12 @@ use common_time::Timestamp; use datatypes::arrow; use datatypes::arrow::array::ArrayRef; use datatypes::data_type::{ConcreteDataType, DataType}; -use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; +use datatypes::prelude::{MutableVector, Vector, VectorRef}; +use datatypes::types::TimestampType; use datatypes::value::{Value, ValueRef}; use datatypes::vectors::{ - Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, + Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, + TimestampSecondVector, UInt64Vector, UInt8Vector, }; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -691,22 +693,23 @@ impl Series { /// `ValueBuilder` holds all the vector builders for field columns. struct ValueBuilder { - timestamp: Box, - sequence: UInt64VectorBuilder, - op_type: UInt8VectorBuilder, + timestamp: Vec, + timestamp_type: ConcreteDataType, + sequence: Vec, + op_type: Vec, fields: Vec>>, field_types: Vec, } impl ValueBuilder { fn new(region_metadata: &RegionMetadataRef, capacity: usize) -> Self { - let timestamp = region_metadata + let timestamp_type = region_metadata .time_index_column() .column_schema .data_type - .create_mutable_vector(capacity); - let sequence = UInt64VectorBuilder::with_capacity(capacity); - let op_type = UInt8VectorBuilder::with_capacity(capacity); + .clone(); + let sequence = Vec::with_capacity(capacity); + let op_type = Vec::with_capacity(capacity); let field_types = region_metadata .field_columns() @@ -715,7 +718,8 @@ impl ValueBuilder { let fields = (0..field_types.len()).map(|_| None).collect(); Self { - timestamp, + timestamp: Vec::with_capacity(capacity), + timestamp_type, sequence, op_type, fields, @@ -727,9 +731,10 @@ impl ValueBuilder { /// We don't need primary keys since they've already be encoded. fn push(&mut self, ts: ValueRef, sequence: u64, op_type: u8, fields: Vec) { debug_assert_eq!(fields.len(), self.fields.len()); - self.timestamp.push_value_ref(ts); - self.sequence.push_value_ref(ValueRef::UInt64(sequence)); - self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + self.timestamp + .push(ts.as_timestamp().unwrap().unwrap().value()); + self.sequence.push(sequence); + self.op_type.push(op_type); let num_rows = self.timestamp.len(); for (idx, field_value) in fields.into_iter().enumerate() { if !field_value.is_null() || self.fields[idx].is_some() { @@ -844,9 +849,23 @@ impl From for Values { } }) .collect::>(); - let sequence = Arc::new(value.sequence.finish()); - let op_type = Arc::new(value.op_type.finish()); - let timestamp = value.timestamp.to_vector(); + let sequence = Arc::new(UInt64Vector::from_vec(value.sequence)); + let op_type = Arc::new(UInt8Vector::from_vec(value.op_type)); + let timestamp: VectorRef = match value.timestamp_type { + ConcreteDataType::Timestamp(TimestampType::Second(_)) => { + Arc::new(TimestampSecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + Arc::new(TimestampMillisecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + Arc::new(TimestampMicrosecondVector::from_vec(value.timestamp)) + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + Arc::new(TimestampNanosecondVector::from_vec(value.timestamp)) + } + _ => unreachable!(), + }; if cfg!(debug_assertions) { debug_assert_eq!(timestamp.len(), sequence.len());