
Managing memory usage during query execution #3

Closed

wants to merge 17 commits

Conversation


@yjshen yjshen commented Nov 9, 2021

Closes apache#587

This PR includes the implementation of ExternalSorter for ease of comprehension.

Check https://docs.google.com/document/d/1BT5HH-2sKq-Jxo51PNE6l9NNd_F-FyyYcyC3SKTnkIA/edit?usp=sharing for design.

@yjshen yjshen changed the title Managed memory usage during query execution Managing memory usage during query execution Nov 9, 2021

alamb commented Nov 10, 2021

Thanks @yjshen -- I hope to review this carefully tomorrow


@alamb alamb left a comment


Thank you @yjshen. I read some of this PR carefully but did not review the entire thing.

I really like how you used the external sort as a motivating example for the memory manager. Given the variety of places that DataFusion runs, I think it is unlikely we'll get the details of a partition-aware memory manager working well at first -- I think starting with some sort of simpler strategy (like evenly dividing the memory across all operators) might make sense.

In terms of where to go next, here is what I suggest:

  1. We introduce the RuntimeEnv and the parts of the memory manager API needed to allocate and deallocate memory (e.g. something like MemoryConsumer::allocate() and MemoryConsumer::release()); see the sketch after this list
  2. We add the appropriate calls (as we can) to all the operators to simply report what they are using
  3. As a follow on we can start to get fancier and calculate / estimate memory budgets for things like external sort
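
A minimal sketch of what the step 1 surface could look like. The names and signatures here (RuntimeEnv, MemoryPool, MemoryConsumer::allocate/release) are assumptions for illustration, not the final DataFusion API:

use std::sync::{Arc, Mutex};

/// Hypothetical runtime-wide state shared by all operators.
pub struct RuntimeEnv {
    memory: Mutex<MemoryPool>,
}

struct MemoryPool {
    capacity: usize,
    used: usize,
}

impl RuntimeEnv {
    pub fn new(capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            memory: Mutex::new(MemoryPool { capacity, used: 0 }),
        })
    }
}

/// Hypothetical per-operator handle used to report and acquire memory.
pub struct MemoryConsumer {
    env: Arc<RuntimeEnv>,
}

impl MemoryConsumer {
    pub fn new(env: Arc<RuntimeEnv>) -> Self {
        Self { env }
    }

    /// Try to reserve `bytes`; fail if the pool would overflow.
    pub fn allocate(&self, bytes: usize) -> Result<(), String> {
        let mut pool = self.env.memory.lock().unwrap();
        if pool.used + bytes > pool.capacity {
            return Err(format!("cannot allocate {} bytes", bytes));
        }
        pool.used += bytes;
        Ok(())
    }

    /// Return `bytes` to the pool.
    pub fn release(&self, bytes: usize) {
        let mut pool = self.env.memory.lock().unwrap();
        pool.used = pool.used.saturating_sub(bytes);
    }
}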

}

let input = self.input.execute(partition).await?;
external_sort(input, partition, self.expr.clone(), RUNTIME_ENV.clone()).await

In terms of getting the memory manager into the actual plans, one thing that might be worth looking at is putting it on the ExecutionContextState object (which the physical planner has access to). You could potentially add the RuntimeEnv reference to that struct and then pass it into ExternalSortExec::new().

Alternatively, since I think we envision all operators getting memory from the RuntimeEnv, adding it as a new parameter to ExecutionPlan::execute() might be the clearest thing to do (and then it would be clear that all execution plan nodes have access to this structure); a sketch follows below.

https://github.com/yjshen/arrow-datafusion/blob/mm/datafusion/src/physical_plan/planner.rs#L299
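
A sketch of that second option, passing the runtime into execute(). The signature below is simplified and illustrative (the real trait method is async and returns a stream), not the current DataFusion definition:

use std::sync::Arc;

struct RuntimeEnv { /* memory manager, disk manager, ... */ }
struct SendableRecordBatchStream;
type Result<T> = std::result::Result<T, String>;

trait ExecutionPlan {
    /// Every operator receives the shared RuntimeEnv at execution time,
    /// so all plan nodes can register themselves as memory consumers.
    fn execute(
        &self,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<SendableRecordBatchStream>;
}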

use std::sync::Arc;

lazy_static! {
/// Employ lazy static temporarily for RuntimeEnv, to avoid plumbing it through

this is a smart idea for prototyping 👍

/// Merge-sort in-mem batches as well as spills into a total order with `SortPreservingMergeStream` (SPMS).
/// Always put the in-mem batch based stream at idx 0 in SPMS so that we can spill
/// the stream when `spill()` is called on us.
async fn sort(&self) -> Result<SendableRecordBatchStream> {

I don't fully follow the logic here (it may be because I didn't follow all the async stream stuff correctly)

Normally I have seen external sort implemented like:

  1. Sort gets the maximum allowable memory budget
  2. Sort reads input until it is over its budget, at which point it sorts the batch and writes it to an external file
  3. The in-memory buffer is cleared and input reading resumes until the input is done or the budget is exhausted again
  4. Once the input is done, the last buffer is sorted and then an N-way merge is done on the files from disk

I can't quite map that algorithm to this code 🤔

Owner Author

The external sort here is a little different: it acquires its memory budget on a per-record-batch basis, instead of getting a huge, fixed-size budget ahead of real usage. By acquiring memory one record batch at a time, we get finer control over how much memory is really used in the system. Therefore, the external sort implementation here is as follows (a rough sketch appears after this list):

  1. Get one record batch from the input, acquire a memory budget of its size from the memory manager, sort the batch, and insert it into a vector that holds all in-memory, buffered batches.
  2. When the external sorter is required to spill to disk (triggered by other consumers, or by itself when no memory is available), it first checks whether it holds any in-memory buffered batches. If so, it heap-merges all the in-memory, self-sorted batches into a partial order, writes them to a file, and frees all the memory the vector held.
  3. When all input batches have been seen, do a merge sort of the in-memory batches with all spills, resulting in the final, total order.
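
A rough sketch of that flow, using simplified types; the real implementation works on Arrow RecordBatches and async streams, and the names below are illustrative only:

struct Batch {
    bytes: usize,
}

struct ExternalSorter {
    in_mem: Vec<Batch>,  // self-sorted, buffered batches
    spills: Vec<String>, // paths of spill files, each a sorted run
    budget: usize,       // memory currently reserved from the manager
}

impl ExternalSorter {
    /// Step 1: acquire memory for one input batch, sort it, and buffer it.
    fn insert(&mut self, batch: Batch, try_allocate: impl Fn(usize) -> bool) {
        if !try_allocate(batch.bytes) {
            // Step 2: no memory available -- spill what is buffered, then retry.
            self.spill();
            assert!(try_allocate(batch.bytes), "memory manager is still out of memory");
        }
        self.budget += batch.bytes;
        self.in_mem.push(batch); // the batch is sorted before being buffered
    }

    /// Step 2: merge the buffered, self-sorted batches into one sorted run
    /// on disk and release the memory they held.
    fn spill(&mut self) {
        if self.in_mem.is_empty() {
            return;
        }
        // heap-merge `self.in_mem` into a partially ordered run on disk here
        self.spills.push(format!("spill-{}.tmp", self.spills.len()));
        self.in_mem.clear();
        self.budget = 0; // the reservation would be released back to the manager
    }

    /// Step 3: the final pass merges the remaining in-memory batches with all
    /// spill files into the total order (SortPreservingMerge in the real code);
    /// here we just return the sorted runs that would feed that merge.
    fn finish(self) -> (Vec<Batch>, Vec<String>) {
        (self.in_mem, self.spills)
    }
}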


Makes sense @yjshen -- I think there is a tradeoff here:

If memory is allocated "on demand" for each record batch as you describe in this ticket, one operator can effectively consume all the memory, causing others to spill even when they might not need to. However, the on-demand strategy does work well to ensure that all memory allocated is actually used.

For example, if you have a plan like

Sort
  GroupByHash

If you allocate memory on demand, it is possible that the group by hash consumes most/all of the memory available (and still requires spilling), so by the time the sort is run, it can't get much/any memory and so it spills unnecessarily.

However, it is true that if memory is allocated up front, there is a very real risk of allocating more memory to sort or group by hash than can be used (resulting in waste).

In conclusion, I think the algorithm you describe sounds like a good one.

Owner Author

For the Sort -> GroupBy -> ParquetScan case, and for most operators, the memory usage pattern is clear: a growing phase followed by a shrinking phase.

Take group by as an example: it uses up all available memory and requires spills during hash building, i.e., the growing phase. Once all inputs are inserted, it enters a shrinking phase; it may still hold some memory for output, but it is also spillable, first writing its aggregation result to a file which is then read by its successor. Memory usage can only decrease from then on.

However, bad guys that produce output while buffering and while still consuming input, e.g., a window operator, do not follow the grow-then-shrink pattern. And if we have several window operators chained in a single stage, the spilling behavior would likely be strange: frequent spills for a single operator, and the stage as a whole struggling to make progress. (This holds for Spark's memory management as well as for my current implementation.)

Two available solutions for this case are:

  1. Make the allocation strategy priority aware, i.e., when insufficient memory is observed, only trigger consumers of lower priority to spill, and throttle processing speed by starving the child operators (see the sketch after this list).
  2. An executor-aware scheduler, as has been brought up several times in the community, that throttles record/batch processing speed for different operators by scheduling them at adjustable rates.
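
A minimal sketch of the priority-aware spill selection in solution 1, assuming each consumer reports a priority and its current memory use; all names here are illustrative:

struct ConsumerInfo {
    id: usize,
    priority: u32, // lower value = lower priority = spilled first
    used: usize,
}

/// Pick consumers to spill until at least `needed` bytes would be freed,
/// preferring lower-priority consumers and never the requester itself.
fn choose_spill_victims(consumers: &mut [ConsumerInfo], requester: usize, needed: usize) -> Vec<usize> {
    consumers.sort_by_key(|c| c.priority);
    let mut freed = 0;
    let mut victims = Vec::new();
    for c in consumers.iter() {
        if freed >= needed {
            break;
        }
        if c.id == requester {
            continue;
        }
        victims.push(c.id);
        freed += c.used;
    }
    victims
}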


Take group by as an example: it uses up all available memory and requires spills during hash building, i.e., the growing phase. Once all inputs are inserted, it enters a shrinking phase; it may still hold some memory for output, but it is also spillable, first writing its aggregation result to a file which is then read by its successor. Memory usage can only decrease from then on.

The classic group by spill algorithm I am familiar with spills groups to disk, sorted by group key, when the hash table memory is exhausted.

Then, data is output by merging the sorted runs from disk.

Upon thinking about this, it does seem reasonable that the merge phase can end up using much less memory 👍

However, bad guys that produce output while buffering and while still consuming input, e.g., a window operator, do not follow the grow-then-shrink pattern.

That is a good point

@@ -61,6 +61,9 @@ pub enum DataFusionError {
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
/// This error is thrown when a consumer cannot acquire memory from the Memory Manager;
/// we can then just cancel the execution of the partition.
OutOfMemory(String),

Suggested change
OutOfMemory(String),
ResourcesExhausted(String),

I think we may eventually have other resources that could be exhausted (like temp disk / object store space as well as file handles)

Owner Author

👍

}

/// Spill at least `size` bytes to disk and update related counters
async fn spill(&self, size: usize, trigger: &MemoryConsumerId) -> Result<usize> {

I wonder if we could combine spill() with the actual allocation of a temp file (rather than the deallocation)

So like an interface such as fn reserve_spill(&self, size: usize) -> Result<filename>

Which would effectively reserve a spill file up to size in length
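
One possible shape of that combined call, as a sketch only; the SpillReservation type and its fields are assumptions, not an agreed API:

use std::path::PathBuf;

/// Returned by a hypothetical `reserve_spill`, tying the reserved disk
/// budget to the temp file it will be written to.
pub struct SpillReservation {
    pub path: PathBuf,
    pub reserved_bytes: usize,
}

pub trait SpillManager {
    /// Reserve up to `size` bytes of spill space and hand back the temp
    /// file the caller should write into.
    fn reserve_spill(&self, size: usize) -> Result<SpillReservation, String>;
}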

Owner Author

Let me try this

}

/// Try allocate `required` bytes as needed
async fn allocate(&self, required: usize) -> Result<()> {

Have you considered some sort of RAII mechanism here (to help ensure allocations match up with deallocations)?

Something like

Suggested change
async fn allocate(&self, required: usize) -> Result<()> {
async fn allocate(&self, required: usize) -> Result<ResourceReservation> {

and

struct ResourceReservation {...}


impl Drop for ResourceReservation {
  fn drop(&mut self) {
    // release the reservation
  }
}

Owner Author

I hadn't thought about that before, great idea 👍

self.id().partition_id
}

/// Try allocate `required` bytes as needed

As I mentioned in the design doc, the distinction between "allocate required bytes that I need to operate" and "give me a budget of extra memory I can use to work within" is an important one

External sort can still produce an answer with a small amount of memory, but will potentially go faster with a larger amount.
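
A sketch of how the two kinds of requests could be kept distinct on the consumer side. Both method names are assumptions, and the async_trait attribute is only there because the trait in this PR already uses async fn:

/// Hypothetical split between "must have" and "nice to have" memory.
#[async_trait::async_trait]
pub trait MemoryConsumer {
    /// Memory the operator cannot make progress without; failure here
    /// should trigger spilling (or ultimately a ResourcesExhausted error).
    async fn allocate_required(&self, bytes: usize) -> Result<(), String>;

    /// Extra working memory the operator can exploit but can live without;
    /// the manager may grant anything from 0 up to `max_bytes`.
    async fn request_budget(&self, max_bytes: usize) -> usize;
}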

Owner Author

As explained in the sort implementation above, a memory consumer should try to allocate more memory in order to continue in-memory computation; if no more memory is available, this allocate call will trigger the partition manager (or just the memory manager, if we combine the two) to find suitable memory consumer(s) and trigger spills.

So is the suggestion here to adjust the doc, or is a different design being suggested?


After thinking about this some more (see comment above), I don't have a specific suggestion. Sorry for the confusion.


yjshen commented Nov 29, 2021

Hi @alamb, sorry for the really late reply. I have been prototyping sort-merge join, sort-based aggregate, and spillable repartition on top of this initial primitive, and also digging into the actual reason for Spark's problematic memory control, which always results in OOM in our production workload (since the prototype here is mainly based on Spark's). The key findings are:

Much of the memory consumed during Spark's execution is unmanaged, and this includes what you mentioned:

"operator required memory" (certain operators like filter will need some amount of memory to operate at all)

  1. The memory consumers are not comprehensive. A lot of memory slips away from memory tracking, such as the sort-merge join buffer for a single key (which can consume a great deal of memory when a huge record or a skewed join key occurs)
  2. The memory acquisition strategy is suboptimal; for example, while buffering records in memory, Spark grows its pointer array to twice the size when the current one is exhausted. This results in a huge allocation from the JVM for which it is hard to find contiguous memory, puts a lot of pressure on full GC since the array enters "Tenured Space" directly, and requires more memory than is really needed (both the original and the newly allocated array are held in memory at once)

Therefore, my current thinking is:

  1. Add a memory usage API to each physical operator for reporting purposes
  2. When the memory manager is asked to allocate memory, it first determines the memory actually available by subtracting the operators' reported usage from the memory pool, then allocates from the remainder (see the sketch after the diagram below)
  3. Implement more memory consumers as we proceed, and optimize the memory consumption of existing operators, such as using MutableRecordBatch for hash aggregate instead of a vector of ScalarValue
  4. Put extra effort into handling huge records

And the resulting memory space usage will be:

                  Memory Space for the DataFusion Lib / Process

┌───────────────────────────────────────────────z─────────────────────────────┐
│                                               z                             │
│                                               z                             │
│            Remaining, Available for           z                             │
│                Memory Consumers               z       Physical Operators    │
│                                               z                             │
│                                               z                             │
└───────────────────────────────────────────────z─────────────────────────────┘
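
A sketch of the accounting in point 2 above; `operator_reported` would come from the per-operator usage-reporting API in point 1, and the struct and method names are assumed for illustration:

struct MemoryManager {
    pool_size: usize, // total memory for the DataFusion lib / process
    reserved: usize,  // memory already granted to spillable consumers
}

impl MemoryManager {
    /// Memory still available for consumers once the operator-reported
    /// usage is subtracted from the pool.
    fn available(&self, operator_reported: usize) -> usize {
        self.pool_size
            .saturating_sub(operator_reported)
            .saturating_sub(self.reserved)
    }

    fn try_allocate(&mut self, bytes: usize, operator_reported: usize) -> bool {
        if bytes <= self.available(operator_reported) {
            self.reserved += bytes;
            true
        } else {
            false // the caller should trigger spilling and then retry
        }
    }
}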


alamb commented Nov 29, 2021

The memory consumers are not comprehensive. A lot of memory slips away from memory tracking, such as the sort-merge join buffer for a single key (which can consume a great deal of memory when a huge record or a skewed join key occurs)

Another strategy I have seen for this problem (basically internal buffers not accounted for properly) is a "slop" estimate for each operator. Something like "only allocate 80% of available resources (and assume that 20% will be used by internal buffers, batches, etc.)".

Allocating / reporting per operator as you describe above is also a reasonable strategy -- the issue is that the reporting will likely always diverge from the actual implementation. Perhaps both strategies are needed for a real system.

The memory acquisition strategy is suboptimal;

This is something I think Rust and arrow-rs can help with


yjshen commented Nov 29, 2021

While talking about

a "slop" estimate for each operator. Something like "only allocate 80% of available resources (and assume that 20% will be used by the internal buffers, batches, etc" .

I have witnessed extreme cases where we set aside only 40%~50% of available resources for memory consumers but still suffered OOM. Such cases are not rare, especially with chained window operators inside a single stage. 😂

I agree a "slop" parameter is still needed in our case; I will add one.
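
A tiny sketch of what such a slop setting could look like, with a hypothetical config struct; the fraction keeps headroom for untracked buffers inside operators:

struct MemoryConfig {
    pool_size: usize,
    slop_fraction: f64, // e.g. 0.2 keeps 20% of the pool back from consumers
}

impl MemoryConfig {
    /// Memory actually handed to spillable consumers after the slop is reserved.
    fn consumer_pool(&self) -> usize {
        ((self.pool_size as f64) * (1.0 - self.slop_fraction)) as usize
    }
}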

yjshen pushed a commit that referenced this pull request Dec 20, 2021
* # This is a combination of 3 commits.
# This is the 1st commit message:

Add Display for Expr::BinaryExpr

# This is the commit message #2:

Update logical_plan/operators tests

# This is the commit message #3:

rebase and debug display for non binary expr

* Add Display for Expr::BinaryExpr

Update logical_plan/operators tests

rebase and debug display for non binary expr

Add Display for Expr::BinaryExpr

Update logical_plan/operators tests

Updating tests

Update aggregate display

Updating tests without aggregate

More tests

Working on agg/scalar functions

Fix binary_expr in create_name function and attendant tests

More tests

More tests

Doc tests

Rebase and update new tests

* Submodule update

* Restore submodule references from master

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@yjshen yjshen closed this Jan 20, 2022
yjshen pushed a commit that referenced this pull request Mar 16, 2023
* Initial commit

* initial commit

* failing test

* table scan projection

* closer

* test passes, with some hacks

* use DataFrame (#2)

* update README

* update dependency

* code cleanup (#3)

* Add support for Filter operator and BinaryOp expressions (#4)

* GitHub action (#5)

* Split code into producer and consumer modules (#6)

* Support more functions and scalar types (#7)

* Use substrait 0.1 and datafusion 8.0 (#8)

* use substrait 0.1

* use datafusion 8.0

* update datafusion to 10.0 and substrait to 0.2 (#11)

* Add basic join support (#12)

* Added fetch support (#23)

Added fetch to consumer

Added limit to producer

Added unit tests for limit

Added roundtrip_fill_none() for testing when None input can be converted to 0

Update src/consumer.rs

Co-authored-by: Andy Grove <andygrove73@gmail.com>

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Upgrade to DataFusion 13.0.0 (#25)

* Add sort consumer and producer (#24)

Add consumer

Add producer and test

Modified error string

* Add serializer/deserializer (#26)

* Add plan and function extension support (#27)

* Add plan and function extension support

* Removed unwraps

* Implement GROUP BY (#28)

* Add consumer, producer and tests for aggregate relation

Change function extension registration from absolute to relative anchor
(reference)

Remove operator to/from reference

* Fixed function registration bug

* Add test

* Addressed PR comments

* Changed field reference from mask to direct reference (#29)

* Changed field reference from masked reference to direct reference

* Handle unsupported case (struct with child)

* Handle SubqueryAlias (#30)

Fixed aggregate function register bug

* Add support for SELECT DISTINCT (#31)

Add test case

* Implement BETWEEN (#32)

* Add case (#33)

* Implement CASE WHEN

* Add more case to test

* Addressed comments

* feat: support explicit catalog/schema names in ReadRel (#34)

* feat: support explicit catalog/schema names in ReadRel

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: use re-exported expr crate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move files to subfolder

* RAT

* remove rust.yaml

* revert .gitignore changes

* tomlfmt

* tomlfmt

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Co-authored-by: JanKaul <jankaul@mailbox.org>
Co-authored-by: nseekhao <37189615+nseekhao@users.noreply.github.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>