-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Test: support push down filter in ut #6573
Changes from all commits
00c5083
1c2fd70
0c2b479
4fb26ce
77cace2
6678b77
65cdfab
a37200d
fea1ac9
7827292
b0021a9
1db7ea0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,8 +11,13 @@ | |
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
#include <DataStreams/ExpressionBlockInputStream.h> | ||
#include <DataStreams/FilterBlockInputStream.h> | ||
#include <DataStreams/IBlockOutputStream.h> | ||
#include <Debug/MockStorage.h> | ||
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h> | ||
#include <Flash/Coprocessor/DAGQueryInfo.h> | ||
#include <Flash/Coprocessor/InterpreterUtils.h> | ||
#include <Flash/Coprocessor/TiDBTableScan.h> | ||
#include <Interpreters/Context.h> | ||
#include <Parsers/ASTIdentifier.h> | ||
|
@@ -119,10 +124,11 @@ void MockStorage::addTableDataForDeltaMerge(Context & context, const String & na | |
} | ||
} | ||
|
||
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 id) | ||
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter) | ||
{ | ||
auto storage = storage_delta_merge_map[id]; | ||
auto column_infos = table_schema_for_delta_merge[id]; | ||
assert(tableExistsForDeltaMerge(table_id)); | ||
auto storage = storage_delta_merge_map[table_id]; | ||
auto column_infos = table_schema_for_delta_merge[table_id]; | ||
assert(storage); | ||
assert(!column_infos.empty()); | ||
Names column_names; | ||
|
@@ -134,10 +140,30 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6 | |
SelectQueryInfo query_info; | ||
query_info.query = std::make_shared<ASTSelectQuery>(); | ||
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context); | ||
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams | ||
|
||
BlockInputStreamPtr in = ins[0]; | ||
return in; | ||
if (push_down_filter && push_down_filter->hasValue()) | ||
{ | ||
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context); | ||
query_info.dag_query = std::make_unique<DAGQueryInfo>( | ||
push_down_filter->conditions, | ||
analyzer->getPreparedSets(), | ||
analyzer->getCurrentInputColumns(), | ||
context.getTimezoneInfo()); | ||
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*push_down_filter, *analyzer); | ||
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams | ||
// TODO: set num_streams, then ins.size() != 1 | ||
BlockInputStreamPtr in = ins[0]; | ||
in = std::make_shared<FilterBlockInputStream>(in, before_where, filter_column_name, "test"); | ||
Comment on lines
+154
to
+155
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a assert for the size of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The size will be changed by |
||
in->setExtraInfo("push down filter"); | ||
in = std::make_shared<ExpressionBlockInputStream>(in, project_after_where, "test"); | ||
in->setExtraInfo("projection after push down filter"); | ||
return in; | ||
} | ||
else | ||
{ | ||
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); | ||
BlockInputStreamPtr in = ins[0]; | ||
return in; | ||
} | ||
} | ||
|
||
void MockStorage::addTableInfoForDeltaMerge(const String & name, const MockColumnInfoVec & columns) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ class FilterExecutorTestRunner : public DB::tests::ExecutorTest | |
{{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, | ||
{toNullableVec<String>("s1", {"banana", {}, "banana"}), | ||
toNullableVec<String>("s2", {"apple", {}, "banana"})}); | ||
|
||
context.addExchangeReceiver("exchange1", | ||
{{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, | ||
{toNullableVec<String>("s1", {"banana", {}, "banana"}), | ||
|
@@ -251,6 +252,90 @@ try | |
} | ||
CATCH | ||
|
||
TEST_F(FilterExecutorTestRunner, PushDownFilter) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe need more tests There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ok, I will test more filters |
||
try | ||
{ | ||
context.mockStorage()->setUseDeltaMerge(true); | ||
context.addMockDeltaMerge({"test_db", "test_table1"}, | ||
{{"i1", TiDB::TP::TypeLongLong}, {"s2", TiDB::TP::TypeString}}, | ||
{toVec<Int64>("i1", {1, 2, 3}), | ||
toNullableVec<String>("s2", {"apple", {}, "banana"})}); | ||
|
||
// Do not support push down filter test for DAGQueryBlockInterpreter | ||
enablePlanner(true); | ||
|
||
auto request = context | ||
.scan("test_db", "test_table1") | ||
.filter(lt(col("i1"), lit(Field(static_cast<Int64>(2))))) | ||
.build(context); | ||
|
||
{ | ||
String expected = R"( | ||
Expression: <final projection> | ||
Expression: <projection after push down filter> | ||
Filter: <push down filter> | ||
DeltaMergeSegmentThread)"; | ||
executeInterpreterWithDeltaMerge(expected, request, 10); | ||
} | ||
executeAndAssertColumnsEqual( | ||
request, | ||
{toNullableVec<Int64>({1}), | ||
toNullableVec<String>({"apple"})}); | ||
|
||
|
||
request = context | ||
.scan("test_db", "test_table1") | ||
.filter(lt(col("i1"), lit(Field(static_cast<Int64>(3))))) | ||
.build(context); | ||
|
||
executeAndAssertColumnsEqual( | ||
request, | ||
{toNullableVec<Int64>({1, 2}), | ||
toNullableVec<String>({"apple", {}})}); | ||
|
||
for (size_t i = 4; i < 10; ++i) | ||
{ | ||
request = context | ||
.scan("test_db", "test_table1") | ||
.filter(lt(col("i1"), lit(Field(static_cast<Int64>(i))))) | ||
.build(context); | ||
|
||
executeAndAssertColumnsEqual( | ||
request, | ||
{toNullableVec<Int64>({1, 2, 3}), | ||
toNullableVec<String>({"apple", {}, "banana"})}); | ||
} | ||
|
||
for (size_t i = 0; i < 10; ++i) | ||
{ | ||
request = context | ||
.scan("test_db", "test_table1") | ||
.filter(gt(col("i1"), lit(Field(static_cast<Int64>(-i))))) | ||
.build(context); | ||
|
||
executeAndAssertColumnsEqual( | ||
request, | ||
{toNullableVec<Int64>({1, 2, 3}), | ||
toNullableVec<String>({"apple", {}, "banana"})}); | ||
} | ||
|
||
for (size_t i = 0; i < 10; ++i) | ||
{ | ||
request = context | ||
.scan("test_db", "test_table1") | ||
.filter(gt(col("i1"), lit(Field(static_cast<Int64>(-i))))) | ||
.project({col("i1")}) | ||
.build(context); | ||
|
||
executeAndAssertColumnsEqual( | ||
request, | ||
{toNullableVec<Int64>({1, 2, 3})}); | ||
} | ||
|
||
context.mockStorage()->setUseDeltaMerge(false); | ||
} | ||
CATCH | ||
|
||
/// TODO: more functions. | ||
|
||
} // namespace tests | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use
shared_ptr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
filter
will be used only once