
feat: introduce columnar segment info cache #16505

Draft. Wants to merge 16 commits into base branch main.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions src/query/ee/tests/it/inverted_index/pruning.rs
@@ -72,6 +72,11 @@ async fn apply_block_pruning(
FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, None)?
.read_pruning(segment_locs)
.await
.map(|v| {
v.into_iter()
.map(|(block_meta_index, block_meta, _)| (block_meta_index, block_meta))
.collect()
})
}
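Note: `read_pruning` now returns three-element tuples, and call sites that only need the `(block_meta_index, block_meta)` pairs drop the extra element. A generic sketch of that adaptation (element types are illustrative, not the pruner's actual types):

```rust
// Keep the first two elements of each pruning result, discarding the
// newly added third one that older callers do not use.
fn drop_third<A, B, C>(results: Vec<(A, B, C)>) -> Vec<(A, B)> {
    results.into_iter().map(|(a, b, _)| (a, b)).collect()
}
```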

#[tokio::test(flavor = "multi_thread")]
4 changes: 3 additions & 1 deletion src/query/expression/src/block.rs
@@ -150,7 +150,9 @@ impl DataBlock {

#[inline]
pub fn new_from_columns(columns: Vec<Column>) -> Self {
assert!(!columns.is_empty());
if columns.is_empty() {
return DataBlock::empty();
}
let num_rows = columns[0].len();
debug_assert!(columns.iter().all(|c| c.len() == num_rows));

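The assert-to-early-return change above makes an empty column list a supported input rather than a panic. A minimal sketch of the new behavior, assuming `DataBlock::empty()` produces a zero-row block:

```rust
use databend_common_expression::DataBlock;

fn main() {
    // Previously this tripped `assert!(!columns.is_empty())`;
    // it now degrades gracefully to an empty block.
    let block = DataBlock::new_from_columns(vec![]);
    assert_eq!(block.num_rows(), 0);
}
```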
1 change: 1 addition & 0 deletions src/query/expression/src/converts/arrow/mod.rs
@@ -18,3 +18,4 @@ mod to;
pub const EXTENSION_KEY: &str = "Extension";

pub use to::table_schema_to_arrow_schema;
pub use to::table_type_to_arrow_type;
5 changes: 5 additions & 0 deletions src/query/expression/src/converts/arrow/to.rs
@@ -35,6 +35,7 @@ use crate::Column;
use crate::DataBlock;
use crate::DataField;
use crate::DataSchema;
use crate::TableDataType;
use crate::TableField;
use crate::TableSchema;

@@ -197,3 +198,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField {

ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata)
}

pub fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
arrow_schema::DataType::from(crate::converts::arrow2::table_type_to_arrow_type(ty))
}
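With the wrapper above and its re-export in `converts/arrow/mod.rs`, callers outside the converts module can map table types directly to arrow-rs types. A hedged usage sketch (import paths are assumptions):

```rust
use databend_common_expression::converts::arrow::table_type_to_arrow_type;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::TableDataType;

fn main() {
    // Map a Databend Int32 table type to its arrow-rs counterpart.
    let arrow_ty = table_type_to_arrow_type(&TableDataType::Number(NumberDataType::Int32));
    println!("{arrow_ty:?}");
}
```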
1 change: 1 addition & 0 deletions src/query/expression/src/converts/arrow2/mod.rs
@@ -23,3 +23,4 @@ pub const ARROW_EXT_TYPE_GEOMETRY: &str = "Geometry";
pub const ARROW_EXT_TYPE_GEOGRAPHY: &str = "Geography";

pub use to::set_validities;
pub use to::table_type_to_arrow_type;
2 changes: 1 addition & 1 deletion src/query/expression/src/converts/arrow2/to.rs
@@ -78,7 +78,7 @@ impl From<&DataField> for ArrowField {

// Note: Arrow's data type is not nullable, so we need to explicitly
// add nullable information to Arrow's field afterwards.
fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
pub fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType {
match ty {
TableDataType::Null => ArrowDataType::Null,
TableDataType::EmptyArray => ArrowDataType::Extension(
213 changes: 133 additions & 80 deletions src/query/service/tests/it/storages/fuse/bloom_index_meta_size.rs
@@ -27,6 +27,7 @@ use databend_common_expression::FromData;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRefExt;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::statistics::gen_columns_statistics;
@@ -39,6 +40,7 @@ use databend_storages_common_cache::InMemoryLruCache;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::ColumnarSegmentInfo;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Compression;
use databend_storages_common_table_meta::meta::Location;
@@ -48,6 +50,7 @@ use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::Versioned;
use opendal::Operator;
use parquet::format::FileMetaData;
use rand::Rng;
use sysinfo::get_current_pid;
use sysinfo::System;
use uuid::Uuid;
@@ -160,7 +163,46 @@ async fn test_segment_info_size() -> databend_common_exception::Result<()> {
let cache_number = 3000;
let num_block_per_seg = 1000;

let segment_info = build_test_segment_info(num_block_per_seg)?;
let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;

let sys = System::new_all();
let pid = get_current_pid().unwrap();
let process = sys.process(pid).unwrap();
let base_memory_usage = process.memory();
let scenario = format!(
"{} SegmentInfo, {} block per seg ",
cache_number, num_block_per_seg
);

eprintln!(
"scenario {}, pid {}, base memory {}",
scenario, pid, base_memory_usage
);

let cache = InMemoryLruCache::with_items_capacity(String::from(""), cache_number);
for _ in 0..cache_number {
let uuid = Uuid::new_v4();
let block_metas = segment_info
.blocks
.iter()
.map(|b: &Arc<BlockMeta>| Arc::new(b.as_ref().clone()))
.collect::<Vec<_>>();
let statistics = segment_info.summary.clone();
let segment_info = SegmentInfo::new(block_metas, statistics);
cache.insert(format!("{}", uuid.simple()), segment_info);
}
show_memory_usage("SegmentInfoCache", base_memory_usage, cache_number);

Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_columnar_segment_info_size() -> databend_common_exception::Result<()> {
let cache_number = 3000;
let num_block_per_seg = 1000;

let (segment_info, table_schema) = build_test_segment_info(num_block_per_seg)?;

let sys = System::new_all();
let pid = get_current_pid().unwrap();
@@ -180,20 +222,16 @@ async fn test_segment_info_size() -> databend_common_exception::Result<()> {
{
for _ in 0..cache_number {
let uuid = Uuid::new_v4();
let block_metas = segment_info
.blocks
.iter()
.map(|b: &Arc<BlockMeta>| Arc::new(b.as_ref().clone()))
.collect::<Vec<_>>();
let statistics = segment_info.summary.clone();
let segment_info = SegmentInfo::new(block_metas, statistics);
cache.insert(
format!("{}", uuid.simple()),
CompactSegmentInfo::try_from(segment_info)?,
ColumnarSegmentInfo::try_from_segment_info_and_schema(
segment_info.clone(),
&table_schema,
)?,
);
}
}
show_memory_usage("SegmentInfoCache", base_memory_usage, cache_number);
show_memory_usage("ColumnarSegmentInfoCache", base_memory_usage, cache_number);

Ok(())
}
@@ -205,7 +243,7 @@ async fn test_segment_raw_bytes_size() -> databend_common_exception::Result<()>
let cache_number = 3000;
let num_block_per_seg = 1000;

let segment_info = build_test_segment_info(num_block_per_seg)?;
let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;
let segment_info_bytes = CompactSegmentInfo::try_from(segment_info)?;

let sys = System::new_all();
@@ -245,7 +283,7 @@ async fn test_segment_raw_repr_bytes_size() -> databend_common_exception::Result
let cache_number = 3000;
let num_block_per_seg = 1000;

let segment_info = build_test_segment_info(num_block_per_seg)?;
let (segment_info, _) = build_test_segment_info(num_block_per_seg)?;
let segment_raw = CompactSegmentInfo::try_from(&segment_info)?;

let sys = System::new_all();
@@ -280,80 +318,95 @@ async fn test_segment_raw_repr_bytes_size() -> databend_common_exception::Result

fn build_test_segment_info(
num_blocks_per_seg: usize,
) -> databend_common_exception::Result<SegmentInfo> {
let col_meta = ColumnMeta::Parquet(SingleColumnMeta {
offset: 0,
len: 0,
num_values: 0,
});

let col_stat = ColumnStatistics::new(
Scalar::String(String::from_utf8(vec![b'a'; STATS_STRING_PREFIX_LEN])?),
Scalar::String(String::from_utf8(vec![b'a'; STATS_STRING_PREFIX_LEN])?),
0,
0,
None,
);

let number_col_stat = ColumnStatistics::new(
Scalar::Number(NumberScalar::Int32(0)),
Scalar::Number(NumberScalar::Int32(0)),
0,
0,
None,
);

// 20 string columns, 5 number columns
) -> databend_common_exception::Result<(SegmentInfo, TableSchema)> {
let mut rng = rand::thread_rng();
let num_string_columns = 20;
let num_number_columns = 5;
let col_metas = (0..num_string_columns + num_number_columns)
.map(|id| (id as ColumnId, col_meta.clone()))
.collect::<HashMap<_, _>>();

assert_eq!(num_number_columns + num_string_columns, col_metas.len());

let mut col_stats = (0..num_string_columns)
.map(|id| (id as ColumnId, col_stat.clone()))
.collect::<HashMap<_, _>>();
for idx in num_string_columns..num_string_columns + num_number_columns {
col_stats.insert(idx as ColumnId, number_col_stat.clone());
let location_gen = TableMetaLocationGenerator::with_prefix("/root/12345/67890".to_owned());
let mut block_metas = Vec::with_capacity(num_blocks_per_seg);
for _ in 0..num_blocks_per_seg {
let (block_location, block_uuid) = location_gen.gen_block_location();
let mut col_stats = HashMap::new();
let mut col_metas = HashMap::new();
for id in 0..num_string_columns + num_number_columns {
col_metas.insert(
id as ColumnId,
ColumnMeta::Parquet(SingleColumnMeta {
offset: rng.gen_range(0..150 * 1024 * 1024),
len: rng.gen_range(10 * 1024..10 * 1024 * 1024),
num_values: rng.gen_range(100_000..1_000_000),
}),
);
}
for id in 0..num_string_columns {
col_stats.insert(
id as ColumnId,
ColumnStatistics::new(
Scalar::String(
(0..STATS_STRING_PREFIX_LEN)
.map(|_| rng.gen_range(b'a'..=b'z') as char)
.collect(),
),
Scalar::String(
(0..STATS_STRING_PREFIX_LEN)
.map(|_| rng.gen_range(b'a'..=b'z') as char)
.collect(),
),
rng.gen_range(100_000..1_000_000),
rng.gen_range(100_000..1_000_000),
Some(rng.gen_range(10_000..100_000)),
),
);
}
for id in num_string_columns..num_string_columns + num_number_columns {
col_stats.insert(
id as ColumnId,
ColumnStatistics::new(
Scalar::Number(NumberScalar::Int32(rng.gen_range(-100_000..100_000))),
Scalar::Number(NumberScalar::Int32(rng.gen_range(-100_000..100_000))),
rng.gen_range(100_000..1_000_000),
rng.gen_range(100_000..1_000_000),
Some(rng.gen_range(10_000..100_000)),
),
);
}
assert_eq!(col_metas.len(), num_string_columns + num_number_columns);
assert_eq!(col_stats.len(), num_string_columns + num_number_columns);
let block_meta = BlockMeta {
row_count: rng.gen_range(100_000..1_000_000),
block_size: rng.gen_range(50 * 1024 * 1024..150 * 1024 * 1024),
file_size: rng.gen_range(10 * 1024 * 1024..50 * 1024 * 1024),
col_stats,
col_metas,
cluster_stats: None,
location: block_location,
bloom_filter_index_location: Some(location_gen.block_bloom_index_location(&block_uuid)),
bloom_filter_index_size: rng.gen_range(1024 * 1024..5 * 1024 * 1024),
inverted_index_size: None,
compression: Compression::Lz4,
create_on: Some(Utc::now()),
};
block_metas.push(Arc::new(block_meta));
}
assert_eq!(num_number_columns + num_string_columns, col_stats.len());

let location_gen = TableMetaLocationGenerator::with_prefix("/root/12345/67890".to_owned());
let mut fields = vec![];
for id in 0..num_string_columns {
fields.push(TableField::new(
&format!("col_{}", id),
TableDataType::String,
));
}
for id in num_string_columns..num_string_columns + num_number_columns {
fields.push(TableField::new(
&format!("col_{}", id),
TableDataType::Number(NumberDataType::Int32),
));
}
let table_schema = TableSchema::new(fields);

let (block_location, block_uuid) = location_gen.gen_block_location();
let block_meta = BlockMeta {
row_count: 0,
block_size: 0,
file_size: 0,
col_stats: col_stats.clone(),
col_metas,
cluster_stats: None,
location: block_location,
bloom_filter_index_location: Some(location_gen.block_bloom_index_location(&block_uuid)),
bloom_filter_index_size: 0,
inverted_index_size: None,
compression: Compression::Lz4,
create_on: Some(Utc::now()),
};

let block_metas = (0..num_blocks_per_seg)
.map(|_| Arc::new(block_meta.clone()))
.collect::<Vec<_>>();
let statistics = Statistics::default();

let statistics = Statistics {
row_count: 0,
block_count: 0,
perfect_block_count: 0,
uncompressed_byte_size: 0,
compressed_byte_size: 0,
index_size: 0,
col_stats: col_stats.clone(),
cluster_stats: None,
};

Ok(SegmentInfo::new(block_metas, statistics))
Ok((SegmentInfo::new(block_metas, statistics), table_schema))
}

#[allow(dead_code)]
@@ -105,7 +105,7 @@ fn test_to_partitions() -> Result<()> {
));

let blocks_metas = (0..num_of_block)
.map(|_| (None, block_meta.clone()))
.map(|_| (None, block_meta.clone(), None))
.collect::<Vec<_>>();

let column_nodes = (0..num_of_col).map(col_nodes_gen).collect::<Vec<_>>();
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/pruning.rs
@@ -66,7 +66,7 @@ async fn apply_block_pruning(
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, None)?
.read_pruning(segment_locs)
.await
.map(|v| v.into_iter().map(|(_, v)| v).collect())
.map(|v| v.into_iter().map(|(_, v, _)| v).collect())
}

#[tokio::test(flavor = "multi_thread")]
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -399,6 +399,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_columnar_segment_info", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Enables columnar segment info.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_query_result_cache", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables caching query results to improve performance for identical queries.",
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -764,4 +764,8 @@ impl Settings {
pub fn set_short_sql_max_length(&self, val: u64) -> Result<()> {
self.try_set_u64("short_sql_max_length", val)
}

pub fn get_enable_columnar_segment_info(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_columnar_segment_info")? == 1)
}
}
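Together, the default-setting entry and this getter let engine code gate the new cache behind a session flag. A hedged sketch of a call site (crate paths are assumptions):

```rust
use databend_common_exception::Result;
use databend_common_settings::Settings;

// The setting is a 0/1 UInt64; the new getter maps it to a bool.
fn columnar_cache_enabled(settings: &Settings) -> Result<bool> {
    settings.get_enable_columnar_segment_info()
}
```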