Skip to content

Commit

Permalink
ARROW-18008: Added a use_threads option to run_query (apache#19)
Browse files Browse the repository at this point in the history
* ARROW-18008: Added a use_threads option to run_query

* ARROW-18008: pylint
  • Loading branch information
westonpace authored Oct 26, 2022
1 parent 45791de commit f94072e
Show file tree
Hide file tree
Showing 17 changed files with 273 additions and 217 deletions.
97 changes: 90 additions & 7 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,13 @@ Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
return DeclarationToBatchesAsync(std::move(declaration), exec_context).result();
}

Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declaration,
ExecContext* exec_context) {
Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context) {
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make());
Declaration with_sink =
Declaration::Sequence({declaration, {"sink", SinkNodeOptions(&sink_gen)}});
Declaration with_sink = Declaration::Sequence(
{declaration, {"sink", SinkNodeOptions(&sink_gen, out_schema)}});
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(exec_plan.get()));
ARROW_RETURN_NOT_OK(exec_plan->StartProducing(exec_context->executor()));
auto collected_fut = CollectAsyncGenerator(sink_gen);
Expand All @@ -622,9 +623,91 @@ Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(Declaration declara
});
}

Result<std::vector<ExecBatch>> DeclarationToExecBatches(Declaration declaration,
ExecContext* exec_context) {
return DeclarationToExecBatchesAsync(std::move(declaration), exec_context).result();
Result<std::vector<ExecBatch>> DeclarationToExecBatches(
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context) {
return DeclarationToExecBatchesAsync(std::move(declaration), out_schema, exec_context)
.result();
}

namespace {
struct BatchConverter {
Future<std::shared_ptr<RecordBatch>> operator()() {
return exec_batch_gen().Then([this](const std::optional<ExecBatch>& batch)
-> Result<std::shared_ptr<RecordBatch>> {
if (batch) {
return batch->ToRecordBatch(schema);
} else {
return nullptr;
}
});
}

AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
std::shared_ptr<Schema> schema;
};

Result<BatchConverter> DeclarationToRecordBatchGenerator(
Declaration declaration, ::arrow::internal::Executor* executor,
std::shared_ptr<ExecPlan>* out_plan) {
BatchConverter converter;
ARROW_ASSIGN_OR_RAISE(*out_plan, ExecPlan::Make());
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter.exec_batch_gen, &converter.schema)}});
ARROW_RETURN_NOT_OK(with_sink.AddToPlan(out_plan->get()));
ARROW_RETURN_NOT_OK((*out_plan)->StartProducing(executor));
return converter;
}
} // namespace

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads) {
std::shared_ptr<ExecPlan> plan;
std::shared_ptr<Schema> schema;
Iterator<std::shared_ptr<RecordBatch>> batch_itr =
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
-> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
ARROW_ASSIGN_OR_RAISE(
BatchConverter batch_converter,
DeclarationToRecordBatchGenerator(declaration, executor, &plan));
schema = batch_converter.schema;
return batch_converter;
},
use_threads);

struct PlanReader : RecordBatchReader {
PlanReader(std::shared_ptr<ExecPlan> plan, std::shared_ptr<Schema> schema,
Iterator<std::shared_ptr<RecordBatch>> iterator)
: plan_(std::move(plan)),
schema_(std::move(schema)),
iterator_(std::move(iterator)) {}
~PlanReader() { plan_->finished().Wait(); }

std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
return iterator_.Next().Value(record_batch);
}

Status Close() override {
// End plan and read from generator until finished
plan_->StopProducing();
std::shared_ptr<RecordBatch> batch;
do {
ARROW_RETURN_NOT_OK(ReadNext(&batch));
} while (batch != nullptr);
return Status::OK();
}

std::shared_ptr<ExecPlan> plan_;
std::shared_ptr<Schema> schema_;
Iterator<std::shared_ptr<RecordBatch>> iterator_;
};

return std::make_unique<PlanReader>(std::move(plan), std::move(schema),
std::move(batch_itr));
}

