Skip to content

Commit

Permalink
Init part for operator framework (facebookincubator#2)
Browse files Browse the repository at this point in the history
* Init part for operator framework

* Enabled Operator test

* Code style fix
  • Loading branch information
Ferdinand Xu committed Sep 27, 2021
1 parent 8bb8b9a commit efcc413
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 0 deletions.
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_library(
velox_exec
Aggregate.cpp
AllocationPool.cpp
CiderJITedOperator.cpp
ContainerRowSerde.cpp
Driver.cpp
EnforceSingleRow.cpp
Expand Down
53 changes: 53 additions & 0 deletions velox/exec/CiderJITedOperator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "CiderJITedOperator.h"

namespace facebook::velox::exec {

CiderJITedOperator::CiderJITedOperator(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::JITedNode>& jitedNode)
: Operator(
driverCtx,
jitedNode->outputType(),
operatorId,
jitedNode->id(),
"CiderJITedOperator"){};

bool CiderJITedOperator::needsInput() const {
return (input_ == nullptr);
}

void CiderJITedOperator::addInput(RowVectorPtr input) {
input_ = input;
}

RowVectorPtr CiderJITedOperator::getOutput() {
auto output = input_;
input_.reset();
return output;
}

BlockingReason CiderJITedOperator::isBlocked(ContinueFuture* /*future*/) {
return BlockingReason::kNotBlocked;
}

void CiderJITedOperator::finish() {
Operator::finish();
}
} // namespace facebook::velox::exec
78 changes: 78 additions & 0 deletions velox/exec/CiderJITedOperator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/exec/Operator.h"

namespace facebook::velox::exec {
/**
* Cider JITed operator which fuses all operators when possible. Underlying, it
* uses OmnisciDB as an execution engine.
*/
class CiderJITedOperator : public Operator {
public:
CiderJITedOperator(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::JITedNode>& jitedNode);

bool needsInput() const override;

void addInput(RowVectorPtr input) override;

RowVectorPtr getOutput() override;

BlockingReason isBlocked(ContinueFuture* /*future*/) override;

void finish() override;

private:
RowVectorPtr input_;

// public:
// CiderJITedOperator(
// int32_t operatorId,
// DriverCtx* driverCtx,
// const std::shared_ptr<const core::JITedNode>& jitedNode);
//
// bool needsInput() const override;
//
// void addInput(RowVectorPtr input) override;
//
// RowVectorPtr getOutput() override;
//
// BlockingReason isBlocked(ContinueFuture* /*future*/) override;
//
// void finish() override;

// private:
// bool contain_join_build_ = false;
// bool contain_join_probe_ = false;
// bool contain_blocking_ops_ = false;
// // Cider RelAlg node string for all blocking string。 It will be called at
// // addInput method. Intermediate data structure is employed and return when
// // getOutput method is called.
// const std::string blocking_cider_str_ = "";
//
// // Cider RelAlg node string for all non-blocking string
// // It will be called at getOutput method
// const std::string non_blocking_cider_str_= "";
// RowVectorPtr input_;
// // For join build, it will be broadcast to pair drivers like join op
// // TODO (Cheng) change to Arrow format or other format
// RowVectorPtr non_blocking_res_;
};

} // namespace facebook::velox::exec
47 changes: 47 additions & 0 deletions velox/exec/CiderQueryRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

namespace facebook::velox::exec {

//TODO (Qi): A placeholder to show how OmnisciDB API looks like
// Remove this once OmnisciDB dependency issues resolved
class CiderQueryRunner {
public:
static CiderQueryRunner* getInstance();

void createTable(){};
void processBlocks(){};

// void createTable(
// const std::string& name,
// const struct ArrowArray* tableData,
// const struct ArrowSchema* tableSchema);
//
// struct ArrowArray* processBlocks(
// const std::string& sql,
// const struct ArrowArray* inputData,
// const struct ArrowSchema* inputSchema,
// const struct ArrowSchema* outputSchema);

private:
void init(){};
void cleanup(){};
// std::shared_ptr<EmbeddedDatabase::DBEngine> dbe_;
static std::unique_ptr<CiderQueryRunner> instance_;
};

} // namespace facebook::velox::exec
13 changes: 13 additions & 0 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "velox/exec/TopN.h"
#include "velox/exec/Unnest.h"
#include "velox/exec/Values.h"
#include "velox/exec/CiderJITedOperator.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -91,6 +92,13 @@ OperatorSupplier makeConsumerSupplier(
return std::make_unique<HashBuild>(operatorId, ctx, join);
};
}

