Skip to content

Commit

Permalink
ARROW-15410: [C++][Datasets] Improve memory usage of datasets API whe…
Browse files Browse the repository at this point in the history
…n scanning parquet

This PR changes a few things.

 * The default file readahead is changed to 4.  This doesn't seem to affect performance on HDD/SSD and users should already be doing special tuning for S3.  Besides, in many cases, users are reading IPC/Parquet files that have many row groups and so we already have sufficient I/O parallelism.  This is important for bringing down the overall memory usage as can be seen in the formula below.
 * The default batch readahead is changed to 16.  Previously, when we were doing filtering and projection within the scanner, it made sense to read many batches ahead (generally want at least 2 * # of CPUs in that case).  Now that the exec plan is doing the computation the exec plan buffering is instead handled by kDefaultBackpressureLowBytes and kDefaultBackpressureHighBytes.
 * Moves around the parquet readahead a bit.  The previous version would read ahead N row groups.  Now we always read ahead exactly 1 row group but we read ahead N batches (this may mean that we read ahead more than 1 row group if the batch size is much larger than the row group size).
 * Backpressure now utilizes the pause/resume producing signals in the execution plan.  I've adding a `counter` argument to the calls to help deal with the challenges that arise when we try and sequence backpressure signals.  Partly this was to add support for monitoring backpressure (for tests).  Partly it is because I have since become more aware of the reasons for these signals.  They are needed to allow for backpressure from the aggregate & join nodes.
 * Sink backpressure can now be monitored.  This makes it easier to test and could be potentially useful to a user that wanted to know when they are consuming the plan too slowly.
 * Changes the default scanner batch size to 128Ki rows.  Now that we have more or less decoupled the scanning batch size from the row group size we can pass smaller batches through the scanner.  This makes it easier to get parallelism on small datasets..

Putting this altogether the scanner should now buffer in memory:

MAX(fragment_readahead * row_group_size_bytes * 2, fragment_readahead * batch_readahead * batch_size_bytes)

The exec plan sink node should buffer ~ kDefaultBackpressureHighBytes bytes.

The exec plan itself can have some number of tasks in flight but, assuming there are no pipeline breakers, this will be limited to the number of threads in the CPU thread pool and so it should be parallelism * batch_size_bytes.

Adding those together should give the total RAM usage of a plan being read via a sink node that doesn't have any pipeline breakers.

When the sink is a write node then there is a separate backpressure consideration based on # of rows (we can someday change this to be # of bytes but it would be a bit tricky at the moment because we need to balance this with the other write parameters like min_rows_per_group).

So, given the parquet dataset mentioned in the JIRA (21 files, 10 million rows each, 10 row groups each) and knowing that 1 row group is ~140MB when decompressed into Arrow format we should get the following default memory usage:

Scanner readahead = MAX(4 * 140MB * 2, 4 * 16 * 17.5MB) = MAX(1120MB, 1120MB) = 1120MB
Sink readahead ~ 1GiB
Total RAM usage should then be ~2GiB.

 - [x] Add tests to verify memory usage
 - [ ] ~~Update docs to mention that S3 users may want to increase the fragment readahead but this will come at the cost of more RAM usage.~~
 - [ ] ~~Update docs to give some of this "expected memory usage" information~~

Closes apache#12228 from westonpace/feature/ARROW-15410--improve-dataset-parquet-memory-usage

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
westonpace committed Apr 22, 2022
1 parent 6c1a160 commit 78fb2ed
Show file tree
Hide file tree
Showing 28 changed files with 593 additions and 477 deletions.
8 changes: 6 additions & 2 deletions cpp/examples/arrow/compute_register_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ class ExampleNode : public cp::ExecNode {
return arrow::Status::OK();
}

void ResumeProducing(ExecNode* output) override {}
void PauseProducing(ExecNode* output) override {}
void ResumeProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->ResumeProducing(this, counter);
}
void PauseProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->PauseProducing(this, counter);
}

