Skip to content

Commit

Permalink
Add arrow stream node (facebookincubator#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and zhejiangxiaomai committed Nov 8, 2022
1 parent c1ae763 commit 34844ab
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 1 deletion.
8 changes: 8 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ void TableScanNode::addDetails(std::stringstream& stream) const {
stream << tableHandle_->toString();
}

// An ArrowStreamNode has no upstream plan nodes, so it hands back the shared
// empty source list.
const std::vector<PlanNodePtr>& ArrowStreamNode::sources() const {
  return kEmptySources;
}

// ArrowStreamNode contributes no extra text to the plan description; the
// stream argument is intentionally left untouched.
void ArrowStreamNode::addDetails(std::stringstream& /* stream */) const {}

// An ExchangeNode reports the shared empty source list: it has no upstream
// plan nodes of its own.
const std::vector<PlanNodePtr>& ExchangeNode::sources() const {
  return kEmptySources;
}
Expand Down
47 changes: 47 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "velox/connectors/Connector.h"
#include "velox/core/Expressions.h"

#include "velox/vector/arrow/Bridge.h"

namespace facebook::velox::core {

typedef std::string PlanNodeId;
Expand Down Expand Up @@ -212,6 +214,51 @@ class ValuesNode : public PlanNode {
const bool parallelizable_;
};

/// Leaf plan node whose rows are supplied by an Arrow C stream
/// (ArrowArrayStream) rather than by other plan nodes.
///
/// The node only stores the stream handle and the declared output type; it
/// never reads from the stream itself.
class ArrowStreamNode : public PlanNode {
 public:
  /// @param id Plan node id forwarded to PlanNode.
  /// @param outputType Row type this node reports as its output.
  /// @param arrowStream Stream handle to expose; must not be null.
  /// @param parallelizable Value later reported by isParallelizable().
  ArrowStreamNode(
      const PlanNodeId& id,
      const RowTypePtr& outputType,
      std::shared_ptr<ArrowArrayStream> arrowStream,
      bool parallelizable = false)
      : PlanNode(id),
        outputType_(outputType),
        arrowStream_(arrowStream),
        parallelizable_(parallelizable) {
    VELOX_CHECK(arrowStream != nullptr);
  }

  std::string_view name() const override {
    return "ArrowStream";
  }

  const RowTypePtr& outputType() const override {
    return outputType_;
  }

  /// Defined out of line in PlanNode.cpp.
  const std::vector<PlanNodePtr>& sources() const override;

  // NOTE(review): this reports that splits are required even though the node
  // carries its own stream -- confirm that this is what the task scheduler
  // expects for this node type.
  bool requiresSplits() const override {
    return true;
  }

  /// Returns a shared handle to the wrapped Arrow stream.
  std::shared_ptr<ArrowArrayStream> arrowStream() const {
    return arrowStream_;
  }

  /// Testing hook: echoes the 'parallelizable' flag passed to the constructor.
  bool isParallelizable() const {
    return parallelizable_;
  }

 private:
  void addDetails(std::stringstream& stream) const override;

  const RowTypePtr outputType_;
  std::shared_ptr<ArrowArrayStream> arrowStream_;
  const bool parallelizable_;
};

class FilterNode : public PlanNode {
public:
FilterNode(const PlanNodeId& id, TypedExprPtr filter, PlanNodePtr source)
Expand Down
57 changes: 57 additions & 0 deletions velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/ArrowStream.h"

namespace facebook::velox::exec {

// Builds the operator from its plan node: the node's output type and id are
// forwarded to SourceOperator, and the node's Arrow stream handle is kept for
// getOutput().
ArrowStream::ArrowStream(
    int32_t operatorId,
    DriverCtx* driverCtx,
    std::shared_ptr<const core::ArrowStreamNode> arrowStream)
    : SourceOperator(
          driverCtx,
          arrowStream->outputType(),
          operatorId,
          arrowStream->id(),
          "Arrow Stream"),
      arrowStream_(arrowStream->arrowStream()) {}

// Pulls the next batch from the Arrow stream and exposes it as a RowVector.
//
// Returns nullptr once the operator has been closed or the producer signals
// end-of-stream (get_next succeeds but hands back a released array, i.e.
// release == nullptr). Throws if either stream callback reports a non-zero
// status or if the imported vector is not a RowVector.
RowVectorPtr ArrowStream::getOutput() {
  // Do not touch the stream again once it is exhausted or the operator is
  // closed.
  if (closed_) {
    return nullptr;
  }

  auto* stream = arrowStream_.get();

  // Best-effort description of the most recent stream failure; both the
  // callback and its result are allowed to be null.
  const auto lastError = [stream]() -> const char* {
    const char* message = stream->get_last_error != nullptr
        ? stream->get_last_error(stream)
        : nullptr;
    return message != nullptr ? message : "unknown error";
  };

  // Zero-initialize so that 'release' is well defined even if the producer
  // fails before filling in the struct.
  ArrowArray arrowArray{};
  const int nextStatus = stream->get_next(stream, &arrowArray);
  if (nextStatus != 0 && arrowArray.release != nullptr) {
    arrowArray.release(&arrowArray);
  }
  VELOX_CHECK(
      nextStatus == 0,
      "Failed to call get_next on ArrowStream: {}",
      lastError());

  if (arrowArray.release == nullptr) {
    // End of stream.
    closed_ = true;
    return nullptr;
  }

  ArrowSchema arrowSchema{};
  const int schemaStatus = stream->get_schema(stream, &arrowSchema);
  if (schemaStatus != 0) {
    // We own the batch fetched above; give it back before reporting the error.
    if (arrowSchema.release != nullptr) {
      arrowSchema.release(&arrowSchema);
    }
    arrowArray.release(&arrowArray);
  }
  VELOX_CHECK(
      schemaStatus == 0,
      "Failed to call get_schema on ArrowStream: {}",
      lastError());

  // Convert Arrow data into RowVector.
  // NOTE(review): neither arrowSchema nor arrowArray is released after the
  // import. If importFromArrowAsViewer does not take ownership of them they
  // are leaked on every batch -- confirm against the bridge's contract.
  rowVector_ = std::dynamic_pointer_cast<RowVector>(
      facebook::velox::importFromArrowAsViewer(arrowSchema, arrowArray));
  // A null result here would otherwise look like "no output yet" while the
  // operator never reports itself finished.
  VELOX_CHECK(
      rowVector_ != nullptr,
      "ArrowStream expects the imported Arrow batch to be a RowVector");
  return rowVector_;
}

// Marks the operator as closed, which makes isFinished() report true.
// NOTE(review): the base-class close() is not invoked and the Arrow stream is
// not released here -- confirm that this is intentional.
void ArrowStream::close() {
  closed_ = true;
}

// The operator is finished once it has been closed, either explicitly via
// close() or by getOutput() reaching the end of the stream.
bool ArrowStream::isFinished() {
  return closed_;
}

} // namespace facebook::velox::exec
51 changes: 51 additions & 0 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/core/PlanNode.h"
#include "velox/exec/Operator.h"

#include "velox/vector/arrow/Abi.h"

namespace facebook::velox::exec {

/// Source operator that produces RowVectors from the Arrow C stream carried
/// by a core::ArrowStreamNode.
class ArrowStream : public SourceOperator {
 public:
  /// @param operatorId Id of this operator, forwarded to SourceOperator.
  /// @param driverCtx Driver context, forwarded to SourceOperator.
  /// @param arrowStream Plan node supplying the output type, the plan node id
  /// and the Arrow stream to read from.
  ArrowStream(
      int32_t operatorId,
      DriverCtx* driverCtx,
      std::shared_ptr<const core::ArrowStreamNode> arrowStream);

  /// Returns the next batch of the stream, or nullptr at end of stream.
  RowVectorPtr getOutput() override;

  /// This operator never reports itself as blocked.
  BlockingReason isBlocked(ContinueFuture* /* unused */) override {
    return BlockingReason::kNotBlocked;
  }

  /// Runs the base-class bookkeeping and then closes the operator.
  void noMoreInput() override {
    Operator::noMoreInput();
    close();
  }

  /// True once the operator has been closed.
  bool isFinished() override;

  void close() override;

 private:
  // Set by close() and by getOutput() at end of stream.
  bool closed_ = false;
  // Most recent batch returned from getOutput().
  RowVectorPtr rowVector_;
  // Stream handle obtained from the plan node.
  std::shared_ptr<ArrowArrayStream> arrowStream_;
};

} // namespace facebook::velox::exec
4 changes: 3 additions & 1 deletion velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ add_library(
Aggregate.cpp
AggregateFunctionRegistry.cpp
AggregationMasks.cpp
ArrowStream.cpp
ContainerRowSerde.cpp
CrossJoinBuild.cpp
CrossJoinProbe.cpp
Expand Down Expand Up @@ -70,7 +71,8 @@ target_link_libraries(
velox_time
velox_codegen
velox_common_base
velox_test_util)
velox_test_util
velox_arrow_bridge)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#include "velox/exec/LocalPlanner.h"
#include "velox/core/PlanFragment.h"
#include "velox/exec/ArrowStream.h"
#include "velox/exec/AssignUniqueId.h"
#include "velox/exec/CallbackSink.h"
#include "velox/exec/CrossJoinBuild.h"
Expand Down Expand Up @@ -182,6 +183,13 @@ uint32_t maxDrivers(const DriverFactory& driverFactory) {
if (!values->isParallelizable()) {
return 1;
}
} else if (
auto arrowStream =
std::dynamic_pointer_cast<const core::ArrowStreamNode>(node)) {
// ArrowStream node must run single-threaded, unless in test context.
if (!arrowStream->isParallelizable()) {
return 1;
}
} else if (
auto limit = std::dynamic_pointer_cast<const core::LimitNode>(node)) {
// final limit must run single-threaded
Expand Down Expand Up @@ -304,6 +312,11 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
auto valuesNode =
std::dynamic_pointer_cast<const core::ValuesNode>(planNode)) {
operators.push_back(std::make_unique<Values>(id, ctx.get(), valuesNode));
} else if (
auto arrowStreamNode =
std::dynamic_pointer_cast<const core::ArrowStreamNode>(planNode)) {
operators.push_back(
std::make_unique<ArrowStream>(id, ctx.get(), arrowStreamNode));
} else if (
auto tableScanNode =
std::dynamic_pointer_cast<const core::TableScanNode>(planNode)) {
Expand Down
5 changes: 5 additions & 0 deletions velox/vector/arrow/Abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
extern "C" {
#endif

// Add a definition check here to avoid duplication with the definition
// included from velox/external/duckdb/duckdb.hpp.
#ifndef ARROW_FLAG_DICTIONARY_ORDERED
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4
Expand Down Expand Up @@ -102,6 +105,8 @@ struct ArrowArrayStream {
void* private_data;
};

#endif

#ifdef __cplusplus
}
#endif

0 comments on commit 34844ab

Please sign in to comment.