fix: shrink min max index #7958

Merged
merged 5 commits on Sep 30, 2022
171 changes: 171 additions & 0 deletions src/query/service/tests/it/storages/fuse/statistics.rs
@@ -17,12 +17,16 @@ use std::collections::HashMap;
use common_base::base::tokio;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_functions::aggregates::eval_aggr;
use common_fuse_meta::meta::ClusterStatistics;
use common_fuse_meta::meta::ColumnStatistics;
use common_legacy_expression::add;
use common_legacy_expression::col;
use common_legacy_expression::lit;
use common_pipeline_transforms::processors::ExpressionExecutor;
use common_storages_fuse::statistics::Trim;
use common_storages_fuse::statistics::STATS_REPLACEMENT_CHAR;
use common_storages_fuse::statistics::STATS_STRING_PREFIX_LEN;
use databend_query::storages::fuse::io::BlockCompactor;
use databend_query::storages::fuse::io::BlockWriter;
use databend_query::storages::fuse::io::TableMetaLocationGenerator;
@@ -32,6 +36,7 @@ use databend_query::storages::fuse::statistics::BlockStatistics;
use databend_query::storages::fuse::statistics::ClusterStatsGenerator;
use databend_query::storages::fuse::statistics::StatisticsAccumulator;
use opendal::Operator;
use rand::Rng;

use crate::storages::fuse::table_test_fixture::TestFixture;

@@ -235,3 +240,169 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> {

Ok(())
}

#[test]
fn test_ft_stats_block_stats_string_columns_trimming() -> common_exception::Result<()> {
let suite = || -> common_exception::Result<()> {
// prepare random strings
// 100 strings, with lengths ranging from 0 to 1000 chars
let mut rand_strings: Vec<String> = vec![];
for _ in 0..100 {
let mut rnd = rand::thread_rng();
let rand_string: String = rand::thread_rng()
.sample_iter::<char, _>(rand::distributions::Standard)
.take(rnd.gen_range(0..1000))
.collect();

rand_strings.push(rand_string);
}

let min_expr = rand_strings.iter().min().unwrap();
let max_expr = rand_strings.iter().max().unwrap();

let data_value_min = DataValue::String(min_expr.to_owned().into_bytes());
let data_value_max = DataValue::String(max_expr.to_owned().into_bytes());

let trimmed_min = data_value_min.clone().trim_min();
let trimmed_max = data_value_max.clone().trim_max();

let meaningless_to_collect_max = is_degenerated_case(max_expr.as_str());

if meaningless_to_collect_max {
assert!(trimmed_max.is_none());
} else {
assert!(trimmed_max.is_some());
let trimmed = trimmed_max.unwrap().as_string()?;
assert!(char_len(&trimmed) <= STATS_STRING_PREFIX_LEN);
assert!(DataValue::String(trimmed) >= data_value_max)
}

{
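// trim_min never gives up on valid UTF-8 input: truncating at a char boundary
// can only make the value smaller (or leave it unchanged), so Some(..) is always expected here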
assert!(trimmed_min.is_some());
let trimmed = trimmed_min.unwrap().as_string()?;
assert!(char_len(&trimmed) <= STATS_STRING_PREFIX_LEN);
assert!(DataValue::String(trimmed) <= data_value_min);
}
Ok(())
};

// let runs = 0..1000; // use this at home
let runs = 0..100;
for _ in runs {
suite()?
}
Ok(())
}

#[test]
fn test_ft_stats_block_stats_string_columns_trimming_using_eval() -> common_exception::Result<()> {
let data_type = StringType::new_impl();
let data_field = DataField::new("a", data_type.clone());
let schema = DataSchemaRefExt::create(vec![data_field]);

// verifies (randomly) the following assumptions:
//
// https://github.com/datafuselabs/databend/issues/7829
// > ...
// > in a way that preserves the property of min/max statistics:
// > the trimmed max should be larger than the non-trimmed one, and the trimmed min
// > should be lesser than the non-trimmed one.
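// unlike the direct trim_min/trim_max checks above, this test goes through
// gen_columns_statistics and verifies the trimmed values recorded in the column statistics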

let suite = || -> common_exception::Result<()> {
// prepare random strings
// 100 strings, with lengths ranging from 0 to 1000 chars
let mut rand_strings: Vec<String> = vec![];
for _ in 0..100 {
let mut rnd = rand::thread_rng();
let rand_string: String = rand::thread_rng()
.sample_iter::<char, _>(rand::distributions::Standard)
.take(rnd.gen_range(0..1000))
.collect();

rand_strings.push(rand_string);
}

// build the test data block, which has only one column, of String type
let data_col = Series::from_data(
rand_strings
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>(),
);
let block = DataBlock::create(schema.clone(), vec![data_col.clone()]);

// calculate UNTRIMMED min max values of the test column
// by using eval_aggr (to be consistent with the column_statistic mod)
let data_field = DataField::new("", data_type.clone());
let rows = data_col.len();
let column_field = ColumnWithField::new(data_col, data_field);
let min_col = eval_aggr("min", vec![], &[column_field.clone()], rows)?;
let max_col = eval_aggr("max", vec![], &[column_field], rows)?;

let min_expr = min_col.get(0);
let max_expr = max_col.get(0);

// generate the statistics of column
let stats_of_columns = gen_columns_statistics(&block).unwrap();

// check if the max value (untrimmed) is in the degenerated condition:
// - the string value is longer than STATS_STRING_PREFIX_LEN (in chars)
// - AND for every char C in the prefix of length STATS_STRING_PREFIX_LEN,
//   C >= STATS_REPLACEMENT_CHAR; which means we cannot replace any of them.
let string_max_expr = String::from_utf8(max_expr.as_string()?).unwrap();
let meaningless_to_collect_max = is_degenerated_case(string_max_expr.as_str());

if meaningless_to_collect_max {
// no stats will be collected
assert!(stats_of_columns.get(&0).is_none())
} else {
// Finally:
// check that, trimmed "col_stats.max" always large than or equal to the untrimmed "max_expr"
let col_stats = stats_of_columns.get(&0).unwrap();
assert!(
col_stats.max >= max_expr,
"left [{}]\nright [{}]",
col_stats.max,
max_expr
);
// check that, trimmed "col_stats.min" always less than or equal to the untrimmed "mn_expr"
assert!(
col_stats.min <= min_expr,
"left [{}]\nright [{}]",
col_stats.min,
min_expr
);
}
Ok(())
};

// let runs = 0..1000; // use this at home

let runs = 0..100;
for _ in runs {
suite()?
}
Ok(())
}

fn is_degenerated_case(value: &str) -> bool {
// check if the value (untrimmed) is in the degenerated condition:
// - the string value is longer than STATS_STRING_PREFIX_LEN (in chars)
// - AND for every char C in the prefix of length STATS_STRING_PREFIX_LEN,
//   C >= STATS_REPLACEMENT_CHAR; which means we cannot replace any of them.
let larger_than_prefix_len = value.chars().count() > STATS_STRING_PREFIX_LEN;
let prefixed_with_irreplaceable_chars = value
.chars()
.take(STATS_STRING_PREFIX_LEN)
.all(|c| c >= STATS_REPLACEMENT_CHAR);

larger_than_prefix_len && prefixed_with_irreplaceable_chars
}
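// e.g. a value of 17 '\u{FFFF}' chars is degenerated: it is longer than the prefix,
// and every char in the prefix already compares >= STATS_REPLACEMENT_CHAR,
// so no replacement could keep the trimmed max >= the original value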

fn char_len(value: &[u8]) -> usize {
String::from_utf8(value.to_vec())
.unwrap()
.as_str()
.chars()
.count()
}
14 changes: 7 additions & 7 deletions src/query/service/tests/it/storages/index/range_filter.rs
@@ -56,13 +56,6 @@ async fn test_range_filter() -> Result<()> {
in_memory_size: 0,
});

struct Test {
name: &'static str,
expr: LegacyExpression,
expect: bool,
error: &'static str,
}

let tests: Vec<Test> = vec![
Test {
name: "a < 1 and b > 3",
@@ -423,3 +416,10 @@ fn test_bound_for_like_pattern() -> Result<()> {

Ok(())
}

struct Test {
name: &'static str,
expr: LegacyExpression,
expect: bool,
error: &'static str,
}
4 changes: 2 additions & 2 deletions src/query/storages/fuse-meta/src/meta/common.rs
@@ -27,7 +27,7 @@ pub type ClusterKey = (u32, String);

pub type StatisticsOfColumns = HashMap<u32, ColumnStatistics>;

#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Eq, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ColumnStatistics {
pub min: DataValue,
pub max: DataValue,
@@ -58,7 +58,7 @@ fn default_level() -> i32 {
0
}

#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Default)]
pub struct Statistics {
pub row_count: u64,
pub block_count: u64,
114 changes: 112 additions & 2 deletions src/query/storages/fuse/src/statistics/column_statistic.rs
@@ -46,11 +46,21 @@ pub fn gen_columns_statistics(data_block: &DataBlock) -> Result<StatisticsOfColu
let maxs = eval_aggr("max", vec![], &[column_field], rows)?;

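// trim the min/max values below; if a value cannot be trimmed in a way that preserves
// the min/max property (e.g. the bytes are not valid UTF-8), skip collecting statistics
// for this column entirely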
if mins.len() > 0 {
min = mins.get(0);
min = if let Some(v) = mins.get(0).trim_min() {
v
} else {
continue;
}
}

if maxs.len() > 0 {
max = maxs.get(0);
max = if let Some(v) = maxs.get(0).trim_max() {
v
} else {
continue;
}
}

let (is_all_null, bitmap) = col.validity();
let unset_bits = match (is_all_null, bitmap) {
(true, _) => rows,
@@ -104,3 +114,103 @@ pub mod traverse {
Ok(())
}
}

// Impls of this trait should preserve the property of min/max statistics:
//
// the trimmed max should be no less than the non-trimmed one (if possible),
// and the trimmed min should be no greater than the non-trimmed one (if possible).
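//
// A rough sketch of the intended behaviour (with STATS_STRING_PREFIX_LEN = 16):
//
//   trim_min("aaaaaaaaaaaaaaaaZZZZ") -> Some("aaaaaaaaaaaaaaaa")        // plain 16-char prefix, <= the original
//   trim_max("aaaaaaaaaaaaaaaaZZZZ") -> Some("aaaaaaaaaaaaaaa\u{FFFD}") // last kept char replaced, >= the original
//   trim_max(17 chars of '\u{FFFF}') -> None                            // degenerate: no char in the prefix can be bumped up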
pub trait Trim: Sized {
fn trim_min(self) -> Option<Self>;
fn trim_max(self) -> Option<Self>;
}

pub const STATS_REPLACEMENT_CHAR: char = '\u{FFFD}';
pub const STATS_STRING_PREFIX_LEN: usize = 16;

impl Trim for DataValue {
fn trim_min(self) -> Option<Self> {
match self {
DataValue::String(bytes) => match String::from_utf8(bytes) {
Ok(mut v) => {
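// if the byte length is already within the prefix length, the char count is as well;
// keep the value unchanged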
if v.len() <= STATS_STRING_PREFIX_LEN {
Some(DataValue::String(v.into_bytes()))
} else {
// find the character boundary to prevent String::truncate from panicking
let vs = v.as_str();
let slice = match vs.char_indices().nth(STATS_STRING_PREFIX_LEN) {
None => vs,
Some((idx, _)) => &vs[..idx],
};

// do truncate
Some(DataValue::String({
v.truncate(slice.len());
v.into_bytes()
}))
}
}
Err(_) => {
// if the bytes cannot be converted into a (utf-8) String, just ignore this value.
None
}
},
v => Some(v),
}
}

fn trim_max(self) -> Option<Self> {
match self {
DataValue::String(bytes) => match String::from_utf8(bytes) {
Ok(v) => {
if v.len() <= STATS_STRING_PREFIX_LEN {
// the byte length is within the prefix length, so the char count is as well; just return
Some(DataValue::String(v.into_bytes()))
} else {
// no need to trim if there are at most STATS_STRING_PREFIX_LEN chars
let number_of_chars = v.as_str().chars().count();
if number_of_chars <= STATS_STRING_PREFIX_LEN {
return Some(DataValue::String(v.into_bytes()));
}

// slice the input (at char boundaries), taking at most STATS_STRING_PREFIX_LEN chars
let vs = v.as_str();
let sliced = match vs.char_indices().nth(STATS_STRING_PREFIX_LEN) {
None => vs,
Some((idx, _)) => &vs[..idx],
};

// find the position of the char to be replaced with STATS_REPLACEMENT_CHAR:
// scan in reverse order, and break at the first char that compares below it
let mut idx = None;
for (i, c) in sliced.char_indices().rev() {
if c < STATS_REPLACEMENT_CHAR {
idx = Some(i);
break;
}
}

// grab the replacement point; if there is none (degenerate case), give up and return None
let replacement_point = idx?;

// rebuild the string (since the len of result string is rather small)
let mut r = String::with_capacity(STATS_STRING_PREFIX_LEN);
for (i, c) in sliced.char_indices() {
if i < replacement_point {
r.push(c)
} else {
r.push(STATS_REPLACEMENT_CHAR);
}
}

Some(DataValue::String(r.into_bytes()))
}
}
Err(_) => {
// if the bytes cannot be converted into a (utf-8) String, just ignore this value.
None
}
},
v => Some(v),
}
}
}
3 changes: 3 additions & 0 deletions src/query/storages/fuse/src/statistics/mod.rs
@@ -23,5 +23,8 @@ pub use block_statistics::BlockStatistics;
pub use cluster_statistics::ClusterStatsGenerator;
pub use column_statistic::gen_columns_statistics;
pub use column_statistic::traverse;
pub use column_statistic::Trim;
pub use column_statistic::STATS_REPLACEMENT_CHAR;
pub use column_statistic::STATS_STRING_PREFIX_LEN;
pub use reducers::merge_statistics;
pub use reducers::reduce_block_statistics;
15 changes: 13 additions & 2 deletions src/query/storages/index/src/filters/xor8.rs
@@ -119,10 +119,21 @@ impl Filter for Xor8Filter {

impl SupportedType for Xor8Filter {
fn is_supported_type(data_type: &DataTypeImpl) -> bool {
// Bloom index only enabled for String and Boolean types for now
// Bloom index only enabled for String and Integral types for now
let inner_type = remove_nullable(data_type);
let data_type_id = inner_type.data_type_id();
matches!(data_type_id, TypeID::String)
matches!(
data_type_id,
TypeID::String
| TypeID::UInt8
| TypeID::UInt16
| TypeID::UInt32
| TypeID::UInt64
| TypeID::Int8
| TypeID::Int16
| TypeID::Int32
| TypeID::Int64
)
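// e.g. with this change a (possibly nullable) UInt64 or Int32 column gets an xor filter,
// while Boolean, Float and other types are still skipped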
}
}
