Skip to content

Commit

Permalink
Fix stateless test
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed May 19, 2022
1 parent 7785caa commit a0f5686
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 31 deletions.
2 changes: 1 addition & 1 deletion query/src/procedures/systems/clustering_information.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Procedure for ClusteringInformationProcedure {

let tbl = FuseTable::try_from_table(tbl.as_ref())?;
let definition = if args.len() > 2 { &args[2] } else { "" };
let cluster_keys = get_cluster_keys(tbl, definition).await?;
let cluster_keys = get_cluster_keys(tbl, definition)?;

Ok(ClusteringInformation::new(ctx, tbl, cluster_keys)
.get_clustering_info()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
use common_planners::Expression;
use serde_json::json;
Expand Down Expand Up @@ -90,7 +91,8 @@ impl<'a> ClusteringInformation<'a> {

fn get_min_max_stats(&self, block: &BlockMeta) -> Result<(Vec<DataValue>, Vec<DataValue>)> {
if self.table.cluster_keys() != self.cluster_keys || block.cluster_stats.is_none() {
todo!()
// Todo(zhyass): support manually specifying the cluster key.
return Err(ErrorCode::UnImplement("Unimplement error"));
}

let cluster_stats = block.cluster_stats.clone().unwrap();
Expand All @@ -108,6 +110,10 @@ impl<'a> ClusteringInformation<'a> {
});
}

// Gather all cluster statistics points into a sorted map.
// Key: a cluster statistics point (a vector of min/max values).
// Value: a pair of block-index lists:
//        .0: indexes of blocks whose minimum equals the key;
//        .1: indexes of blocks whose maximum equals the key.
let mut points_map: BTreeMap<Vec<DataValue>, (Vec<usize>, Vec<usize>)> = BTreeMap::new();
let mut total_constant_block_count = 0;
for (i, block) in blocks.iter().enumerate() {
Expand All @@ -127,7 +133,10 @@ impl<'a> ClusteringInformation<'a> {
.or_insert((vec![], vec![i]));
}

// Calculate the overlap count and depth of each block.
let mut statis = Vec::new();
// key: the block index.
// value: (overlaps, depth).
let mut unfinished_parts: HashMap<usize, (usize, usize)> = HashMap::new();
for (start, end) in points_map.values() {
let point_depth = unfinished_parts.len() + start.len();
Expand Down Expand Up @@ -195,6 +204,10 @@ impl<'a> ClusteringInformation<'a> {
}
}

// The histogram contains buckets with the following widths:
// 1 to 16, in increments of 1;
// beyond 16, each bucket is twice the width of the previous one (32, 64, 128, ...).
// e.g. if val is 2 the bucket is 2; if val is 18 the bucket is 32.
fn get_buckets(val: usize) -> u32 {
let mut val = val as u32;
if val <= 16 || val & (val - 1) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl Table for ClusteringInformationTable {
.await?;
let tbl = FuseTable::try_from_table(tbl.as_ref())?;

let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys).await?;
let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys)?;

let blocks = vec![
ClusteringInformation::new(ctx.clone(), tbl, cluster_keys)
Expand All @@ -153,7 +153,7 @@ impl Table for ClusteringInformationTable {
pipeline.add_pipe(NewPipe::SimplePipe {
inputs_port: vec![],
outputs_port: vec![output.clone()],
processors: vec![FuseHistorySource::create(
processors: vec![ClusteringInformationSource::create(
ctx,
output,
self.arg_database_name.to_owned(),
Expand All @@ -166,23 +166,23 @@ impl Table for ClusteringInformationTable {
}
}

struct FuseHistorySource {
// Table-function source for `clustering_information(...)`: holds the parsed
// arguments until the async generate step produces the single result block.
struct ClusteringInformationSource {
// NOTE(review): initialized to `false` in `create` — presumably flipped after
// the one output block is emitted; confirm in the AsyncSource generate body.
finish: bool,
ctx: Arc<QueryContext>,
// Database-name argument of the table function.
arg_database_name: String,
// Table-name argument of the table function.
arg_table_name: String,
// Cluster-key definition string; a non-empty value is parsed by
// `get_cluster_keys` ("" falls through to the table's own keys — TODO confirm).
arg_cluster_keys: String,
}

impl FuseHistorySource {
impl ClusteringInformationSource {
pub fn create(
ctx: Arc<QueryContext>,
output: Arc<OutputPort>,
arg_database_name: String,
arg_table_name: String,
arg_cluster_keys: String,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx.clone(), output, FuseHistorySource {
AsyncSourcer::create(ctx.clone(), output, ClusteringInformationSource {
ctx,
finish: false,
arg_table_name,
Expand All @@ -192,7 +192,7 @@ impl FuseHistorySource {
}
}

impl AsyncSource for FuseHistorySource {
impl AsyncSource for ClusteringInformationSource {
const NAME: &'static str = "clustering_information";

type BlockFuture<'a> = impl Future<Output = Result<Option<DataBlock>>> where Self: 'a;
Expand All @@ -216,7 +216,7 @@ impl AsyncSource for FuseHistorySource {
.await?;

let tbl = FuseTable::try_from_table(tbl.as_ref())?;
let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys).await?;
let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys)?;

Ok(Some(
ClusteringInformation::new(self.ctx.clone(), tbl, cluster_keys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn parse_func_table_args(table_args: &TableArgs) -> Result<(String, String)>
}
}

pub async fn get_cluster_keys(table: &FuseTable, definition: &str) -> Result<Vec<Expression>> {
pub fn get_cluster_keys(table: &FuseTable, definition: &str) -> Result<Vec<Expression>> {
let cluster_keys = if !definition.is_empty() {
let schema = table.schema();
let exprs = PlanParser::parse_exprs(definition)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
show value of table being cloned
1 1
2 1
0 3
1 3
4 4
(b, a) 3 1 0.6667 1.6667 {"00001":1,"00002":2}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Create table t09_0014
-- Cluster key is (b, a), so clustering stats are computed over those columns.
create table t09_0014(a int, b int) cluster by(b,a);

-- NOTE(review): each insert presumably lands in its own block, giving the
-- clustering_information stats something to measure — confirm storage behavior.
insert into t09_0014 values(0,3),(1,1);
insert into t09_0014 values(1,3),(2,1);
insert into t09_0014 values(4,4);

-- Rows ordered by the cluster key columns.
select * from t09_0014 order by b, a;

--Bug in cluster mode: https://github.com/datafuselabs/databend/issues/5473
--select * from clustering_information('default','t09_0014');

-- Drop table.
drop table t09_0014;

0 comments on commit a0f5686

Please sign in to comment.