namespace internal {
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,13 @@ ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
///
/// \see DeclarationToTable for details
ARROW_EXPORT Result<std::vector<ExecBatch>> DeclarationToExecBatches(
Declaration declaration, ExecContext* exec_context = default_exec_context());
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context = default_exec_context());

/// \brief Asynchronous version of \see DeclarationToExecBatches
ARROW_EXPORT Future<std::vector<ExecBatch>> DeclarationToExecBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());
Declaration declaration, std::shared_ptr<Schema>* out_schema,
ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration and collect the results into a vector
///
Expand All @@ -519,6 +521,10 @@ ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
Declaration declaration, ExecContext* exec_context = default_exec_context());

/// \brief Utility method to run a declaration return results as a RecordBatchReader
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads);

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,11 @@ struct ARROW_EXPORT BackpressureOptions {
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR,
BackpressureOptions backpressure = {},
BackpressureMonitor** backpressure_monitor = NULLPTR)
: generator(generator),
schema(schema),
backpressure(std::move(backpressure)),
backpressure_monitor(backpressure_monitor) {}

Expand All @@ -175,6 +177,11 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
/// data from the plan. If this function is not called frequently enough then the sink
/// node will start to accumulate data and may apply backpressure.
std::function<Future<std::optional<ExecBatch>>()>* generator;

/// \brief A pointer to a schema
///
/// This will be set when the plan is created
std::shared_ptr<Schema>* schema;
/// \brief Options to control when to apply backpressure
///
/// This is optional, the default is to never apply backpressure. If the plan is not
Expand Down Expand Up @@ -246,8 +253,9 @@ class ARROW_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions {
public:
explicit OrderBySinkNodeOptions(
SortOptions sort_options,
std::function<Future<std::optional<ExecBatch>>()>* generator)
: SinkNodeOptions(generator), sort_options(std::move(sort_options)) {}
std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR)
: SinkNodeOptions(generator, schema), sort_options(std::move(sort_options)) {}

SortOptions sort_options;
};
Expand Down Expand Up @@ -436,8 +444,10 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
public:
explicit SelectKSinkNodeOptions(
SelectKOptions select_k_options,
std::function<Future<std::optional<ExecBatch>>()>* generator)
: SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}
std::function<Future<std::optional<ExecBatch>>()>* generator,
std::shared_ptr<Schema>* schema = NULLPTR)
: SinkNodeOptions(generator, schema),
select_k_options(std::move(select_k_options)) {}

/// SelectK options
SelectKOptions select_k_options;
Expand Down
15 changes: 8 additions & 7 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,14 @@ TEST(ExecPlanExecution, SinkNodeBackpressure) {
BackpressureMonitor* backpressure_monitor;
BackpressureOptions backpressure_options(resume_if_below_bytes, pause_if_above_bytes);
std::shared_ptr<Schema> schema_ = schema({field("data", uint32())});
ARROW_EXPECT_OK(compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, backpressure_options,
&backpressure_monitor}},
})
.AddToPlan(plan.get()));
ARROW_EXPECT_OK(
compute::Declaration::Sequence(
{
{"source", SourceNodeOptions(schema_, batch_producer)},
{"sink", SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
backpressure_options, &backpressure_monitor}},
})
.AddToPlan(plan.get()));
ASSERT_TRUE(backpressure_monitor);
ARROW_EXPECT_OK(plan->StartProducing(GetCpuThreadPool()));

