@kosiew commented Oct 10, 2025

Purpose of this PR

Closes: #17897

This PR serves as a proof of concept (PoC) exploring an adaptive approach to improve the scalability and memory efficiency of MinMaxBytesAccumulator::update_batch.

The goal is to validate the adaptive mode selection and state reuse strategy, gather feedback on the overall architecture, and confirm that this direction eliminates the quadratic scaling issue seen in high-cardinality workloads.

Smaller, production-ready PRs will follow after feedback on this PoC.


Background

Previously, MinMaxBytesAccumulator::update_batch allocated a locations buffer sized to total_num_groups for every batch. Because total_num_groups only grows as new groups appear, each batch paid a cost proportional to the historical group count, so memory usage and update time scaled quadratically and severely degraded MIN/MAX performance over large Utf8 datasets.
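
For context, a minimal sketch of the problematic pattern (types are simplified here; the real accumulator tracks candidate byte slices per group):

```rust
// Simplified illustration of the pre-existing per-batch allocation.
// `total_num_groups` grows monotonically across batches, so every call
// pays O(total_num_groups) allocation and zeroing even for tiny batches.
fn update_batch_old(group_indices: &[usize], total_num_groups: usize) {
    // Fresh buffer sized to ALL groups ever seen -- the quadratic culprit.
    let mut locations: Vec<Option<usize>> = vec![None; total_num_groups];
    for (row, &group) in group_indices.iter().enumerate() {
        // Record the candidate row for each group touched in this batch.
        locations[group] = Some(row);
    }
    // ... compare candidates against the stored min/max values ...
}
```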

This PoC introduces a more adaptive, mode-aware design that addresses that scaling issue.


Summary of the Approach

The accumulator dynamically selects between dense, dense-inline, and sparse update modes based on runtime workload characteristics (see the sketch after this list):

  • Group density and reuse
  • Access pattern stability
  • Monotonicity of group IDs
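
The variant names below follow the commit notes later in this PR (`WorkloadMode` with `DenseInline`, `Simple`, and `SparseOptimized`); the selection thresholds are illustrative placeholders, not the tuned values:

```rust
/// Illustrative mode enum; variant names follow the commit notes below.
enum WorkloadMode {
    /// Dense, stable group domain: update marks inline, no scratch map.
    DenseInline,
    /// Medium density: epoch-tagged dense scratch reused across batches.
    Simple,
    /// Fragmented, high-cardinality group ids: hash-based tracking.
    SparseOptimized,
}

/// Hypothetical heuristic -- the thresholds are placeholders.
fn choose_mode(unique_groups: usize, max_group_index: usize) -> WorkloadMode {
    let density = unique_groups as f64 / (max_group_index + 1) as f64;
    if density > 0.5 {
        WorkloadMode::DenseInline
    } else if density > 0.1 {
        WorkloadMode::Simple
    } else {
        WorkloadMode::SparseOptimized
    }
}
```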

Goals

  • Keep per-batch update cost linear in the number of active groups, not in the total historical group count
  • Reuse dense scratch memory efficiently across batches
  • Switch to sparse tracking when groups become fragmented
  • Reduce unnecessary allocation and zeroing overhead

Key Implementation Highlights

🧩 Core Logic

  • Refactored update_batch_dense_impl and update_batch_sparse_impl with unified handling via a new WorkloadMode enum.
  • Added adaptive heuristics (record_batch_stats) to select the optimal mode automatically.
  • Reimplemented sparse path using hashbrown for faster lookups and lower overhead.
  • Introduced epoch-tracked mark reuse and lazy allocation to improve dense reuse efficiency.
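
The epoch trick avoids zeroing dense scratch between batches: every slot remembers the epoch of its last write, and a slot counts as valid only when that epoch matches the current batch. A minimal sketch, with illustrative names:

```rust
/// Illustrative epoch-tagged scratch slot; avoids O(total_groups) clears.
#[derive(Clone, Copy, Default)]
struct ScratchSlot {
    epoch: u32,      // batch epoch of the last write (0 = never written)
    location: usize, // candidate row index within the current batch
}

struct DenseScratch {
    slots: Vec<ScratchSlot>,
    current_epoch: u32,
}

impl DenseScratch {
    /// Start a new batch: bump the epoch instead of clearing every slot.
    fn begin_batch(&mut self) {
        self.current_epoch = self.current_epoch.wrapping_add(1);
        if self.current_epoch == 0 {
            // On wraparound, one full reset prevents stale epochs matching.
            self.slots.iter_mut().for_each(|s| s.epoch = 0);
            self.current_epoch = 1;
        }
    }

    /// Return the slot's candidate only if it was written this batch.
    fn get(&self, group: usize) -> Option<usize> {
        let slot = self.slots.get(group)?;
        (slot.epoch == self.current_epoch).then_some(slot.location)
    }
}
```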

🧠 Supporting Structures

  • Added helper structs in min_max_struct.rs encapsulating mode-specific state and transition logic.
  • Unified dense and sparse paths under a consistent update_batch contract.

📊 Benchmarks

  • Added new Criterion benchmark suite: datafusion/functions-aggregate/benches/min_max_bytes.rs, covering various workload patterns:
    • Dense reuse
    • Sparse and ultra-sparse
    • Monotonic group IDs
    • Mode transitions
    • Extreme duplication and quadratic growth
  • Benchmarks validate adaptive mode transitions under mixed workloads.
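
For reference, a minimal Criterion harness in the shape the new suite likely takes (the benchmark body here is a stand-in; the real benchmarks drive the accumulator with prepared dense, sparse, and monotonic group-index batches):

```rust
use criterion::{criterion_group, criterion_main, Criterion};

// Stand-in workload; the actual suite feeds MinMaxBytesAccumulator
// with dense / sparse / monotonic group-index patterns.
fn min_bytes_dense_reuse(c: &mut Criterion) {
    c.bench_function("min_bytes_dense_reuse", |b| {
        b.iter(|| {
            // build an accumulator, apply several dense batches, emit
        })
    });
}

criterion_group!(benches, min_bytes_dense_reuse);
criterion_main!(benches);
```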

🧱 Cargo Additions

  • Added hashbrown dependency for efficient sparse tracking.
  • Registered new benchmark target in Cargo.toml.

Preliminary Results

  • Benchmarks confirm elimination of the O(n²) scaling behavior
  • Stable performance across dense and sparse regimes



Scope of Change

No user-facing or SQL-level API changes.
All modifications are internal to the aggregate kernel for MIN/MAX over Utf8 columns.
The only observable effect is improved runtime performance and memory efficiency.


Next Steps

This PR is a proof of concept intended to:

  1. Demonstrate the feasibility of adaptive mode selection and reuse logic.
  2. Gather early feedback on design direction.
  3. Guide the breakdown into smaller, merge-ready PRs.

Summary

This PoC refactors MinMaxBytesAccumulator into an adaptive, mode-sensitive aggregator capable of scaling efficiently across dense, sparse, and transitional workloads.

The intent is to validate the design and heuristics before finalizing and submitting incremental PRs for review and merge.

kosiew added 11 commits October 10, 2025 12:45
…d add benchmarks

This patch introduces a new per-batch scratch tracking mechanism for the `MinMaxBytesState` accumulator to minimize redundant allocation overhead and improve efficiency for sparse group workloads. It also adds Criterion benchmarks to evaluate dense and sparse grouping performance.

* Added per-batch scratch structures to track only updated groups and their candidate values:

  * `scratch_group_ids: Vec<usize>` — tracks which groups were updated in the current batch.
  * `scratch_locations: Vec<usize>` — maps each group to its current batch input or existing value.
  * `scratch_epochs: Vec<u32>` — stores the current batch epoch per group to avoid unnecessary resets.
  * `current_epoch: u32` — increments each batch, resetting only when wrapping to zero.
* Introduced constants for clarity:

  ```rust
  const SCRATCH_EPOCH_UNUSED: u32 = 0;
  const SCRATCH_LOCATION_EXISTING: usize = usize::MAX;
  ```
* This approach ensures sparse updates no longer allocate for the full `total_num_groups`, improving scalability for high-cardinality group sets.

* Replaced the old `MinMaxLocation` enum with a compact integer-based scratch index tracking system.
* Reworked `update_batch()` logic to:

  * Avoid reallocations by reusing vectors between batches.
  * Update only touched groups using `scratch_group_ids`.
  * Perform in-place min/max comparisons without reinitializing full state.
* Prevents redundant group scanning and unnecessary Vec growth during sparse updates.

* Enhanced `size()` method to accurately include scratch storage:

  ```rust
  self.scratch_group_ids.capacity() * size_of::<usize>()
      + self.scratch_locations.capacity() * size_of::<usize>()
      + self.scratch_epochs.capacity() * size_of::<u32>()
  ```
* Provides a precise, bounded memory-footprint estimate that reflects per-batch reuse.

* Added `benches/min_max_bytes.rs` with two new benchmarks:

  * `min_bytes_dense_groups` — evaluates dense group distributions.
  * `min_bytes_sparse_groups` — measures sparse grouping efficiency.
* Each benchmark tests `MinMaxBytesState` accumulator performance under different cardinality conditions.
* Integrated both into the Criterion suite via `criterion_group!` and `criterion_main!`.

* Added test `sparse_groups_do_not_allocate_per_total_group` verifying:

  * Sparse updates allocate only per-touched group.
  * Scratch state resets correctly between batches.
  * Epoch and group tracking remain consistent across calls.

* **Performance:** Substantially reduces allocation and update costs for sparse workloads.
* **Correctness:** Guarantees isolation between batch updates with epoch-based scratch tracking.
* **Maintainability:** Simplifies internal state handling by removing redundant enum logic.

* `datafusion/functions-aggregate/benches/min_max_bytes.rs`

* `datafusion/functions-aggregate/Cargo.toml`
* `Cargo.lock`
* `datafusion/functions-aggregate/src/min_max/min_max_bytes.rs`

* `sparse_groups_do_not_allocate_per_total_group`
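
Putting the scratch pieces described above together, the per-batch update loop might look like the following sketch (simplified; the comparison against stored min/max values is elided, and the scratch vectors are allocated once and reused across batches):

```rust
// Illustrative sparse-friendly update: per-batch work is proportional
// to the groups touched in this batch, not to `total_num_groups`.
fn update_batch_sketch(
    scratch_group_ids: &mut Vec<usize>,
    scratch_locations: &mut Vec<usize>,
    scratch_epochs: &mut Vec<u32>,
    current_epoch: u32,
    group_indices: &[usize],
) {
    scratch_group_ids.clear(); // O(touched groups); capacity is retained
    for (row, &group) in group_indices.iter().enumerate() {
        // Grow lazily, and only up to the largest group index seen so far.
        if group >= scratch_epochs.len() {
            scratch_epochs.resize(group + 1, 0);
            scratch_locations.resize(group + 1, usize::MAX);
        }
        if scratch_epochs[group] != current_epoch {
            // First touch this batch: register the group exactly once.
            scratch_epochs[group] = current_epoch;
            scratch_group_ids.push(group);
        }
        scratch_locations[group] = row; // ... keep the better candidate ...
    }
}
```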
…activation

- Refactored `MinMaxBytesState::update_batch` to restore efficient
  dense-path behavior:
  - Added `scratch_dense` table with epoch-based reuse instead of
    per-batch `HashMap`.
  - Introduced `scratch_epoch`, `scratch_dense_limit`, and
    `scratch_dense_enabled` to manage allocation and reuse between batches.
  - Implemented heuristic enabling of dense mode using
    `SCRATCH_DENSE_ENABLE_MULTIPLIER` to activate dense storage only when
    batches are sufficiently dense.
  - Added incremental dense growth with `SCRATCH_DENSE_GROWTH_STEP` to
    minimize per-batch zeroing cost.
  - Sparse batches continue to use `HashMap` without inflating dense
    allocation.

- Introduced `ScratchEntry` struct to track per-group epoch and location
  efficiently.
- Simplified logic to avoid clearing and reallocating dense storage on
  each batch.

- Added Criterion benchmark:
  - `min_bytes_dense_reused_batches`: measures performance for reused
    accumulators across multiple dense batches.

- Expanded test coverage:
  - Verified dense batches enable dense mode immediately
    (`dense_groups_use_dense_scratch`).
  - Verified sparse workloads remain unaffected
    (`sparse_groups_still_use_sparse_scratch`).
  - Verified dense-to-sparse transitions do not inflate allocations
    (`dense_then_sparse_batches_share_limit`).
  - Added dense reuse test to confirm epoch-based state reset without
    clearing allocations.

- Restored O(1) dense performance while retaining sparse/monotonic
  efficiency.
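
A hedged sketch of the enable heuristic described above (the multiplier value is a placeholder for illustration; the real constant and threshold live in `min_max_bytes.rs`):

```rust
// Placeholder value; see SCRATCH_DENSE_ENABLE_MULTIPLIER in the source.
const SCRATCH_DENSE_ENABLE_MULTIPLIER: usize = 4;

/// Enable the dense scratch table only when the batch is dense enough
/// that a contiguous table beats a `HashMap`: the group-id domain must
/// stay within a small multiple of the groups actually touched.
fn should_enable_dense(unique_groups: usize, max_group_index: usize) -> bool {
    max_group_index + 1 <= unique_groups * SCRATCH_DENSE_ENABLE_MULTIPLIER
}
```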
…tion

- Reworked `update_batch` in `MinMaxBytesState` to eliminate repeated sparse detours
  and mid-batch reprocessing for dense workloads.
- Added `evaluate_dense_candidate`, `enable_dense_for_batch`, and `expand_dense_limit`
  helpers to streamline dense path activation and resizing.
- Ensured dense path activates once per batch with direct expansion of
  `scratch_dense_limit` instead of repeated migration loops.
- Introduced new test-only counters:
  - `dense_enable_invocations` – number of dense activation events per test
  - `dense_sparse_detours` – count of sparse fallbacks while dense path active
- Updated unit tests to verify:
  - Dense workloads activate dense mode once and skip sparse map entirely
  - Dense-first batches set proper dense limits
  - Sparse workloads maintain correct allocation patterns
- Renamed benchmark `min_bytes_dense_groups` → `min_bytes_dense_first_batch`
  for clearer semantics
- Overall: restored dense path throughput parity with pre-regression baseline while
  retaining sparse-path improvements.
…e inline fast path

- Added new adaptive `WorkloadMode` enum to dynamically select between DenseInline,
  Simple, and SparseOptimized accumulation strategies.
- Implemented detailed batch statistics tracking via `BatchStats` to inform
  mode transitions and detect workload density patterns.
- Introduced `update_batch_dense_inline_impl` and committed fast-path variant
  for stable dense workloads, reducing allocation and per-batch overhead.
- Added `update_batch_simple_impl` with epoch-based slot reuse for medium-density workloads.
- Enhanced sparse update path to return per-batch statistics (`update_batch_sparse_impl`).
- Added heuristics (`should_use_dense_inline`, `should_use_simple`, `should_switch_to_sparse`)
  to control mode transitions based on group density and total groups.
- Implemented stability thresholds to commit DenseInline mode after repeated batches.
- Extended unit tests to verify:
  - DenseInline activation and commitment
  - Sparse mode selection for high group-id domains
  - Mode switching from Simple to Sparse under low density
- Added new benchmark `min_bytes_dense_duplicate_groups` to evaluate duplicate-heavy workloads.
- Updated size accounting to include new tracking structures and internal state fields.
- Removed outdated dense scratch activation tests; replaced with adaptive mode tests.
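
A sketch of the statistics that could feed these heuristics (field names mirror the description above, but the exact contents, signature, and thresholds are illustrative, not the actual API):

```rust
/// Illustrative per-batch statistics driving mode transitions.
struct BatchStats {
    unique_groups: usize,   // distinct groups touched this batch
    max_group_index: usize, // upper bound of the group-id domain
    rows: usize,            // batch length (rows / unique = duplication)
}

/// Commit to DenseInline only after several consecutive dense batches,
/// so a single atypical batch cannot flip the mode (stability threshold).
fn should_commit_dense_inline(stats: &BatchStats, stable: &mut usize) -> bool {
    let dense = stats.unique_groups * 2 > stats.max_group_index + 1;
    *stable = if dense { *stable + 1 } else { 0 };
    *stable >= 3 // placeholder stability threshold
}
```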
extensive benchmarks

- Introduced a new sequential dense fast path in `min_max_bytes.rs` to
  eliminate overhead for perfectly sequential dense workloads.
- Added heuristic detection for [0..N-1] group indices to automatically
  invoke the fast path.
- Implemented deferred dense mark allocation for single-batch workloads
  to minimize upfront memory cost.
- Added logic to reconsider DenseInline commitment if group domain
  expands beyond committed size.
- Improved batch processing structure with reusable scratch management
  and modularized sparse handling (`PreparedSparseBatch`,
  `SparseBatchState`).
- Added full reset of internal state after `EmitTo::All` or when min_max
  drained completely.
- Extended unit tests to cover dense inline stability, mark readiness,
  domain growth, emit/reset behavior, and resizing correctness.
- Expanded benchmarks in `min_max_bytes.rs`:
  - Added micro, large, and multi-batch tests for adaptive mode
    heuristics.
  - Added stress tests for growing group domains and quadratic
    allocation detection.
  - Documented rationale and workload categories for all benchmark
    functions.
- Updated `min_max_struct.rs`:
  - Introduced reusable dense scratch mechanism for struct aggregates
    with epoch-based tracking.
  - Optimized multi-batch updates with reduced per-batch allocation.
  - Added precise size accounting including scratch structures.
  - Added new regression test for multi-batch sparse workloads
    (`test_min_max_sparse_multi_batch`).
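
The [0..N-1] detection mentioned above can be a single linear scan; a minimal sketch with a hypothetical helper name:

```rust
/// Hypothetical check: group indices are exactly 0, 1, ..., N-1, which
/// lets the update write results positionally with no scratch at all.
fn is_sequential_dense(group_indices: &[usize]) -> bool {
    group_indices
        .iter()
        .enumerate()
        .all(|(expected, &group)| group == expected)
}
```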
…ath and expand benchmarks

- Corrected `update_batch_sequential_dense` to return accurate per-batch `unique_groups`
  and track `max_group_index` distinctly for domain estimation.
- Introduced `Visited` variant in `SequentialDenseLocation` to prevent re-counting
  of groups that compare equal to existing min/max values.
- Enhanced `record_batch_stats` to trace batch-level statistics (behind `trace` feature).
- Added new regression and density benchmarks:
  - `min_bytes_extreme_duplicates`
  - `min_bytes_sequential_stable_groups`
  - `min_bytes_sequential_dense_large_stable`
  - `min_bytes_medium_cardinality_stable`
  - `min_bytes_ultra_sparse`
  - `min_bytes_mode_transition`
- Added unit tests verifying correct per-batch counting and stability across duplicate batches.
- Improved heuristic documentation and tracing for adaptive mode transitions.
… add tracing feature

- Simplified `update_batch_sequential_dense` to eliminate per-batch scratch allocations and enable in-place updates.
- Fixed inaccurate `unique_groups` counting and improved dense fast-path behavior.
- Added new benchmarks to verify allocation reuse and stability.
- Adjusted tests for sequential dense behavior and memory accounting.
- Added optional `tracing` dependency and `trace` feature in Cargo.toml for developer instrumentation.
- Updated `size()` in `min_max_struct.rs` to use vector capacity instead of length for accurate accounting.
@github-actions bot added the `functions` label (Changes to functions implementation) on Oct 10, 2025
@kosiew changed the title from "DRAFT - Adaptive MinMaxBytesAccumulator with Mode-Sensitive Dense/Sparse Processing and Comprehensive Criterion Benchmarks" to "PoC - Adaptive MinMaxBytesAccumulator with Mode-Sensitive Dense/Sparse Processing and Comprehensive Criterion Benchmarks" on Oct 10, 2025