void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }
void StopProducing() override { inputs_[0]->StopProducing(); }
Expand Down
3 changes: 2 additions & 1 deletion cpp/examples/arrow/engine_substrait_consumption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
public:
explicit IgnoringConsumer(size_t tag) : tag_{tag} {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
cp::BackpressureControl* backpressure_control) override {
return arrow::Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
: batches_seen(batches_seen), finish(std::move(finish)) {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
cp::BackpressureControl* backpressure_control) override {
return arrow::Status::OK();
}

Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,13 @@ class ScalarAggregateNode : public ExecNode {
return Status::OK();
}

void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); }
void PauseProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->PauseProducing(this, counter);
}

void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); }
void ResumeProducing(ExecNode* output, int32_t counter) override {
inputs_[0]->ResumeProducing(this, counter);
}

void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
Expand Down Expand Up @@ -598,9 +602,15 @@ class GroupByNode : public ExecNode {
return Status::OK();
}

void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); }
void PauseProducing(ExecNode* output, int32_t counter) override {
// TODO(ARROW-16260)
// Without spillover there is way to handle backpressure in this node
}

void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); }
void ResumeProducing(ExecNode* output, int32_t counter) override {
// TODO(ARROW-16260)
// Without spillover there is way to handle backpressure in this node
}

void StopProducing(ExecNode* output) override {
EVENT(span_, "StopProducing");
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,13 @@ Status MapNode::StartProducing() {
return Status::OK();
}

void MapNode::PauseProducing(ExecNode* output) { EVENT(span_, "PauseProducing"); }
void MapNode::PauseProducing(ExecNode* output, int32_t counter) {
inputs_[0]->PauseProducing(this, counter);
}

void MapNode::ResumeProducing(ExecNode* output) { EVENT(span_, "ResumeProducing"); }
void MapNode::ResumeProducing(ExecNode* output, int32_t counter) {
inputs_[0]->ResumeProducing(this, counter);
}

void MapNode::StopProducing(ExecNode* output) {
DCHECK_EQ(output, outputs_[0]);
Expand Down
34 changes: 28 additions & 6 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ class ARROW_EXPORT ExecNode {
// - A method allows passing a ProductionHint asynchronously from an output node
// (replacing PauseProducing(), ResumeProducing(), StopProducing())

// Concurrent calls to PauseProducing and ResumeProducing can be hard to sequence
// as they may travel at different speeds through the plan.
//
// For example, consider a resume that comes quickly after a pause. If the source
// receives the resume before the pause the source may think the destination is full
// and halt production which would lead to deadlock.
//
// To resolve this a counter is sent for all calls to pause/resume. Only the call with
// the highest counter value is valid. So if a call to PauseProducing(5) comes after
// a call to ResumeProducing(6) then the source should continue producing.
//
// If a node has multiple outputs it should emit a new counter value to its inputs
// whenever any of its outputs changes which means the counters sent to inputs may be
// larger than the counters received on its outputs.
//
// A node with multiple outputs will also need to ensure it is applying backpressure if
// any of its outputs is asking to pause

/// \brief Start producing
///
/// This must only be called once. If this fails, then other lifecycle
Expand All @@ -204,22 +222,26 @@ class ARROW_EXPORT ExecNode {

/// \brief Pause producing temporarily
///
/// \param output Pointer to the output that is full
/// \param counter Counter used to sequence calls to pause/resume
///
/// This call is a hint that an output node is currently not willing
/// to receive data.
///
/// This may be called any number of times after StartProducing() succeeds.
/// However, the node is still free to produce data (which may be difficult
/// to prevent anyway if data is produced using multiple threads).
virtual void PauseProducing(ExecNode* output) = 0;
virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;

/// \brief Resume producing after a temporary pause
///
/// \param output Pointer to the output that is now free
/// \param counter Counter used to sequence calls to pause/resume
///
/// This call is a hint that an output node is willing to receive data again.
///
/// This may be called any number of times after StartProducing() succeeds.
/// This may also be called concurrently with PauseProducing(), which suggests
/// the implementation may use an atomic counter.
virtual void ResumeProducing(ExecNode* output) = 0;
virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;

/// \brief Stop producing definitively to a single output
///
Expand Down Expand Up @@ -281,9 +303,9 @@ class MapNode : public ExecNode {

Status StartProducing() override;

void PauseProducing(ExecNode* output) override;
void PauseProducing(ExecNode* output, int32_t counter) override;

void ResumeProducing(ExecNode* output) override;
void ResumeProducing(ExecNode* output, int32_t counter) override;

void StopProducing(ExecNode* output) override;

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,13 @@ class HashJoinNode : public ExecNode {
return Status::OK();
}

void PauseProducing(ExecNode* output) override { EVENT(span_, "PauseProducing"); }
void PauseProducing(ExecNode* output, int32_t counter) override {
// TODO(ARROW-16246)
}

void ResumeProducing(ExecNode* output) override { EVENT(span_, "ResumeProducing"); }
void ResumeProducing(ExecNode* output, int32_t counter) override {
// TODO(ARROW-16246)
}

void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
Expand Down
77 changes: 73 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,85 @@ class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
std::vector<FieldRef> keys;
};

constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30; // 1GiB
constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28; // 256MiB

class ARROW_EXPORT BackpressureMonitor {
public:
virtual ~BackpressureMonitor() = default;
virtual uint64_t bytes_in_use() const = 0;
virtual bool is_paused() const = 0;
};

/// \brief Options to control backpressure behavior
struct ARROW_EXPORT BackpressureOptions {
/// \brief Create default options that perform no backpressure
BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
/// \brief Create options that will perform backpressure
///
/// \param resume_if_below The producer should resume producing if the backpressure
/// queue has fewer than resume_if_below items.
/// \param pause_if_above The producer should pause producing if the backpressure
/// queue has more than pause_if_above items
BackpressureOptions(uint32_t resume_if_below, uint32_t pause_if_above)
: resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}

static BackpressureOptions DefaultBackpressure() {
return BackpressureOptions(kDefaultBackpressureLowBytes,
kDefaultBackpressureHighBytes);
}

bool should_apply_backpressure() const { return pause_if_above > 0; }

uint64_t resume_if_below;
uint64_t pause_if_above;
};

/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
///
/// Emitted batches will not be ordered.
class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
public:
explicit SinkNodeOptions(std::function<Future<util::optional<ExecBatch>>()>* generator,
util::BackpressureOptions backpressure = {})
: generator(generator), backpressure(std::move(backpressure)) {}
BackpressureOptions backpressure = {},
BackpressureMonitor** backpressure_monitor = NULLPTR)
: generator(generator),
backpressure(std::move(backpressure)),
backpressure_monitor(backpressure_monitor) {}

/// \brief A pointer to a generator of batches.
///
/// This will be set when the node is added to the plan and should be used to consume
/// data from the plan. If this function is not called frequently enough then the sink
/// node will start to accumulate data and may apply backpressure.
std::function<Future<util::optional<ExecBatch>>()>* generator;
util::BackpressureOptions backpressure;
/// \brief Options to control when to apply backpressure
///
/// This is optional, the default is to never apply backpressure. If the plan is not
/// consumed quickly enough the system may eventually run out of memory.
BackpressureOptions backpressure;
/// \brief A pointer to a backpressure monitor
///
/// This will be set when the node is added to the plan. This can be used to inspect
/// the amount of data currently queued in the sink node. This is an optional utility
/// and backpressure can be applied even if this is not used.
BackpressureMonitor** backpressure_monitor;
};

/// \brief Control used by a SinkNodeConsumer to pause & resume
///
/// Callers should ensure that they do not call Pause and Resume simultaneously and they
/// should sequence things so that a call to Pause() is always followed by an eventual
/// call to Resume()
class ARROW_EXPORT BackpressureControl {
public:
/// \brief Ask the input to pause
///
/// This is best effort, batches may continue to arrive
/// Must eventually be followed by a call to Resume() or deadlock will occur
virtual void Pause() = 0;
/// \brief Ask the input to resume
virtual void Resume() = 0;
};

class ARROW_EXPORT SinkNodeConsumer {
Expand All @@ -150,7 +218,8 @@ class ARROW_EXPORT SinkNodeConsumer {
/// This will be run once the schema is finalized as the plan is starting and
/// before any calls to Consume. A common use is to save off the schema so that
/// batches can be interpreted.
virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
virtual Status Init(const std::shared_ptr<Schema>& schema,
BackpressureControl* backpressure_control) = 0;
/// \brief Consume a batch of data
virtual Status Consume(ExecBatch batch) = 0;
/// \brief Signal to the consumer that the last batch has been delivered
Expand Down
Loading

0 comments on commit 78fb2ed

Please sign in to comment.