// FIXME (Cheng) do we need a separated node for consumer side
if (auto jit = std::dynamic_pointer_cast<const core::JITedNode>(planNode)) {
return [jit](int32_t operatorId, DriverCtx* ctx) {
return std::make_unique<CiderJITedOperator>(operatorId, ctx, jit);
};
}
return nullptr;
}

Expand Down Expand Up @@ -304,6 +312,11 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
planNode)) {
operators.push_back(
std::make_unique<EnforceSingleRow>(id, ctx.get(), enforceSingleRow));
} else if (
auto jited =
std::dynamic_pointer_cast<const core::JITedNode>(planNode)) {
operators.push_back(
std::make_unique<CiderJITedOperator>(id, ctx.get(), jited));
} else {
auto extended = Operator::fromPlanNode(ctx.get(), id, planNode);
VELOX_CHECK(extended, "Unsupported plan node: {}", planNode->toString());
Expand Down
1 change: 1 addition & 0 deletions velox/exec/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ add_executable(
TableWriteTest.cpp
TopNTest.cpp
LimitTest.cpp
CiderJITedOperatorTest.cpp
OrderByTest.cpp
MergeTest.cpp
HashJoinTest.cpp
Expand Down
45 changes: 45 additions & 0 deletions velox/exec/tests/CiderJITedOperatorTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/tests/OperatorTestBase.h"
#include "velox/exec/tests/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

class CiderJITedOperatorTest: public OperatorTestBase {};

// Simple filter/project case
TEST_F(CiderJITedOperatorTest, basic) {
vector_size_t batchSize = 100;
std::vector<RowVectorPtr> vectors;
for (int32_t i = 0; i < 3; ++i) {
auto c0 = makeFlatVector<int64_t>(
batchSize,
[&](vector_size_t row) { return batchSize * i + row; },
nullEvery(5));
auto c1 = makeFlatVector<int32_t>(
batchSize, [&](vector_size_t row) { return row; }, nullEvery(7));
auto c2 = makeFlatVector<double>(
batchSize, [](vector_size_t row) { return row * 0.1; }, nullEvery(11));
vectors.push_back(makeRowVector({c0, c1, c2}));
}
createDuckDbTable(vectors);

// TODO (Cheng) add low level API for cider plan str builder
const std::string planStr = "";
auto plan = PlanBuilder().values(vectors).jitedNode(planStr).planNode();
assertQuery(plan, "SELECT * FROM tmp");
}
6 changes: 6 additions & 0 deletions velox/exec/tests/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ PlanBuilder& PlanBuilder::hashJoin(
return *this;
}

PlanBuilder& PlanBuilder::jitedNode(const std::string& jitPlanStr) {
planNode_ = std::make_shared<core::JITedNode>(
nextPlanNodeId(), jitPlanStr, planNode_);
return *this;
}

PlanBuilder& PlanBuilder::unnest(
const std::vector<std::string>& replicateColumns,
const std::vector<std::string>& unnestColumns,
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class PlanBuilder {
const std::vector<ChannelIndex>& output,
core::JoinType joinType = core::JoinType::kInner);

PlanBuilder& jitedNode(const std::string& relAlgStr);

PlanBuilder& unnest(
const std::vector<std::string>& replicateColumns,
const std::vector<std::string>& unnestColumns,
Expand Down

0 comments on commit efcc413

Please sign in to comment.