Skip to content

Commit

Permalink
[GJ-18] Add a fake result for TPC-H Q6 (facebookincubator#19)
Browse files Browse the repository at this point in the history
* add a fake output for easier debug

* fallback the second stage

* format
  • Loading branch information
rui-mo authored Dec 21, 2021
1 parent f6fe2fd commit 22fc9fc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
33 changes: 32 additions & 1 deletion cpp/src/proto/substrait_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

#include "substrait_utils.h"

#include <arrow/record_batch.h>

#include "arrow/array/builder_base.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/util/checked_cast.h"

namespace substrait = io::substrait;

SubstraitParser::SubstraitParser() {
Expand Down Expand Up @@ -280,9 +286,34 @@ std::string SubstraitParser::FindFunction(uint64_t id) {

class SubstraitParser::WholeStageResultIterator
: public ResultIterator<arrow::RecordBatch> {
bool HasNext() override { return false; }
public:
WholeStageResultIterator() {
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(pool_, arrow::float64(), &array_builder);
builder_.reset(
arrow::internal::checked_cast<arrow::DoubleBuilder*>(array_builder.release()));
}

bool HasNext() override { return has_next_; }

arrow::Status Next(std::shared_ptr<arrow::RecordBatch>* out) override {
double res = 10000;
builder_->Append(res);
std::shared_ptr<arrow::Array> array;
auto status = builder_->Finish(&array);
res_arrays.push_back(array);
std::vector<std::shared_ptr<arrow::Field>> ret_types = {
arrow::field("res", arrow::float64())};
*out = arrow::RecordBatch::Make(arrow::schema(ret_types), 1, res_arrays);
if (has_next_) {
has_next_ = false;
}
return arrow::Status::OK();
}

private:
arrow::MemoryPool* pool_ = arrow::default_memory_pool();
std::unique_ptr<arrow::DoubleBuilder> builder_;
bool has_next_ = true;
std::vector<std::shared_ptr<arrow::Array>> res_arrays;
};
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ case class TransformGuardRule() extends Rule[SparkPlan] {
insertRowGuard(plan)
case p: BroadcastQueryStageExec =>
p
// FIXME: A tmp workaround for single Agg
case a: HashAggregateExec
if !a.child.isInstanceOf[ProjectExec] && !a.child.isInstanceOf[FilterExec] =>
insertRowGuard(a)
case other =>
other.withNewChildren(other.children.map(insertRowGuardOrNot))
}
Expand Down

0 comments on commit 22fc9fc

Please sign in to comment.