Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interpreter: Serialize executors #4742

Merged
merged 24 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ add_headers_and_sources(test_util .)
list(REMOVE_ITEM test_util_sources "bench_dbms_main.cpp" "gtests_dbms_main.cpp")

add_library(test_util_gtest_main ${test_util_headers} ${test_util_sources} gtests_dbms_main.cpp)
target_link_libraries(test_util_gtest_main dbms gtest_main)
target_link_libraries(test_util_gtest_main dbms gtest_main clickhouse_aggregate_functions)

add_library(test_util_bench_main ${test_util_headers} ${test_util_sources} bench_dbms_main.cpp)
target_link_libraries(test_util_bench_main dbms gtest_main benchmark)
Expand Down
75 changes: 22 additions & 53 deletions dbms/src/TestUtils/InterpreterTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,72 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <TestUtils/InterpreterTestUtils.h>
#include <TestUtils/executorSerializer.h>

namespace DB::tests
{
namespace
DAGContext & MockExecutorTest::getDAGContext()
{
String toTreeString(const tipb::Executor & root_executor, size_t level = 0);
assert(dag_context_ptr != nullptr);
return *dag_context_ptr;
}

void MockExecutorTest::initializeContext()
{
dag_context_ptr = std::make_unique<DAGContext>(1024);
context = MockDAGRequestContext(TiFlashTestEnv::getContext());
}

// serialize tipb::DAGRequest, print the executor name in a Tree format.
String toTreeString(std::shared_ptr<tipb::DAGRequest> dag_request)
void MockExecutorTest::SetUpTestCase()
{
assert((dag_request->executors_size() > 0) != dag_request->has_root_executor());
if (dag_request->has_root_executor())
try
{
return toTreeString(dag_request->root_executor());
DB::registerFunctions();
DB::registerAggregateFunctions();
}
else
catch (DB::Exception &)
{
FmtBuffer buffer;
String prefix;
traverseExecutors(dag_request.get(), [&buffer, &prefix](const tipb::Executor & executor) {
assert(executor.has_executor_id());
buffer.fmtAppend("{}{}\n", prefix, executor.executor_id());
prefix.append(" ");
return true;
});
return buffer.toString();
// Maybe another test has already registered, ignore exception here.
}
}

String toTreeString(const tipb::Executor & root_executor, size_t level)
{
FmtBuffer buffer;

auto append_str = [&buffer, &level](const tipb::Executor & executor) {
assert(executor.has_executor_id());

buffer.append(String(level, ' '));
buffer.append(executor.executor_id()).append("\n");
};

traverseExecutorTree(root_executor, [&](const tipb::Executor & executor) {
if (executor.has_join())
{
append_str(executor);
++level;
for (const auto & child : executor.join().children())
buffer.append(toTreeString(child, level));
return false;
}
else
{
append_str(executor);
++level;
return true;
}
});

return buffer.toString();
}
} // namespace

void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
void MockExecutorTest::dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual)
{
String actual_string = toTreeString(actual);
ASSERT_EQ(Poco::trimInPlace(expected_string), Poco::trimInPlace(actual_string));
FmtBuffer buf;
auto serializer = ExecutorSerializer(context.context, buf);
ASSERT_EQ(Poco::trimInPlace(expected_string), Poco::trim(serializer.serialize(actual.get())));
}

} // namespace DB::tests
29 changes: 7 additions & 22 deletions dbms/src/TestUtils/InterpreterTestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
#pragma once

#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Functions/registerFunctions.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <TestUtils/TiFlashTestEnv.h>
#include <TestUtils/executorSerializer.h>
#include <TestUtils/mockExecutor.h>
namespace DB::tests
{
void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual);
class MockExecutorTest : public ::testing::Test
{
protected:
Expand All @@ -37,29 +38,13 @@ class MockExecutorTest : public ::testing::Test
MockExecutorTest()
: context(TiFlashTestEnv::getContext())
{}
static void SetUpTestCase()
{
try
{
DB::registerFunctions();
}
catch (DB::Exception &)
{
// Maybe another test has already registered, ignore exception here.
}
}
static void SetUpTestCase();

virtual void initializeContext()
{
dag_context_ptr = std::make_unique<DAGContext>(1024);
context = MockDAGRequestContext(TiFlashTestEnv::getContext());
}
virtual void initializeContext();

DAGContext & getDAGContext()
{
assert(dag_context_ptr != nullptr);
return *dag_context_ptr;
}
DAGContext & getDAGContext();

void dagRequestEqual(String & expected_string, const std::shared_ptr<tipb::DAGRequest> & actual);

protected:
MockDAGRequestContext context;
Expand Down
Loading