
Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation #1526

Merged
merged 14 commits into from
Jan 13, 2022

Conversation

yjshen
Member

@yjshen yjshen commented Jan 7, 2022

Which issue does this PR close?

Closes #587.

Rationale for this change

When DataFusion processes a single partition, it keeps allocating memory until the OS or the container system kills it. To make matters worse, concurrently executing partitions, or even simultaneously running plans, compete for the available memory until all of it is exhausted. Meeting the memory requirements of every operator in every running partition becomes even more challenging, and none of the partitions or plans runs to completion.

Therefore, the ability to control the total memory usage of the process as a whole, while allocating the available memory among the executing partitions, is extremely important. Under this guarantee: when memory is sufficient, an operator can acquire as much of it as it needs for computation; when memory is tight, an operator can degrade to using the disk for some intermediate results (spilling to disk) and execute with a limited memory budget.

What changes are included in this PR?

The proposed memory management architecture is the following:

  1. The user designates the max execution memory by setting RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (a float64 between 0..1). The actual max memory DataFusion can use is pool_size = max_memory * memory_fraction.
  2. The entities that take up memory during execution are called Memory Consumers. Operators and others are encouraged to register themselves with the memory manager and report their usage through mem_used().
  3. There are two kinds of consumers:
    • Controlling consumers, which acquire memory during execution and release memory through spilling when no more memory is available.
    • Tracking consumers, which exist for reporting purposes, to provide a more accurate estimate of total memory usage.
  4. Controlling and tracking consumers share the pool. Each controlling consumer can acquire at most
    (pool_size - all_tracking_used) / active_num_controlling_consumers.
            Memory Space for the DataFusion Lib / Process of `pool_size`
   ┌──────────────────────────────────────────────┬─────────────────────────────┐
   │                                              │                             │
   │                                              │                             │
   │               Controlling                    │          Tracking           │
   │            Memory Consumers                  │       Memory Consumers      │
   │                                              │                             │
   │                                              │                             │
   └──────────────────────────────────────────────┴─────────────────────────────┘
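The sharing rule in item 4 is plain arithmetic and can be sketched directly; the function names below are illustrative, not DataFusion's actual API:

```rust
/// Effective pool size: `max_memory * memory_fraction`.
fn pool_size(max_memory: usize, memory_fraction: f64) -> usize {
    (max_memory as f64 * memory_fraction) as usize
}

/// Maximum a single controlling consumer may hold:
/// `(pool_size - all_tracking_used) / active_num_controlling_consumers`.
fn per_requester_cap(pool: usize, tracking_used: usize, n_requesters: usize) -> usize {
    pool.saturating_sub(tracking_used) / n_requesters.max(1)
}

fn main() {
    let pool = pool_size(1024, 0.5); // 512 bytes available to DataFusion
    assert_eq!(pool, 512);
    // Two active controlling consumers; trackers currently report 112 bytes:
    assert_eq!(per_requester_cap(pool, 112, 2), 200);
}
```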

Are there any user-facing changes?

Users can limit the max memory used by DataFusion through RuntimeConfig::max_memory and RuntimeConfig::memory_fraction.

Note

In addition to the proposed memory manager and the runtime plumbed through the execute API, an ExternalSortExec is implemented to illustrate the API usage.
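As a rough illustration of the spill-on-pressure pattern an external sort follows (a minimal sketch with an invented Sorter type and a stubbed spill, not the actual ExternalSortExec code):

```rust
// Buffer sorted runs in memory; when the budget would be exceeded,
// "spill" the buffered runs (the real code writes them to DiskManager
// scratch files) and continue with an empty buffer.
struct Sorter {
    in_mem: Vec<Vec<i32>>, // buffered, individually sorted runs
    spills: usize,         // number of runs spilled so far (stubbed)
    used: usize,
    budget: usize,
}

impl Sorter {
    fn insert(&mut self, batch: Vec<i32>) {
        let size = batch.len() * std::mem::size_of::<i32>();
        if self.used + size > self.budget {
            // Grow request denied: release memory by spilling buffered runs.
            self.spill();
        }
        let mut run = batch;
        run.sort();
        self.used += size;
        self.in_mem.push(run);
    }

    fn spill(&mut self) {
        self.spills += self.in_mem.len();
        self.in_mem.clear();
        self.used = 0;
    }
}

fn main() {
    let mut s = Sorter { in_mem: vec![], spills: 0, used: 0, budget: 16 };
    s.insert(vec![3, 1, 2]); // 12 bytes, fits
    s.insert(vec![9, 8]);    // would exceed the 16-byte budget -> spill first
    assert_eq!(s.spills, 1);
    assert_eq!(s.in_mem.len(), 1);
}
```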

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Jan 7, 2022
@alamb
Contributor

alamb commented Jan 7, 2022

Thanks @yjshen -- I'll try and give this a good look over the weekend

@houqp houqp added the enhancement New feature or request label Jan 8, 2022
Member

@houqp houqp left a comment

Very exciting stuff, thanks @yjshen :)

/// Initialize
pub(crate) fn initialize(self: &Arc<Self>) {
let manager = self.clone();
let handle = task::spawn(async move {
Member

Am i correct that this background refresh process is needed because tracking consumer memory updates are managed internally instead of through MemoryManager?

Member Author

Yes, I'm creating a background task that runs periodically to update the tracking consumers' total memory usage, so that controlling consumers don't have to ask for the available memory frequently.

Member

I haven't put much thought into this yet, but I am curious what your thoughts are on having tracking consumers also report memory usage updates directly to the memory manager — basically similar to what we have with the controlling consumers, but without the capability to force them to spill.

Member Author

The main reason is to reduce interaction with the memory manager during execution, both to reduce complexity and to eliminate synchronization needs.

  • For tracking consumers that were converted from controlling consumers: for example, the hash table size or the partial sort's in-memory size is known when the tracking consumer is created or transformed, so there is no further need for them to acquire memory or interact with the memory manager.
  • For other tracking consumers with internal computational buffers: one can report usage by simply updating the internal mem_used state, with no extra function calls or interaction with the memory manager during execution.
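The "just update internal state" idea can be sketched with an atomic counter; the TrackingBuffer type and method names here are hypothetical, not the real MemoryConsumer trait:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct TrackingBuffer {
    mem_used: AtomicUsize,
}

impl TrackingBuffer {
    fn grow_buffer(&self, bytes: usize) {
        // No call into the memory manager: just record the new size locally.
        self.mem_used.fetch_add(bytes, Ordering::Relaxed);
    }

    fn mem_used(&self) -> usize {
        // The memory manager reads this lazily whenever it needs a total.
        self.mem_used.load(Ordering::Relaxed)
    }
}

fn main() {
    let t = TrackingBuffer { mem_used: AtomicUsize::new(0) };
    t.grow_buffer(128);
    t.grow_buffer(64);
    assert_eq!(t.mem_used(), 192);
}
```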

Contributor

I think the idea of periodically polling tracking consumers is reasonable.

I am a little worried about a task that polls based on some clock interval, however -- it is likely that the frequency will be too fast or too slow.

What about updating tracking consumers on every call to try_grow, or on every query to the memory manager for total memory used?

Member Author

The maintained tracker_total and the background maintenance task are removed. The memory manager now determines total tracker memory each time its can_grow is called.

@liukun4515
Contributor

@yjshen thanks, this is a milestone PR for the memory controller in DataFusion.

Contributor

@alamb alamb left a comment

Thank you so much @yjshen

I think the memory manager interface in this PR (MemoryManager / MemoryConsumer) is a good foundation going forward.

Prior to merging this PR I would like to see:

  1. The ref count cycle between the memory manager and execution plans addressed; that said, I think this PR could be merged into DataFusion as is and we could iterate from there
  2. Some tests for MemoryManager and ExternalSorter (as suggested in the PR comments here)

I also think it is worth removing / reconsidering the background loop for tracked memory consumers as well, though since it isn't used yet I don't think it is critical to remove it prior to merging this PR

But again, really nice and thank you for this contribution

}

/// Register a new memory consumer for memory usage tracking
pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
Contributor

I didn't see any code that registered any Tracking consumers yet.

In terms of plumbing, what do you think about:

  1. making all ExecutionPlans MemoryConsumers and providing default implementations (that reported 0 usage)
  2. Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?

That way all implementations of ExecutionPlan could report their usage without having to explicitly register themselves with the memory manager. The manager could also report on how many operators are not providing any statistics, etc.
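The suggested "default implementation that reports 0 usage" might look roughly like the following; the trait and type shapes are assumptions for illustration, not the actual DataFusion definitions:

```rust
trait MemoryConsumer {
    fn name(&self) -> String;

    /// Default: an operator that doesn't track memory reports 0, so the
    /// manager can also count how many operators give no statistics.
    fn mem_used(&self) -> usize {
        0
    }
}

// Stand-in for an ExecutionPlan that doesn't implement tracking.
struct FilterExec;

impl MemoryConsumer for FilterExec {
    fn name(&self) -> String {
        "FilterExec".to_string()
    }
    // mem_used() falls back to the trait default.
}

fn main() {
    let plan = FilterExec;
    assert_eq!(plan.mem_used(), 0);
    assert_eq!(plan.name(), "FilterExec");
}
```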

@yjshen
Member Author

yjshen commented Jan 10, 2022

@houqp @alamb Thanks for your detailed and insightful review!

Resolved:

  • The maintained total trackers' memory and the background thread that updated it are removed. Instead, total tracker memory is collected each time the memory manager runs its can_grow method.
  • Renamed controlling consumer to requesting consumer, requester for short.
  • Use Weak in MemoryManager now.
  • Use TempDir instead of manually retrying creation of scratch dirs. Use the rand crate instead of uuid.

To discuss:

I didn't see any code that registered any Tracking consumers yet.

There is one in SortMergingStream, used while merging multiple partially ordered results from spill files together with the last batches that are still in memory. That last piece is created as a StreamWrapper backed by in-memory batches, and it simply reports its usage as the total size of those batches.

In terms of plumbing, what do you think about:

  1. making all ExecutionPlans MemoryConsumers and providing default implementations (that reported 0 usage)
  2. Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?

That way all implementations of ExecutionPlan could report their usage without having to explicitly register themselves with the memory manager. Also the manager could report on how many operators were not providing any statistics, etc

I think there is a gap between ExecutionPlan and MemoryConsumer. Since an execute method is called multiple times with different partitions, it's always the SendableRecordBatchStream, such as SortPreservingMergeStream or CrossJoinStream, that takes up memory. Should I make it like:

/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> + MemoryConsumer {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
    /// stream should have the same schema as returned from this method.
    fn schema(&self) -> SchemaRef;
}

/// Trait for a stream of record batches.
pub type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;

Should I make SendableRecordBatchStream a pinned Arc instead of a pinned Box, and register each stream Arc with the runtime on the last line of each execute()? Also, registering consumers through:

pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {

may sometimes be awkward:

runtime.register_consumer(&(streams.clone() as Arc<dyn MemoryConsumer>));

Any thoughts?

@yjshen yjshen requested review from houqp and alamb January 10, 2022 09:59
@alamb
Contributor

alamb commented Jan 11, 2022

I think there is a gap between ExecutionPlan and MemoryConsumer. Since an execute method is called multiple times with different partitions, it's always the SendableRecordBatchStream, such as SortPreservingMergeStream or CrossJoinStream, that takes up memory. Should I make it like:

This is a good point (that the memory management is done on a per-partition basis rather than a per-ExecutionPlan basis). I need to think 🤔 about it some more.

I would recommend we don't change SendableRecordBatchStream which is complicated enough as is.

I will make time today to review this PR again thoroughly -- thank you @yjshen I think we are close

@tustvold
Contributor

tustvold commented Jan 11, 2022

Should I make SendableRecordBatchStream a pinned Arc instead of a pinned Box, and register each stream Arc with the runtime on the last line of each execute()?

Not fully caught up, but how would you consume from such a thing? You need a mutable reference to poll a stream. Streams, like iterators, are not meant to be shared.

As an aside, the Sync constraint on SendableRecordBatchStream is potentially extraneous for this reason: you can't do much with a shared stream anyway, so requiring share-ability between threads imposes unnecessary implementation constraints.
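The constraint described here is the same one iterators have: `Stream::poll_next` takes `Pin<&mut Self>`, just as `Iterator::next` takes `&mut self`, so a value shared through an `Arc` cannot be advanced without interior mutability. An illustrative sketch using an iterator as the stand-in:

```rust
// Advancing requires `&mut`; an `Arc<dyn Iterator<...>>` would not allow
// this call without a lock or other interior mutability, and the same
// applies to polling a `Pin<Arc<dyn Stream>>`.
fn consume_one(it: &mut dyn Iterator<Item = i32>) -> Option<i32> {
    it.next()
}

fn main() {
    let mut it = vec![1, 2, 3].into_iter();
    assert_eq!(consume_one(&mut it), Some(1));
    assert_eq!(consume_one(&mut it), Some(2));
    // let shared = std::sync::Arc::new(vec![1].into_iter());
    // consume_one(&mut *shared); // would not compile: no `&mut` out of `Arc`
}
```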

@yjshen
Member Author

yjshen commented Jan 11, 2022

@tustvold Thanks for bringing it up. I see the stream as a single place where all runtime entities could be auto-registered with the memory manager at once. Maybe a wrapper over the stream could achieve the goal?

@tustvold
Contributor

tustvold commented Jan 11, 2022

Maybe a wrapper over the stream could achieve the goal

My instinct would be to suggest having the shared ref internal to the stream implementation, instead of a wrapper. Otherwise I suspect you will run into borrow-checker, pinning, and async pain. This would also avoid needing to make breaking changes to SendableRecordBatchStream?

Another thing to potentially think about is that many of the operators aren't actually streams; rather, they spawn a tokio task and then return an mpsc queue. There will need to be some accounting of both data buffered in the queue and data in the operator's "task". My gut feeling is this is going to require adding some sort of RAII tracking field to RecordBatch or possibly Buffer, but I'm not really sure...
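The RAII-tracking idea could be sketched as a guard that adjusts a shared counter on creation and on drop; MemGuard is a hypothetical name, not an existing API, and the real field would live on RecordBatch or Buffer:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct MemGuard {
    counter: Arc<AtomicUsize>,
    bytes: usize,
}

impl MemGuard {
    fn new(counter: Arc<AtomicUsize>, bytes: usize) -> Self {
        counter.fetch_add(bytes, Ordering::SeqCst);
        MemGuard { counter, bytes }
    }
}

impl Drop for MemGuard {
    fn drop(&mut self) {
        // Memory is accounted as released when the batch/buffer is dropped,
        // even if it was moved through an mpsc queue into another task.
        self.counter.fetch_sub(self.bytes, Ordering::SeqCst);
    }
}

fn main() {
    let total = Arc::new(AtomicUsize::new(0));
    {
        let _g = MemGuard::new(total.clone(), 256);
        assert_eq!(total.load(Ordering::SeqCst), 256);
    }
    // Guard dropped at end of scope: usage returns to zero.
    assert_eq!(total.load(Ordering::SeqCst), 0);
}
```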

@alamb alamb added the api change Changes the API exposed to users of the crate label Jan 11, 2022
Contributor

@alamb alamb left a comment

I reviewed the memory manager changes -- I think it is good enough to start with and we can iterate from there.

I didn't get a chance to fully review the changes to sort_preserving_merge -- will keep at it tomorrow.

cc @tustvold

Comment on lines +39 to +42
/// Merge buffered, self-sorted record batches to get an order.
///
/// Internally, it uses MinHeap to reduce extra memory consumption
/// by not concatenating all batches into one and sorting it as done by `SortExec`.
Contributor

I need to study the connection between SortExec, SortPreservingMergeStream, and InMemSortStream some more to fully understand this. I can't help but think that InMemSortStream is doing the same thing as SortPreservingMergeStream -- and I wonder if we can reuse the same code

Member Author

The main difference between InMemSortStream and SortPreservingMergeStream lies in the assumed number of "entities" (batches for IMSS, streams for SPMS) being merged.

Since InMemSort is meant to merge many more partially ordered "entities", the sorter should reduce the number of comparisons per item popped. Hence a MinHeap was introduced.

On the other hand, InMemSort is more specialized in that each "entity" is a single record batch, so its logic is simpler than SortPreservingMergeStream's, which must also consider stream continuation.

Currently, the common parts SortKeyCursor and RowIndex for both sorts are extracted to sorts/mod.rs to reduce duplication.
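The MinHeap-based k-way merge described here can be sketched over plain integers (simplified; the real code merges record batches via SortKeyCursor rather than scalar values):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge individually sorted runs without concatenating them: the heap
/// holds (value, run index, offset), and `Reverse` turns the max-heap
/// into a min-heap so `pop` yields the globally smallest value.
fn merge_sorted(runs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, j))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[i].get(j + 1) {
            heap.push(Reverse((next, i, j + 1)));
        }
    }
    out
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5], vec![3, 6]];
    assert_eq!(merge_sorted(runs), vec![1, 2, 3, 4, 5, 6, 7]);
}
```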

Member

@houqp houqp left a comment

I also don't have a good solution to the SendableRecordBatchStream problem off the top of my head, will need to think more about it. Other than that, I think the change looks good as a first iteration 👍

Contributor

@alamb alamb left a comment

Thank you again @yjshen for this great contribution.

I have some concerns about parts of this PR (listed below), but I still think we should merge this PR as is and handle the concerns as follow ons.

Thus my plan is to send a note to the dev mailing list and slack channel asking for comments (on the API specifically) and if we don't hear any major concerns I suggest we merge this PR tomorrow.

My rationale for merging despite these concerns is:

  1. This PR gets the necessary foundations in for limiting resources at runtime: specifically the RuntimeEnv, and MemoryConsumer, MemoryManager, and DiskManager APIs.
  2. It is backwards compatible (e.g. external sort is not connected to anything, so there should be no performance regressions)

I think it would be good to file a follow on PR marking the MemoryManager, DiskManager, and MemoryConsumer APIs as experimental and I will prepare such a PR.

My Concerns (aka major follow on work):

  1. External sorting is not connected to anything -- aka it is code that isn't used (yet)
  2. InMemSortStream and SortPreservingMergeStream are doing very much the same thing -- consolidating the code I think will be important as we move to optimize them
  3. As most of the rest of the system isn't connected to the memory system, the APIs may not be fully adequate (but we can iterate on that)

This PR also unlocks some cool follow on projects (like supporting external group by / group by hash / spill to disk) 🚗

@alamb
Contributor

alamb commented Jan 12, 2022

FWIW I also plan to run the TPCH benchmarks on this PR and will post the results (I don't expect any changes)

@alamb alamb changed the title A simplified memory manager for query execution Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation Jan 12, 2022
Member

@xudong963 xudong963 left a comment

Thanks, @yjshen, great work! I went through it briefly. BTW, some of the details are well handled, such as pool size.

let path = tmp_dir.path().to_str().unwrap().to_string();
std::mem::forget(tmp_dir);

Self {
Member

It is better to define constants for the default values of batch_size and memory_fraction than to use bare numbers
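The suggestion could look like the following sketch; the constant values here are illustrative, not the crate's actual defaults:

```rust
// Name the defaults once instead of scattering bare numbers through
// the construction sites.
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_MEMORY_FRACTION: f64 = 0.7;

struct RuntimeConfig {
    batch_size: usize,
    memory_fraction: f64,
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        RuntimeConfig {
            batch_size: DEFAULT_BATCH_SIZE,
            memory_fraction: DEFAULT_MEMORY_FRACTION,
        }
    }
}

fn main() {
    let cfg = RuntimeConfig::default();
    assert_eq!(cfg.batch_size, DEFAULT_BATCH_SIZE);
    assert_eq!(cfg.memory_fraction, DEFAULT_MEMORY_FRACTION);
}
```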

Member Author

I think these are already the defaults of the config and are meant to be overwritten if needed. Perhaps we need a single place for these defaults?

BTW, I think it is worth reconsidering and restructuring the multiple configs and their usages: ExecutionConfig, PhysicalPlanConfig, and RuntimeConfig.

At the least we should not pass target_batch_size during query planning, since we already have RuntimeEnv plumbed through the execute() API now. I will create a follow-up PR once we've merged this one.

Member

Makes sense to me. We can deal with it uniformly in the following PR.

@liukun4515
Contributor

FWIW I also plan to run the TPCH benchmarks on this PR and will post the results (I don't expect any changes)

Will you post the result in this review?

@alamb
Contributor

alamb commented Jan 13, 2022

Will you post the result in this review?

Yes, I will do so.

@alamb
Contributor

alamb commented Jan 13, 2022

Ok, I am going to fire up my benchmark machine, get some numbers, and assuming they look good merge this PR

@alamb
Contributor

alamb commented Jan 13, 2022

Here are the results of my comparison: the simple_mm branch appears to be about 10% faster, for reasons I don't understand.

Setup:

  1. 10G TPCH data (Scale Factor 10)
  2. 16 core / 64G mem machine in google cloud ("Cascade Lake" architecture)
  3. Ran q1 which is a basic select / predicate / orderby (query below)

Benchmark command:

cd benchmarks
cargo run --release  --bin tpch -- benchmark datafusion --partitions 16 -m  --iterations 10 --path /data/tpch_data_10G/ --format tbl --query 1

master

Compared master at 14176ff (arrow-datafusion) (merge base of simple_mm)

Query 1 iteration 0 took 550.8 ms
Query 1 iteration 1 took 542.1 ms
Query 1 iteration 2 took 533.0 ms
Query 1 iteration 3 took 539.4 ms
Query 1 iteration 4 took 543.0 ms
Query 1 iteration 5 took 538.5 ms
Query 1 iteration 6 took 537.9 ms
Query 1 iteration 7 took 536.6 ms
Query 1 iteration 8 took 537.5 ms
Query 1 iteration 9 took 539.8 ms
Query 1 avg time: 539.86 ms

yjshen/simple_mm

yjshen/simple_mm at 04dca98

Query 1 iteration 0 took 500.2 ms
Query 1 iteration 1 took 492.2 ms
Query 1 iteration 2 took 489.1 ms
Query 1 iteration 3 took 488.4 ms
Query 1 iteration 4 took 489.2 ms
Query 1 iteration 5 took 485.6 ms
Query 1 iteration 6 took 488.2 ms
Query 1 iteration 7 took 489.5 ms
Query 1 iteration 8 took 491.4 ms
Query 1 iteration 9 took 489.7 ms
Query 1 avg time: 490.36 ms

Query

Query 1

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1998-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

@alamb
Contributor

alamb commented Jan 13, 2022

I think while not perfect this PR is a step in the right direction towards being able to handle queries that need to spill. 🚀 thank you @yjshen

Shall I file follow on tickets for the next step?

I am particularly interested in ensuring we consolidate the Sort code (so there is only a single sort operator that does in memory sorting if it has enough memory budget but then spills to disk if needed). I would enjoy helping make this happen (perhaps by writing some tests?)

@alamb alamb merged commit d7e465a into apache:master Jan 13, 2022
@liukun4515
Contributor

I talked with @yjshen yesterday; maybe the external sort has been implemented as a draft in his branch. @alamb, you can file an issue to track the follow-up tasks so it's clear for all contributors.

@yjshen
Member Author

yjshen commented Jan 14, 2022

Thank you all again for helping me with the initial document proposal as well as insightful reviews in this PR ❤️

Shall I file follow on tickets for the next step?

Yes, that would be great! Please open the issues you have in mind. I have a bunch of ideas for follow-ups as well; I think we already have the foundation for many exciting features to come. Would you like to open an umbrella issue? I could file sub-task issues under it as well.

I am particularly interested in ensuring we consolidate the Sort code. (so there is only a single sort operator that does in memory sorting if it has enough memory budget but then spills to disk if needed). I would enjoy helping make this happen (perhaps by writing some tests?)

Thanks, I can do the initial consolidation; please join at any time or just take it over, depending on your schedule.

Successfully merging this pull request may close these issues.

[Epic] Optionally Limit memory used by DataFusion plan