Skip to content

Commit

Permalink
Minor: Encapsulate LeftJoinData into a struct (rather than anonymou…
Browse files Browse the repository at this point in the history
…s enum) and add comments (#8153)

* Minor: Encapsulate LeftJoinData into a struct (rather than anonymous enum)

* clippy
  • Loading branch information
alamb committed Nov 14, 2023
1 parent 4535551 commit fcd17c8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 20 deletions.
72 changes: 54 additions & 18 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,47 @@ use datafusion_physical_expr::EquivalenceProperties;
use ahash::RandomState;
use futures::{ready, Stream, StreamExt, TryStreamExt};

type JoinLeftData = (JoinHashMap, RecordBatch, MemoryReservation);
/// Hash table and input data for the left (build) side of a join.
///
/// Bundles the build-side hash table, the rows it indexes, and the memory
/// reservation that accounts for both, so they are passed around and
/// dropped together.
struct JoinLeftData {
/// The hash table with indices into `batch`
hash_map: JoinHashMap,
/// The input rows for the build side
batch: RecordBatch,
/// Memory reservation that tracks the memory used by the `hash_map` hash
/// table and by `batch`. Cleared on drop.
// The field is stored but never read back, hence the `dead_code` allow;
// it is kept so the reservation lives exactly as long as this struct.
#[allow(dead_code)]
reservation: MemoryReservation,
}

impl JoinLeftData {
    /// Assembles a `JoinLeftData` from the build-side hash table, the
    /// build-side rows, and the memory reservation handed in alongside them.
    fn new(
        hash_map: JoinHashMap,
        batch: RecordBatch,
        reservation: MemoryReservation,
    ) -> Self {
        Self { hash_map, batch, reservation }
    }

    /// Borrows the build-side hash table.
    fn hash_map(&self) -> &JoinHashMap {
        let Self { hash_map, .. } = self;
        hash_map
    }

    /// Borrows the batch holding the build-side rows.
    fn batch(&self) -> &RecordBatch {
        let Self { batch, .. } = self;
        batch
    }

    /// Number of rows in the build-side batch.
    fn num_rows(&self) -> usize {
        let build_rows = self.batch();
        build_rows.num_rows()
    }
}

/// Join execution plan: Evaluates equijoin predicates in parallel on multiple
/// partitions using a hash table and an optional filter list to apply post
Expand Down Expand Up @@ -692,8 +732,9 @@ async fn collect_left_input(
// Merge all batches into a single batch, so we
// can directly index into the arrays
let single_batch = concat_batches(&schema, &batches, num_rows)?;
let data = JoinLeftData::new(hashmap, single_batch, reservation);

Ok((hashmap, single_batch, reservation))
Ok(data)
}

/// Updates `hash` with new entries from [RecordBatch] evaluated against the expressions `on`,
Expand Down Expand Up @@ -770,7 +811,7 @@ struct HashJoinStream {
left_fut: OnceFut<JoinLeftData>,
/// Which left (build) side rows have been matched while creating output.
/// For some OUTER joins, we need to know which rows have not been matched
/// to produce the correct.
/// to produce the correct output.
visited_left_side: Option<BooleanBufferBuilder>,
/// right (probe) input
right: SendableRecordBatchStream,
Expand Down Expand Up @@ -1042,13 +1083,13 @@ impl HashJoinStream {
{
// TODO: Replace `ceil` wrapper with stable `div_ceil` after
// https://github.com/rust-lang/rust/issues/88581
let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
self.reservation.try_grow(visited_bitmap_size)?;
self.join_metrics.build_mem_used.add(visited_bitmap_size);
}

let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
let num_rows = left_data.num_rows();
if need_produce_result_in_final(self.join_type) {
// Some join types need to track which row has been matched or unmatched:
// `left semi` join: need to use the bitmap to produce the matched row in the left side
Expand All @@ -1075,8 +1116,8 @@ impl HashJoinStream {

// get the matched two indices for the on condition
let left_right_indices = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
left_data.hash_map(),
left_data.batch(),
&batch,
&self.on_left,
&self.on_right,
Expand Down Expand Up @@ -1108,7 +1149,7 @@ impl HashJoinStream {

let result = build_batch_from_indices(
&self.schema,
&left_data.1,
left_data.batch(),
&batch,
&left_side,
&right_side,
Expand Down Expand Up @@ -1140,7 +1181,7 @@ impl HashJoinStream {
// use the left and right indices to produce the batch result
let result = build_batch_from_indices(
&self.schema,
&left_data.1,
left_data.batch(),
&empty_right_batch,
&left_side,
&right_side,
Expand Down Expand Up @@ -2519,16 +2560,11 @@ mod tests {
("c", &vec![30, 40]),
);

let left_data = (
JoinHashMap {
map: hashmap_left,
next,
},
left,
);
let join_hash_map = JoinHashMap::new(hashmap_left, next);

let (l, r) = build_equal_condition_join_indices(
&left_data.0,
&left_data.1,
&join_hash_map,
&left,
&right,
&[Column::new("a", 0)],
&[Column::new("a", 0)],
Expand Down
9 changes: 7 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ use hashbrown::HashSet;
/// ```
pub struct JoinHashMap {
// Stores hash value to last row index
pub map: RawTable<(u64, u64)>,
map: RawTable<(u64, u64)>,
// Stores indices in chained list data structure
pub next: Vec<u64>,
next: Vec<u64>,
}

impl JoinHashMap {
#[cfg(test)]
pub(crate) fn new(map: RawTable<(u64, u64)>, next: Vec<u64>) -> Self {
Self { map, next }
}

pub(crate) fn with_capacity(capacity: usize) -> Self {
JoinHashMap {
map: RawTable::with_capacity(capacity),
Expand Down

0 comments on commit fcd17c8

Please sign in to comment.