Skip to content

Commit

Permalink
refactor(stream,agg): extract ensure_keys_in_cache function to make…
Browse files Browse the repository at this point in the history
… the code cleaner (#8098)

As described in the title: extract an `ensure_keys_in_cache` helper from `apply_chunk` to keep the hash-agg executor code clean.

Approved-By: soundOfDestiny
  • Loading branch information
stdrc authored Feb 21, 2023
1 parent 9b2cb59 commit 0759ad6
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
vars: &mut ExecutionVars<S>,
chunk: StreamChunk,
) -> StreamExecutorResult<()> {
// Mark state as changed.
vars.state_changed = true;

// Decompose the input chunk.
let capacity = chunk.capacity();
let (ops, columns, visibility) = chunk.into_inner();
Expand Down Expand Up @@ -186,6 +183,9 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
)
.await?;

// Mark state as changed.
vars.state_changed = true;

Ok(())
}

Expand Down
81 changes: 48 additions & 33 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ struct ExecutionVars<K: HashKey, S: StateStore> {
}

struct ExecutionStats {
/// How many times have we hit the cache of join executor for the lookup of each key.
/// How many times have we hit the cache of hash agg executor for the lookup of each key.
lookup_miss_count: u64,
total_lookup_count: u64,

/// How many times have we hit the cache of join executor for all the lookups generated by one
/// StreamChunk.
/// How many times have we hit the cache of hash agg executor for all the lookups generated by
/// one StreamChunk.
chunk_lookup_miss_count: u64,
chunk_total_lookup_count: u64,
}
Expand Down Expand Up @@ -219,28 +219,24 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
.collect()
}

async fn apply_chunk(
async fn ensure_keys_in_cache(
this: &mut ExecutorInner<K, S>,
vars: &mut ExecutionVars<K, S>,
chunk: StreamChunk,
cache: &mut AggGroupCache<K, S>,
keys: impl IntoIterator<Item = &K>,
stats: &mut ExecutionStats,
) -> StreamExecutorResult<()> {
// Find groups in this chunk and generate visibility for each group key.
let keys = K::build(&this.group_key_indices, chunk.data_chunk())?;
let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());

let group_key_types = &this.info.schema.data_types()[..this.group_key_indices.len()];

let futs = group_visibilities
.iter()
.filter_map(|(key, _)| {
vars.stats.total_lookup_count += 1;
if vars.agg_group_cache.contains(key) {
let futs = keys
.into_iter()
.filter_map(|key| {
stats.total_lookup_count += 1;
if cache.contains(key) {
None
} else {
vars.stats.lookup_miss_count += 1;
stats.lookup_miss_count += 1;
Some(async {
// Create `AggGroup` for the current group if not exists. This will fetch
// previous agg result from the result table.
// Create `AggGroup` for the current group if not exists. This will
// fetch previous agg result from the result table.
let agg_group = Box::new(
AggGroup::create(
Some(key.deserialize(group_key_types)?),
Expand All @@ -259,17 +255,36 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
})
.collect_vec(); // collect is necessary to avoid lifetime issue of `agg_group_cache`

// If not all the required states/keys are in the cache, this is a chunk-level cache miss.
stats.chunk_total_lookup_count += 1;
if !futs.is_empty() {
vars.stats.chunk_lookup_miss_count += 1;
}
vars.stats.chunk_total_lookup_count += 1;
let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
while let Some(result) = buffered.next().await {
let (key, agg_group) = result?;
vars.agg_group_cache.put(key, agg_group);
// If not all the required states/keys are in the cache, it's a chunk-level cache miss.
stats.chunk_lookup_miss_count += 1;
let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
while let Some(result) = buffered.next().await {
let (key, agg_group) = result?;
cache.put(key, agg_group);
}
}
drop(buffered); // drop to avoid accidental use
Ok(())
}

async fn apply_chunk(
this: &mut ExecutorInner<K, S>,
vars: &mut ExecutionVars<K, S>,
chunk: StreamChunk,
) -> StreamExecutorResult<()> {
// Find groups in this chunk and generate visibility for each group key.
let keys = K::build(&this.group_key_indices, chunk.data_chunk())?;
let group_visibilities = Self::get_group_visibilities(keys, chunk.visibility());

// Create `AggGroup` for each group if not exists.
Self::ensure_keys_in_cache(
this,
&mut vars.agg_group_cache,
group_visibilities.iter().map(|(k, _)| k),
&mut vars.stats,
)
.await?;

// Decompose the input chunk.
let capacity = chunk.capacity();
Expand Down Expand Up @@ -311,14 +326,12 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
});

// Apply chunk to each of the state (per agg_call), for each group.
for (key, visibility) in &group_visibilities {
// Mark the group as changed.
vars.group_change_set.insert(key.clone());
let agg_group = vars.agg_group_cache.get_mut(key).unwrap().as_mut();
for (key, visibility) in group_visibilities {
let agg_group = vars.agg_group_cache.get_mut(&key).unwrap().as_mut();
let visibilities = call_visibilities
.iter()
.map(Option::as_ref)
.map(|call_vis| call_vis.map_or_else(|| visibility.clone(), |v| v & visibility))
.map(|call_vis| call_vis.map_or_else(|| visibility.clone(), |v| v & &visibility))
.map(Some)
.collect();
agg_group
Expand All @@ -330,6 +343,8 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
&mut this.distinct_dedup_tables,
)
.await?;
// Mark the group as changed.
vars.group_change_set.insert(key);
}

Ok(())
Expand Down

0 comments on commit 0759ad6

Please sign in to comment.