fix(query): remove topk optimization in parquet2 (#14297)
* fix(query): remove topk optimization in parquet2

* fix(query): remove topk optimization in parquet2
sundy-li authored Jan 10, 2024
1 parent d305e82 commit 0213c9b
Showing 4 changed files with 7 additions and 99 deletions.
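Before this change, the parquet2 reader carried a top-k push-down through planning, pruning and the prewhere phase: it kept a heap of the current top-k values and skipped blocks whose values could never enter that heap. The sketch below is a minimal standalone illustration of that short-circuit idea in plain Rust; TopKHeap, its fields and its methods are simplified stand-ins, not Databend's TopKSorter API.

use std::collections::BinaryHeap;

// Keeps the `limit` smallest values seen so far (an ascending top-k).
// The max-heap root is the current cut-off: once the heap is full, any
// value >= the cut-off cannot improve the result.
struct TopKHeap {
    limit: usize,
    heap: BinaryHeap<i64>,
}

impl TopKHeap {
    fn new(limit: usize) -> Self {
        Self { limit, heap: BinaryHeap::new() }
    }

    // Feed a column of values, keeping only the `limit` smallest.
    fn push_column(&mut self, col: &[i64]) {
        for &v in col {
            if self.heap.len() < self.limit {
                self.heap.push(v);
            } else if v < *self.heap.peek().unwrap() {
                self.heap.pop();
                self.heap.push(v);
            }
        }
    }

    // True if no value in `col` could enter the heap, i.e. the whole
    // block can be skipped before the prewhere filter is even evaluated.
    fn never_match_any(&self, col: &[i64]) -> bool {
        if self.heap.len() < self.limit {
            return false; // heap not full yet: anything may still enter
        }
        let cutoff = *self.heap.peek().unwrap();
        col.iter().all(|&v| v >= cutoff)
    }
}

fn main() {
    let mut sorter = TopKHeap::new(3);
    sorter.push_column(&[5, 1, 9, 2]);           // heap now holds {1, 2, 5}
    assert!(sorter.never_match_any(&[7, 8, 9])); // all >= 5: skip the block
    assert!(!sorter.never_match_any(&[0, 10]));  // 0 would enter the top-3
}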
15 changes: 0 additions & 15 deletions src/query/storages/parquet/src/parquet2/parquet_table/partition.rs
@@ -55,11 +55,6 @@ impl Parquet2Table {
Projection::Columns(indices)
};

let top_k = push_down
.as_ref()
.map(|p| p.top_k(&self.table_info.schema()))
.unwrap_or_default();

// Currently, arrow2 doesn't support reading stats of a inner column of a nested type.
// Therefore, if there is inner fields in projection, we skip the row group pruning.
let skip_pruning = matches!(projection, Projection::InnerColumns(_));
@@ -91,15 +86,6 @@ impl Parquet2Table {
.map(|f| f.inverted_filter.as_expr(&BUILTIN_FUNCTIONS))
});

let top_k = top_k.map(|top_k| {
let offset = projected_column_nodes
.column_nodes
.iter()
.position(|node| node.leaf_indices[0] == top_k.leaf_id)
.unwrap();
(top_k, offset)
});

let func_ctx = ctx.get_function_context()?;

let row_group_pruner = if self.read_options.prune_row_groups() {
@@ -133,7 +119,6 @@ impl Parquet2Table {
columns_to_read,
column_nodes: projected_column_nodes,
skip_pruning,
top_k,
parquet_fast_read_bytes,
compression_ratio: self.compression_ratio,
max_memory_usage: settings.get_max_memory_usage()?,
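The partition.rs hunks above dropped the planning step that resolved the pushed-down top-k column to its offset among the projected column nodes, which is how later stages located it in the deserialized block. A hypothetical sketch of that lookup, with simplified stand-in types (ColumnNode and TopK here are not the real databend_common_catalog types):

// Stand-in types; the real ones carry much more information.
struct ColumnNode { leaf_indices: Vec<usize> }
struct TopK { leaf_id: usize }

// Find the position of the top-k column among the projected column nodes,
// so later stages can fetch it from the deserialized block by offset.
fn top_k_offset(nodes: &[ColumnNode], top_k: &TopK) -> Option<usize> {
    nodes.iter().position(|n| n.leaf_indices[0] == top_k.leaf_id)
}

fn main() {
    let nodes = vec![
        ColumnNode { leaf_indices: vec![0] },
        ColumnNode { leaf_indices: vec![2] },
    ];
    assert_eq!(top_k_offset(&nodes, &TopK { leaf_id: 2 }), Some(1));
}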
35 changes: 1 addition & 34 deletions src/query/storages/parquet/src/parquet2/parquet_table/read.rs
@@ -25,7 +25,6 @@ use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::RemoteExpr;
use databend_common_expression::TableSchemaRef;
use databend_common_expression::TopKSorter;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::Pipeline;

@@ -41,8 +40,6 @@ pub struct Parquet2PrewhereInfo {
pub func_ctx: FunctionContext,
pub reader: Arc<Parquet2Reader>,
pub filter: Expr,
pub top_k: Option<(usize, TopKSorter)>,
// the usize is the index of the column in ParquetReader.schema
}

impl Parquet2Table {
@@ -78,31 +75,8 @@ impl Parquet2Table {
source_projection,
)?;

// build top k information
let top_k = plan
.push_downs
.as_ref()
.map(|p| p.top_k(&table_schema))
.unwrap_or_default();

// Build prewhere info.
let mut push_down_prewhere = PushDownInfo::prewhere_of_push_downs(plan.push_downs.as_ref());

let top_k = if let Some((prewhere, top_k)) = push_down_prewhere.as_mut().zip(top_k) {
// If there is a top k, we need to add the top k columns to the prewhere columns.
if let RemoteExpr::<String>::ColumnRef { id, .. } =
&plan.push_downs.as_ref().unwrap().order_by[0].0
{
let index = table_schema.index_of(id)?;
prewhere.remain_columns.remove_col(index);
prewhere.prewhere_columns.add_col(index);
Some((id.clone(), top_k))
} else {
None
}
} else {
None
};
let push_down_prewhere = PushDownInfo::prewhere_of_push_downs(plan.push_downs.as_ref());

// Build remain reader.
// If there is no prewhere filter, remain reader is the same as source reader (no prewhere phase, deserialize directly).
@@ -127,18 +101,11 @@
)?;
src_fields.extend_from_slice(reader.output_schema.fields());
let filter = Self::build_filter(&p.filter, &reader.output_schema);
let top_k = top_k.map(|(name, top_k)| {
(
reader.output_schema.index_of(&name).unwrap(),
TopKSorter::new(top_k.limit, top_k.asc),
)
});
let func_ctx = ctx.get_function_context()?;
Ok::<_, ErrorCode>(Parquet2PrewhereInfo {
func_ctx,
reader,
filter,
top_k,
})
})
.transpose()?;
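The read.rs hunks above removed the planning step that promoted the top-k column from the remain-column set into the prewhere-column set, so that the prewhere phase would deserialize it and feed it to the sorter before the remaining columns were read. A simplified sketch of that promotion, with a plain BTreeSet standing in for Databend's column containers (prewhere_columns / remain_columns in the push-down info):

use std::collections::BTreeSet;

// A plain index set standing in for the projection/column-id containers.
type ColumnSet = BTreeSet<usize>;

// Move the top-k column out of the remain set and into the prewhere set,
// so it is available to the sorter during the prewhere phase.
fn promote_to_prewhere(prewhere: &mut ColumnSet, remain: &mut ColumnSet, index: usize) {
    remain.remove(&index);
    prewhere.insert(index);
}

fn main() {
    let mut prewhere: ColumnSet = [0, 1].into_iter().collect();
    let mut remain: ColumnSet = [2, 3].into_iter().collect();
    promote_to_prewhere(&mut prewhere, &mut remain, 3); // 3 = top-k column index
    assert!(prewhere.contains(&3) && !remain.contains(&3));
}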
@@ -226,7 +226,6 @@ impl Parquet2DeserializeTransform {
func_ctx,
reader,
filter,
top_k,
}) => {
let chunks = reader.read_from_merge_io(column_chunks)?;

@@ -241,59 +240,32 @@
};

let mut prewhere_block = reader.deserialize(part, chunks, push_down)?;
// Step 1: Check TOP_K, if prewhere_columns contains not only TOP_K, we can check if TOP_K column can satisfy the heap.
if let Some((index, sorter)) = top_k {
let col = prewhere_block
.get_by_offset(*index)
.value
.as_column()
.unwrap();
if sorter.never_match_any(col) {
return Ok(None);
}
}

// Step 2: Read Prewhere columns and get the filter
// Step 1: Read Prewhere columns and get the filter
let evaluator = Evaluator::new(&prewhere_block, func_ctx, &BUILTIN_FUNCTIONS);
let filter = evaluator
.run(filter)
.map_err(|e| e.add_message("eval prewhere filter failed:"))?
.try_downcast::<BooleanType>()
.unwrap();

// Step 3: Apply the filter, if it's all filtered, we can skip the remain columns.
// Step 2: Apply the filter, if it's all filtered, we can skip the remain columns.
if FilterHelpers::is_all_unset(&filter) {
return Ok(None);
}

// Step 4: Apply the filter to topk and update the bitmap, this will filter more results
let filter = if let Some((index, sorter)) = top_k {
let top_k_column = prewhere_block
.get_by_offset(*index)
.value
.as_column()
.unwrap();
let mut bitmap =
FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows());
sorter.push_column(top_k_column, &mut bitmap);
Value::Column(bitmap.into())
} else {
filter
};

if FilterHelpers::is_all_unset(&filter) {
return Ok(None);
}

// Step 5 Remove columns that are not needed for output. Use dummy column to replace them.
// Step 4 Remove columns that are not needed for output. Use dummy column to replace them.
let mut columns = prewhere_block.columns().to_vec();
for (col, f) in columns.iter_mut().zip(reader.output_schema().fields()) {
if !self.output_schema.has_field(f.name()) {
*col = BlockEntry::new(DataType::Null, Value::Scalar(Scalar::Null));
}
}

// Step 6: Read remain columns.
// Step 5: Read remain columns.
let chunks = self.remain_reader.read_from_merge_io(column_chunks)?;

let can_push_down = chunks
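The deserialize transform above lost two top-k steps: an early never_match_any check on the top-k column before the prewhere filter is evaluated (the heap sketch earlier covers that idea), and a later pass that fed the surviving rows to the sorter while clearing filter bits for rows that could no longer reach the heap. The sketch below illustrates only that second pass; the function name and parameters are invented for the illustration and do not match TopKSorter::push_column:

// After the prewhere filter produced a row bitmap, the removed step pushed
// the top-k column through the sorter and cleared the bits of rows that can
// no longer reach the heap. `cutoff` and `heap_full` stand in for the sorter state.
fn prune_bitmap_with_top_k(cutoff: i64, heap_full: bool, col: &[i64], bitmap: &mut [bool]) {
    for (keep, &v) in bitmap.iter_mut().zip(col) {
        if *keep && heap_full && v >= cutoff {
            *keep = false; // row passes the filter but cannot improve the top-k
        }
    }
}

fn main() {
    let col = vec![3, 9, 1, 7];
    let mut bitmap = vec![true, true, false, true];      // prewhere filter result
    prune_bitmap_with_top_k(5, true, &col, &mut bitmap); // current cut-off is 5
    assert_eq!(bitmap, vec![true, false, false, false]);
}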
20 changes: 2 additions & 18 deletions src/query/storages/parquet/src/parquet2/pruning.rs
@@ -33,7 +33,6 @@ use databend_common_catalog::plan::PartInfo;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::plan::TopK;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::Expr;
@@ -75,8 +74,6 @@ pub struct PartitionPruner {
pub column_nodes: ColumnNodes,
/// Whether to skip pruning.
pub skip_pruning: bool,
/// top k information from pushed down information. The usize is the offset of top k column in `schema`.
pub top_k: Option<(TopK, usize)>,
// TODO: use limit information for pruning
// /// Limit of this query. If there is order by and filter, it will not be used (assign to `usize::MAX`).
// pub limit: usize,
@@ -120,7 +117,7 @@ impl PartitionPruner {
.any(|c| c.metadata().statistics.is_none())
});

let row_group_stats = if no_stats {
let _row_group_stats = if no_stats {
None
} else if self.row_group_pruner.is_some() && !self.skip_pruning {
let (pruner, _) = self.row_group_pruner.as_ref().unwrap();
@@ -140,8 +137,6 @@
} else {
None
}
} else if self.top_k.is_some() {
collect_row_group_stats(&self.column_nodes, &file_meta.row_groups).ok()
} else {
None
};
@@ -177,23 +172,13 @@ impl PartitionPruner {
let c = &rg.columns()[*index];
let (offset, length) = c.byte_range();

let min_max = self
.top_k
.as_ref()
.filter(|(tk, _)| tk.leaf_id == *index)
.zip(row_group_stats.as_ref())
.map(|((_, offset), stats)| {
let stat = stats[rg_idx].get(&(*offset as u32)).unwrap();
(stat.min().clone(), stat.max().clone())
});

column_metas.insert(*index, ColumnMeta {
offset,
length,
num_values: c.num_values(),
compression: c.compression(),
uncompressed_size: c.uncompressed_size() as u64,
min_max,
min_max: None,
has_dictionary: c.dictionary_page_offset().is_some(),
});
}
@@ -248,7 +233,6 @@ impl PartitionPruner {
let mut max_compression_ratio = self.compression_ratio;
let mut max_compressed_size = 0u64;

// If one row group does not have stats, we cannot use the stats for topk optimization.
for (file_id, file_meta) in file_metas.into_iter().enumerate() {
let path = &large_files[file_id].0;
if is_copy {
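The pruning.rs hunks above removed the code that collected per-row-group statistics and attached the top-k column's min/max to each ColumnMeta, which let readers skip whole row groups once the heap was full. A simplified sketch of that skip decision, with a made-up RowGroupStat type standing in for the parquet statistics:

// Stand-in for the per-row-group statistics the removed code collected via
// `collect_row_group_stats` and stored as `min_max` on each column meta.
struct RowGroupStat { min: i64, max: i64 }

// For `ORDER BY c ASC LIMIT k`, once the heap is full (cut-off known), a row
// group whose minimum of `c` is not below the cut-off can be skipped entirely;
// for DESC the test flips to the maximum.
fn can_skip_row_group(stat: &RowGroupStat, cutoff: Option<i64>, asc: bool) -> bool {
    match cutoff {
        None => false, // heap not full yet: nothing can be skipped
        Some(c) => if asc { stat.min >= c } else { stat.max <= c },
    }
}

fn main() {
    let rg = RowGroupStat { min: 100, max: 200 };
    assert!(can_skip_row_group(&rg, Some(50), true));   // every value >= 50 cut-off
    assert!(!can_skip_row_group(&rg, Some(150), true)); // some values may qualify
}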
