diff --git a/query/src/storages/fuse/pruning/block_pruner.rs b/query/src/storages/fuse/pruning/block_pruner.rs
index 6216ee868f0a3..2b964f6793eaf 100644
--- a/query/src/storages/fuse/pruning/block_pruner.rs
+++ b/query/src/storages/fuse/pruning/block_pruner.rs
@@ -70,7 +70,10 @@ impl BlockPruner {
                     RangeFilter::try_create(ctx.clone(), &exprs.filters[0], schema.clone())?;
                 Box::new(move |v: &StatisticsOfColumns| range_filter.eval(v))
             }
-            _ => Box::new(|_: &StatisticsOfColumns| Ok(true)),
+            _ => {
+                // TODO make this a shortcut!
+                Box::new(|_: &StatisticsOfColumns| Ok(true))
+            }
         };
 
         let segment_locs = self.table_snapshot.segments.clone();
@@ -104,14 +107,16 @@ impl BlockPruner {
             .map(|(idx, (loc, ver))| (NonCopy(idx), (loc, NonCopy(ver))));
 
         let dal = ctx.get_storage_operator()?;
-        // let bloom_index_schema = BloomFilterIndexer::to_bloom_schema(schema.as_ref());
-        //
-        // // map from filed name to index
-        // let bloom_fields = bloom_index_schema
-        //     .fields()
-        //     .iter()
-        //     .map(|filed| (field.name, filed.clone()))
-        //     .collect::<HashMap<_, _>>();
+
+        let filter_expr = if let Some(Extras { filters, .. }) = push_down {
+            let filter_expression = &filters[0];
+            Some((
+                column_names_of_expression(filter_expression),
+                filter_expression,
+            ))
+        } else {
+            None
+        };
 
         let stream = futures::stream::iter(segment_locs)
             .map(|(idx, (seg_loc, u))| async {
@@ -131,39 +136,30 @@ impl BlockPruner {
                     .into_iter()
                     .map(|v| (idx, v));
 
-                if let Some(Extras { filters, .. }) = push_down {
+                if let Some((names, expression)) = &filter_expr {
                     let mut res = vec![];
-                    if !filters.is_empty() {
-                        // TODO init these once
-                        let filter_expr = &filters[0];
-                        let filter_block_project = filter_columns(filter_expr);
-
-                        // load filters of columns
-                        for (idx, meta) in blocks {
-                            let bloom_idx_location =
-                                TableMetaLocationGenerator::block_bloom_index_location(
-                                    &meta.location.0,
-                                );
-                            let block = load_filter_columns(
-                                dal.clone(),
-                                &filter_block_project,
-                                &bloom_idx_location,
-                            )
-                            .await?;
-                            let ctx = ctx.clone();
-                            let index = BloomFilterIndexer::from_bloom_block(
-                                schema.clone(),
-                                block,
-                                ctx,
-                            )?;
-                            if BloomFilterExprEvalResult::False != index.eval(filter_expr)? {
-                                res.push((idx, meta))
-                            }
+                    for (idx, meta) in blocks {
+                        let bloom_idx_location =
+                            TableMetaLocationGenerator::block_bloom_index_location(
+                                &meta.location.0,
+                            );
+                        let filter_block = load_bloom_filter_by_columns(
+                            dal.clone(),
+                            names,
+                            &bloom_idx_location,
+                        )
+                        .await?;
+                        let ctx = ctx.clone();
+                        let index = BloomFilterIndexer::from_bloom_block(
+                            schema.clone(),
+                            filter_block,
+                            ctx,
+                        )?;
+                        if BloomFilterExprEvalResult::False != index.eval(expression)? {
+                            res.push((idx, meta))
                         }
-                        Ok::<_, ErrorCode>(res)
-                    } else {
-                        Ok::<_, ErrorCode>(blocks.collect::<Vec<_>>())
                     }
+                    Ok::<_, ErrorCode>(res)
                 } else {
                     Ok::<_, ErrorCode>(blocks.collect::<Vec<_>>())
                 }
@@ -206,7 +202,7 @@ impl BlockPruner {
     }
 }
 
-fn filter_columns(filter_expr: &Expression) -> Vec<String> {
+fn column_names_of_expression(filter_expr: &Expression) -> Vec<String> {
     // TODO avoid this clone!!!
     find_column_exprs(&[filter_expr.clone()])
         .iter()
@@ -214,7 +210,7 @@ fn filter_columns(filter_expr: &Expression) -> Vec<String> {
        .collect::<Vec<_>>()
 }
 
-async fn load_filter_columns(
+async fn load_bloom_filter_by_columns(
     dal: Operator,
     projection: &[String],
     location: &str,
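For readers skimming the patch: the new pruning path loads, for each block, the bloom filter index stored alongside the block, evaluates the pushed-down filter expression against it, and keeps the block unless the evaluation comes back definitely false (`BloomFilterExprEvalResult::False`); the filter's column names are now extracted once, before the per-segment loop, rather than once per block. The standalone sketch below illustrates that keep-unless-definitely-false rule with toy types. `ColumnBloom`, `BlockIndex`, `EvalResult`, and the two-probe hash are invented for illustration only and are not the databend-query API.

```rust
use std::collections::HashMap;

/// Outcome of checking a point-lookup predicate against a block's bloom index:
/// `False` means the block definitely contains no match and can be pruned;
/// `Maybe` means it still has to be read.
#[derive(PartialEq)]
enum EvalResult {
    False,
    Maybe,
}

/// Toy per-column bloom filter: two hash probes over a fixed bit array.
struct ColumnBloom {
    bits: Vec<bool>,
}

impl ColumnBloom {
    fn new(nbits: usize) -> Self {
        Self { bits: vec![false; nbits] }
    }

    fn probes(&self, value: u64) -> [usize; 2] {
        // Two cheap derived hashes; a real filter would use k independent ones.
        let h1 = value.wrapping_mul(0x9E37_79B9_7F4A_7C15) as usize % self.bits.len();
        let h2 = value.rotate_left(31).wrapping_mul(0xC2B2_AE3D_27D4_EB4F) as usize % self.bits.len();
        [h1, h2]
    }

    fn insert(&mut self, value: u64) {
        for p in self.probes(value) {
            self.bits[p] = true;
        }
    }

    /// False positives are possible, false negatives are not.
    fn may_contain(&self, value: u64) -> bool {
        self.probes(value).iter().all(|&p| self.bits[p])
    }
}

/// Per-block index: one bloom filter per indexed column.
struct BlockIndex {
    columns: HashMap<String, ColumnBloom>,
}

impl BlockIndex {
    /// Evaluate a conjunction of `col = literal` predicates.
    fn eval(&self, point_queries: &[(&str, u64)]) -> EvalResult {
        for (col, val) in point_queries {
            if let Some(bloom) = self.columns.get(*col) {
                if !bloom.may_contain(*val) {
                    // One definite miss is enough to prune the whole block.
                    return EvalResult::False;
                }
            }
            // No filter stored for this column: stay conservative.
        }
        EvalResult::Maybe
    }
}

/// Keep a block unless its bloom index proves the predicate cannot match,
/// mirroring the `!= BloomFilterExprEvalResult::False` check in the patch.
fn prune_blocks(blocks: &[(usize, BlockIndex)], point_queries: &[(&str, u64)]) -> Vec<usize> {
    blocks
        .iter()
        .filter(|(_, index)| index.eval(point_queries) != EvalResult::False)
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    let mut with_42 = ColumnBloom::new(1024);
    with_42.insert(42);

    let block0 = BlockIndex { columns: HashMap::from([("id".to_string(), with_42)]) };
    let block1 = BlockIndex { columns: HashMap::from([("id".to_string(), ColumnBloom::new(1024))]) };

    // Only block 0 may contain `id = 42`; block 1 is pruned.
    println!("{:?}", prune_blocks(&[(0, block0), (1, block1)], &[("id", 42)]));
}
```

The property the real index relies on is the same one-sided error shown here: a bloom filter may report a false positive (a block is kept unnecessarily) but never a false negative, so pruning only on a definite `False` result cannot drop matching rows.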
diff --git a/tests/logictest/complete.py b/tests/logictest/complete.py
index 5f92eb34758df..4021595716016 100644
--- a/tests/logictest/complete.py
+++ b/tests/logictest/complete.py
@@ -20,9 +20,7 @@ def run(source_file, target_path="."):
     print(f"Source file: {source_file}")
     case_name = os.path.basename(source_file)
     print(f"Case name: {case_name}")
-    out = open(f"{target_path}/{case_name}",
-               mode="w+",
-               encoding='UTF-8')
+    out = open(f"{target_path}/{case_name}", mode="w+", encoding='UTF-8')
 
     statement = list()
     f = open(source_file, encoding='UTF-8')
diff --git a/tests/logictest/gen_suites.py b/tests/logictest/gen_suites.py
index bb83232c8fdf4..6807908bb2ee9 100644
--- a/tests/logictest/gen_suites.py
+++ b/tests/logictest/gen_suites.py
@@ -126,7 +126,8 @@ def mysql_fetch_results(sql):
             ret = ret + row_string + "\n"
     except Exception as err:
         log.warning(
-            f"SQL: {sql} fetch no results, msg:{str(err)} ,check it manual.")
+            f"SQL: {sql} fetch no results, msg:{str(err)} ,check it manual."
+        )
     return ret
 
 target_dir = os.path.dirname(
@@ -178,7 +179,9 @@ def mysql_fetch_results(sql):
             continue
 
         if query_options == "":
-            log.warning(f"statement: {statement} type query could not get query_option change to ok statement")
+            log.warning(
+                f"statement: {statement} type query could not get query_option change to ok statement"
+            )
             content_output = content_output + STATEMENT_OK.format(
                 statement=statement)
             continue
@@ -222,7 +225,8 @@ def mysql_fetch_results(sql):
                 http_client.query_with_session(statement)
                 mysql_client.execute(statement)
             except Exception as err:
-                log.warning(f"statement {statement} excute error,msg {str(err)}")
+                log.warning(
+                    f"statement {statement} excute error,msg {str(err)}")
                 pass
 
             content_output = content_output + STATEMENT_OK.format(
@@ -276,5 +280,7 @@ def main(pattern=".*"):
 
 
 if __name__ == '__main__':
-    log.info(f"Start generate sqllogictest suites from path: {suite_path} to path: {logictest_path}")
+    log.info(
+        f"Start generate sqllogictest suites from path: {suite_path} to path: {logictest_path}"
+    )
     fire.Fire(main)
diff --git a/tests/logictest/http_connector.py b/tests/logictest/http_connector.py
index 0f317e6c130e4..ef45230bde96c 100644
--- a/tests/logictest/http_connector.py
+++ b/tests/logictest/http_connector.py
@@ -139,7 +139,9 @@ def parseSQL(sql):
         try:
             return json.loads(response.content)
         except Exception as err:
-            log.error(f"http error, SQL: {statement}\ncontent: {response.content}\nerror msg:{str(err)}")
+            log.error(
+                f"http error, SQL: {statement}\ncontent: {response.content}\nerror msg:{str(err)}"
+            )
             raise
 
     def set_database(self, database):
@@ -174,14 +176,18 @@ def query_with_session(self, statement):
                         self._host, self._port, response['next_uri']),
                                         headers=self.make_headers())
                     response = json.loads(resp.content)
-                    log.debug(f"Sql in progress, fetch next_uri content: {response}")
+                    log.debug(
+                        f"Sql in progress, fetch next_uri content: {response}")
                     response_list.append(response)
                 except Exception as err:
-                    log.warning(f"Fetch next_uri response with error: {str(err)}")
+                    log.warning(
+                        f"Fetch next_uri response with error: {str(err)}")
                     continue
                 break
         if response['next_uri'] is not None:
-            log.warning(f"after waited for 12 secs, query still not finished (next url not none)!")
+            log.warning(
+                f"after waited for 12 secs, query still not finished (next url not none)!"
+            )
 
         if self._session is None:
             if response is not None and "session_id" in response:
diff --git a/tests/logictest/logictest.py b/tests/logictest/logictest.py
index 7b78fd9aa51ec..83e13e8423b15 100644
--- a/tests/logictest/logictest.py
+++ b/tests/logictest/logictest.py
@@ -126,7 +126,9 @@ def __init__(self, matched):
             raise Exception(f"Invalid query options: {qo}")
         if len(s) == 1:
             if is_empty_line(s[0]):
-                raise Exception(f"Invalid query options, query type should not be empty: {qo}")
+                raise Exception(
+                    f"Invalid query options, query type should not be empty: {qo}"
+                )
             self.query_type = s[0]
             return
         query_type, options = qo.split(" ", 1)
@@ -216,7 +218,8 @@ def get_statements(suite_path, suite_name):
                 result_count = len(s.label) + 1
             for i in range(result_count):
                 results.append(get_result(lines))
-        yield ParsedStatement(line_idx + 1, s, suite_name, text, results, runs_on)
+        yield ParsedStatement(line_idx + 1, s, suite_name, text, results,
+                              runs_on)
 
 
 def format_value(vals, val_num):
@@ -278,7 +281,9 @@ def execute(self):
         if callable(getattr(self, "batch_execute")):
             # case batch
             for (file_path, suite_name) in self.statement_files:
-                log.info(f"Run query with the same session every suite file, suite file path:{file_path}")
+                log.info(
+                    f"Run query with the same session every suite file, suite file path:{file_path}"
+                )
                 statement_list = list()
                 for state in get_statements(file_path, suite_name):
                     statement_list.append(state)
@@ -286,16 +291,22 @@ def execute(self):
         else:
             # case one by one
             for (file_path, suite_name) in self.statement_files:
-                log.info(f"Run query without session every statements, suite file path:{file_path}")
+                log.info(
+                    f"Run query without session every statements, suite file path:{file_path}"
+                )
                 for state in get_statements(file_path, suite_name):
                     self.execute_statement(state)
 
     def execute_statement(self, statement):
         if self.kind not in statement.runs_on:
-            log.debug(f"Skip execute statement with {self.kind} SuiteRunner, only runs on {statement.runs_on}")
+            log.debug(
+                f"Skip execute statement with {self.kind} SuiteRunner, only runs on {statement.runs_on}"
+            )
             return
         if self.show_query_on_execution:
-            log.debug(f"executing statement, type {statement.s_type.type}\n{statement.text}\n")
+            log.debug(
+                f"executing statement, type {statement.s_type.type}\n{statement.text}\n"
+            )
         if statement.s_type.type == "query":
             self.assert_execute_query(statement)
         elif statement.s_type.type == "error":
@@ -360,10 +371,13 @@ def assert_execute_error(self, statement):
         match = re.search(statement.s_type.expect_error, actual.msg)
         assert_that(
             match, is_not(none()),
-            f"statement {str(statement)}, expect error regex {statement.s_type.expect_error}, found {actual}")
+            f"statement {str(statement)}, expect error regex {statement.s_type.expect_error}, found {actual}"
+        )
 
     def run_sql_suite(self):
-        log.info(f"run_sql_suite for {self.kind} on base {os.path.abspath(self.path)}")
+        log.info(
+            f"run_sql_suite for {self.kind} on base {os.path.abspath(self.path)}"
+        )
         self.fetch_files()
         self.execute()
 
diff --git a/tests/logictest/mysql_runner.py b/tests/logictest/mysql_runner.py
index 4d5f00e150d5c..9f9c5c6a2e140 100644
--- a/tests/logictest/mysql_runner.py
+++ b/tests/logictest/mysql_runner.py
@@ -69,19 +69,29 @@ def execute_query(self, statement):
 
                 if query_type[i] == 'I':
                     if not isinstance(v, int):
-                        log.debug(f"Expected int, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
+                        log.debug(
+                            f"Expected int, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
+                        )
                 elif query_type[i] == 'F' or query_type[i] == 'R':
                     if not isinstance(v, float):
-                        log.debug(f"Expected float, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
+                        log.debug(
+                            f"Expected float, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
+                        )
                 elif query_type[i] == 'T':
                     if not (isinstance(v, str) or isinstance(v, datetime)
                             or isinstance(v, date)):
-                        log.debug(f"Expected string, got type {type(v)} in query { statement.text} row {ri} col {i} value {v}")
+                        log.debug(
+                            f"Expected string, got type {type(v)} in query { statement.text} row {ri} col {i} value {v}"
+                        )
                 elif query_type[i] == 'B':
                     # bool return int in mysql
                     if not isinstance(v, int):
-                        log.debug(f"Expected Bool, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
+                        log.debug(
+                            f"Expected Bool, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
+                        )
                 else:
-                    log.debug(f"Unknown type {query_type[i]} in query {statement.text} row {ri} col {i} value {v}")
+                    log.debug(
+                        f"Unknown type {query_type[i]} in query {statement.text} row {ri} col {i} value {v}"
+                    )
                 vals.append(str(v))
         return vals