Skip to content

Commit

Permalink
WIP: refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Jul 19, 2022
1 parent f98df53 commit 561dc0f
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 65 deletions.
78 changes: 37 additions & 41 deletions query/src/storages/fuse/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl BlockPruner {
RangeFilter::try_create(ctx.clone(), &exprs.filters[0], schema.clone())?;
Box::new(move |v: &StatisticsOfColumns| range_filter.eval(v))
}
_ => Box::new(|_: &StatisticsOfColumns| Ok(true)),
_ => {
// TODO make this a shortcut!
Box::new(|_: &StatisticsOfColumns| Ok(true))
}
};

let segment_locs = self.table_snapshot.segments.clone();
Expand Down Expand Up @@ -104,14 +107,16 @@ impl BlockPruner {
.map(|(idx, (loc, ver))| (NonCopy(idx), (loc, NonCopy(ver))));

let dal = ctx.get_storage_operator()?;
// let bloom_index_schema = BloomFilterIndexer::to_bloom_schema(schema.as_ref());
//
// // map from filed name to index
// let bloom_fields = bloom_index_schema
// .fields()
// .iter()
// .map(|filed| (field.name, filed.clone()))
// .collect::<HashMap<_, _>>();

let filter_expr = if let Some(Extras { filters, .. }) = push_down {
let filter_expression = &filters[0];
Some((
column_names_of_expression(filter_expression),
filter_expression,
))
} else {
None
};

let stream = futures::stream::iter(segment_locs)
.map(|(idx, (seg_loc, u))| async {
Expand All @@ -131,39 +136,30 @@ impl BlockPruner {
.into_iter()
.map(|v| (idx, v));

if let Some(Extras { filters, .. }) = push_down {
if let Some((names, expression)) = &filter_expr {
let mut res = vec![];
if !filters.is_empty() {
// TODO init these once
let filter_expr = &filters[0];
let filter_block_project = filter_columns(filter_expr);

// load filters of columns
for (idx, meta) in blocks {
let bloom_idx_location =
TableMetaLocationGenerator::block_bloom_index_location(
&meta.location.0,
);
let block = load_filter_columns(
dal.clone(),
&filter_block_project,
&bloom_idx_location,
)
.await?;
let ctx = ctx.clone();
let index = BloomFilterIndexer::from_bloom_block(
schema.clone(),
block,
ctx,
)?;
if BloomFilterExprEvalResult::False != index.eval(filter_expr)? {
res.push((idx, meta))
}
for (idx, meta) in blocks {
let bloom_idx_location =
TableMetaLocationGenerator::block_bloom_index_location(
&meta.location.0,
);
let filter_block = load_bloom_filter_by_columns(
dal.clone(),
names,
&bloom_idx_location,
)
.await?;
let ctx = ctx.clone();
let index = BloomFilterIndexer::from_bloom_block(
schema.clone(),
filter_block,
ctx,
)?;
if BloomFilterExprEvalResult::False != index.eval(expression)? {
res.push((idx, meta))
}
Ok::<_, ErrorCode>(res)
} else {
Ok::<_, ErrorCode>(blocks.collect::<Vec<_>>())
}
Ok::<_, ErrorCode>(res)
} else {
Ok::<_, ErrorCode>(blocks.collect::<Vec<_>>())
}
Expand Down Expand Up @@ -206,15 +202,15 @@ impl BlockPruner {
}
}

fn filter_columns(filter_expr: &Expression) -> Vec<String> {
/// Collects the bloom-filter column names referenced by a filter expression.
///
/// Walks `filter_expr` with `find_column_exprs` to gather every column
/// expression it contains, then maps each column name to its bloom-index
/// column name via `BloomFilterIndexer::to_bloom_column_name`.
///
/// Returns one bloom column name per column expression found (duplicates,
/// if any, are not removed here).
fn column_names_of_expression(filter_expr: &Expression) -> Vec<String> {
// TODO avoid this clone!!!
// NOTE(review): `find_column_exprs` appears to take an owned slice of
// `Expression`, which forces this clone — confirm before removing it.
find_column_exprs(&[filter_expr.clone()])
.iter()
.map(|e| BloomFilterIndexer::to_bloom_column_name(&e.column_name()))
.collect::<Vec<_>>()
}

async fn load_filter_columns(
async fn load_bloom_filter_by_columns(
dal: Operator,
projection: &[String],
location: &str,
Expand Down
4 changes: 1 addition & 3 deletions tests/logictest/complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ def run(source_file, target_path="."):
print(f"Source file: {source_file}")
case_name = os.path.basename(source_file)
print(f"Case name: {case_name}")
out = open(f"{target_path}/{case_name}",
mode="w+",
encoding='UTF-8')
out = open(f"{target_path}/{case_name}", mode="w+", encoding='UTF-8')

statement = list()
f = open(source_file, encoding='UTF-8')
Expand Down
14 changes: 10 additions & 4 deletions tests/logictest/gen_suites.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ def mysql_fetch_results(sql):
ret = ret + row_string + "\n"
except Exception as err:
log.warning(
f"SQL: {sql} fetch no results, msg:{str(err)} ,check it manual.")
f"SQL: {sql} fetch no results, msg:{str(err)} ,check it manual."
)
return ret

target_dir = os.path.dirname(
Expand Down Expand Up @@ -178,7 +179,9 @@ def mysql_fetch_results(sql):
continue

if query_options == "":
log.warning(f"statement: {statement} type query could not get query_option change to ok statement")
log.warning(
f"statement: {statement} type query could not get query_option change to ok statement"
)
content_output = content_output + STATEMENT_OK.format(
statement=statement)
continue
Expand Down Expand Up @@ -222,7 +225,8 @@ def mysql_fetch_results(sql):
http_client.query_with_session(statement)
mysql_client.execute(statement)
except Exception as err:
log.warning(f"statement {statement} excute error,msg {str(err)}")
log.warning(
f"statement {statement} excute error,msg {str(err)}")
pass

content_output = content_output + STATEMENT_OK.format(
Expand Down Expand Up @@ -276,5 +280,7 @@ def main(pattern=".*"):


if __name__ == '__main__':
log.info(f"Start generate sqllogictest suites from path: {suite_path} to path: {logictest_path}")
log.info(
f"Start generate sqllogictest suites from path: {suite_path} to path: {logictest_path}"
)
fire.Fire(main)
14 changes: 10 additions & 4 deletions tests/logictest/http_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def parseSQL(sql):
try:
return json.loads(response.content)
except Exception as err:
log.error(f"http error, SQL: {statement}\ncontent: {response.content}\nerror msg:{str(err)}")
log.error(
f"http error, SQL: {statement}\ncontent: {response.content}\nerror msg:{str(err)}"
)
raise

def set_database(self, database):
Expand Down Expand Up @@ -174,14 +176,18 @@ def query_with_session(self, statement):
self._host, self._port, response['next_uri']),
headers=self.make_headers())
response = json.loads(resp.content)
log.debug(f"Sql in progress, fetch next_uri content: {response}")
log.debug(
f"Sql in progress, fetch next_uri content: {response}")
response_list.append(response)
except Exception as err:
log.warning(f"Fetch next_uri response with error: {str(err)}")
log.warning(
f"Fetch next_uri response with error: {str(err)}")
continue
break
if response['next_uri'] is not None:
log.warning(f"after waited for 12 secs, query still not finished (next url not none)!")
log.warning(
f"after waited for 12 secs, query still not finished (next url not none)!"
)

if self._session is None:
if response is not None and "session_id" in response:
Expand Down
30 changes: 22 additions & 8 deletions tests/logictest/logictest.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ def __init__(self, matched):
raise Exception(f"Invalid query options: {qo}")
if len(s) == 1:
if is_empty_line(s[0]):
raise Exception(f"Invalid query options, query type should not be empty: {qo}")
raise Exception(
f"Invalid query options, query type should not be empty: {qo}"
)
self.query_type = s[0]
return
query_type, options = qo.split(" ", 1)
Expand Down Expand Up @@ -216,7 +218,8 @@ def get_statements(suite_path, suite_name):
result_count = len(s.label) + 1
for i in range(result_count):
results.append(get_result(lines))
yield ParsedStatement(line_idx + 1, s, suite_name, text, results, runs_on)
yield ParsedStatement(line_idx + 1, s, suite_name, text, results,
runs_on)


def format_value(vals, val_num):
Expand Down Expand Up @@ -278,24 +281,32 @@ def execute(self):
if callable(getattr(self, "batch_execute")):
# case batch
for (file_path, suite_name) in self.statement_files:
log.info(f"Run query with the same session every suite file, suite file path:{file_path}")
log.info(
f"Run query with the same session every suite file, suite file path:{file_path}"
)
statement_list = list()
for state in get_statements(file_path, suite_name):
statement_list.append(state)
self.batch_execute(statement_list)
else:
# case one by one
for (file_path, suite_name) in self.statement_files:
log.info(f"Run query without session every statements, suite file path:{file_path}")
log.info(
f"Run query without session every statements, suite file path:{file_path}"
)
for state in get_statements(file_path, suite_name):
self.execute_statement(state)

def execute_statement(self, statement):
if self.kind not in statement.runs_on:
log.debug(f"Skip execute statement with {self.kind} SuiteRunner, only runs on {statement.runs_on}")
log.debug(
f"Skip execute statement with {self.kind} SuiteRunner, only runs on {statement.runs_on}"
)
return
if self.show_query_on_execution:
log.debug(f"executing statement, type {statement.s_type.type}\n{statement.text}\n")
log.debug(
f"executing statement, type {statement.s_type.type}\n{statement.text}\n"
)
if statement.s_type.type == "query":
self.assert_execute_query(statement)
elif statement.s_type.type == "error":
Expand Down Expand Up @@ -360,10 +371,13 @@ def assert_execute_error(self, statement):
match = re.search(statement.s_type.expect_error, actual.msg)
assert_that(
match, is_not(none()),
f"statement {str(statement)}, expect error regex {statement.s_type.expect_error}, found {actual}")
f"statement {str(statement)}, expect error regex {statement.s_type.expect_error}, found {actual}"
)

def run_sql_suite(self):
log.info(f"run_sql_suite for {self.kind} on base {os.path.abspath(self.path)}")
log.info(
f"run_sql_suite for {self.kind} on base {os.path.abspath(self.path)}"
)
self.fetch_files()
self.execute()

Expand Down
20 changes: 15 additions & 5 deletions tests/logictest/mysql_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,29 @@ def execute_query(self, statement):

if query_type[i] == 'I':
if not isinstance(v, int):
log.debug(f"Expected int, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
log.debug(
f"Expected int, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
)
elif query_type[i] == 'F' or query_type[i] == 'R':
if not isinstance(v, float):
log.debug(f"Expected float, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
log.debug(
f"Expected float, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
)
elif query_type[i] == 'T':
if not (isinstance(v, str) or isinstance(v, datetime) or
isinstance(v, date)):
log.debug(f"Expected string, got type {type(v)} in query { statement.text} row {ri} col {i} value {v}")
log.debug(
f"Expected string, got type {type(v)} in query { statement.text} row {ri} col {i} value {v}"
)
elif query_type[i] == 'B':
# bool return int in mysql
if not isinstance(v, int):
log.debug(f"Expected Bool, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}")
log.debug(
f"Expected Bool, got type {type(v)} in query {statement.text} row {ri} col {i} value {v}"
)
else:
log.debug(f"Unknown type {query_type[i]} in query {statement.text} row {ri} col {i} value {v}")
log.debug(
f"Unknown type {query_type[i]} in query {statement.text} row {ri} col {i} value {v}"
)
vals.append(str(v))
return vals

0 comments on commit 561dc0f

Please sign in to comment.