Expand Down
18 changes: 12 additions & 6 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SinkNode : public ExecNode {
public:
SinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
AsyncGenerator<std::optional<ExecBatch>>* generator,
BackpressureOptions backpressure,
std::shared_ptr<Schema>* schema, BackpressureOptions backpressure,
BackpressureMonitor** backpressure_monitor_out)
: ExecNode(plan, std::move(inputs), {"collected"}, {},
/*num_outputs=*/0),
Expand All @@ -101,6 +101,9 @@ class SinkNode : public ExecNode {
if (backpressure_monitor_out) {
*backpressure_monitor_out = &backpressure_queue_;
}
if (schema) {
*schema = inputs_[0]->output_schema();
}
auto node_destroyed_capture = node_destroyed_;
*generator = [this, node_destroyed_capture]() -> Future<std::optional<ExecBatch>> {
if (*node_destroyed_capture) {
Expand All @@ -125,7 +128,7 @@ class SinkNode : public ExecNode {
const auto& sink_options = checked_cast<const SinkNodeOptions&>(options);
RETURN_NOT_OK(ValidateOptions(sink_options));
return plan->EmplaceNode<SinkNode>(plan, std::move(inputs), sink_options.generator,
sink_options.backpressure,
sink_options.schema, sink_options.backpressure,
sink_options.backpressure_monitor);
}

Expand Down Expand Up @@ -404,8 +407,9 @@ static Result<ExecNode*> MakeTableConsumingSinkNode(
struct OrderBySinkNode final : public SinkNode {
OrderBySinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::unique_ptr<OrderByImpl> impl,
AsyncGenerator<std::optional<ExecBatch>>* generator)
: SinkNode(plan, std::move(inputs), generator, /*backpressure=*/{},
AsyncGenerator<std::optional<ExecBatch>>* generator,
std::shared_ptr<Schema>* schema)
: SinkNode(plan, std::move(inputs), generator, schema, /*backpressure=*/{},
/*backpressure_monitor_out=*/nullptr),
impl_(std::move(impl)) {}

Expand All @@ -426,7 +430,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSort(plan->exec_context(), inputs[0]->output_schema(),
sink_options.sort_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.schema);
}

static Status ValidateCommonOrderOptions(const SinkNodeOptions& options) {
Expand Down Expand Up @@ -458,7 +463,8 @@ struct OrderBySinkNode final : public SinkNode {
OrderByImpl::MakeSelectK(plan->exec_context(), inputs[0]->output_schema(),
sink_options.select_k_options));
return plan->EmplaceNode<OrderBySinkNode>(plan, std::move(inputs), std::move(impl),
sink_options.generator);
sink_options.generator,
sink_options.schema);
}

static Status ValidateSelectKOptions(const SelectKSinkNodeOptions& options) {
Expand Down
34 changes: 6 additions & 28 deletions cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,37 +130,15 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
keys[i] = FieldRef(key_names[i]);
}

ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
RETURN_NOT_OK(
Declaration::Sequence(
{
{"source",
SourceNodeOptions{input.schema, input.gen(use_threads, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{std::move(aggregates), std::move(keys)}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

RETURN_NOT_OK(plan->Validate());
RETURN_NOT_OK(plan->StartProducing(ctx->executor()));

auto collected_fut = CollectAsyncGenerator(sink_gen);

auto start_and_collect =
AllFinished({plan->finished(), Future<>(collected_fut)})
.Then([collected_fut]() -> Result<std::vector<ExecBatch>> {
ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
return ::arrow::internal::MapVector(
[](std::optional<ExecBatch> batch) { return std::move(*batch); },
std::move(collected));
});

std::shared_ptr<Schema> output_schema;
Declaration decl = Declaration::Sequence(
{{"source",
SourceNodeOptions{input.schema, input.gen(use_threads, /*slow=*/false)}},
{"aggregate", AggregateNodeOptions{std::move(aggregates), std::move(keys)}}});
ARROW_ASSIGN_OR_RAISE(std::vector<ExecBatch> output_batches,
start_and_collect.MoveResult());
DeclarationToExecBatches(decl, &output_schema, ctx));

ArrayVector out_arrays(aggregates.size() + key_names.size());
const auto& output_schema = plan->sources()[0]->outputs()[0]->output_schema();
for (size_t i = 0; i < out_arrays.size(); ++i) {
std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
for (size_t j = 0; j < output_batches.size(); ++j) {
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
{"filter", compute::FilterNodeOptions{scan_options_->filter}},
{"augmented_project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"sink", compute::SinkNodeOptions{&sink_gen, scan_options_->backpressure}},
{"sink", compute::SinkNodeOptions{&sink_gen, /*schema=*/nullptr,
scan_options_->backpressure}},
})
.AddToPlan(plan.get()));

Expand Down
Loading

0 comments on commit f94072e

Please sign in to comment.