refactor emit changelog for aggregated results (backward incompatible)
yl-lisen authored Feb 5, 2024
1 parent 2304993 commit 779a9fe
Showing 14 changed files with 497 additions and 398 deletions.
10 changes: 5 additions & 5 deletions cmake/autogenerated_versions.txt
@@ -2,11 +2,11 @@

# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 3)
SET(VERSION_REVISION 4)
SET(VERSION_MAJOR 1)
SET(VERSION_MINOR 4)
SET(VERSION_PATCH 2)
SET(VERSION_MINOR 5)
SET(VERSION_PATCH 0)
SET(VERSION_GITHASH ec89390888e1238c6b4f6887dead2097fba8522f)
SET(VERSION_DESCRIBE v1.4.2)
SET(VERSION_STRING 1.4.2)
SET(VERSION_DESCRIBE v1.5.0)
SET(VERSION_STRING 1.5.0)
# end of autochange
4 changes: 1 addition & 3 deletions src/Common/HashTable/TimeBucketHashTable.h
@@ -69,8 +69,6 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
UNREACHABLE();
}

ALWAYS_INLINE Int64 windowKey(const DB::SerializedKeyHolder & key_holder) { return windowKey(key_holder.key); }

protected:
typename Impl::iterator beginOfNextNonEmptyBucket(Int64 & bucket)
{
@@ -263,7 +261,7 @@ class TimeBucketHashTable : private boost::noncopyable, protected Hash /// empty
template <typename KeyHolder>
void ALWAYS_INLINE emplace(KeyHolder && key_holder, LookupResult & it, bool & inserted, size_t hash_value)
{
auto window = windowKey(key_holder);
auto window = windowKey(keyHolderGetKey(key_holder));
impls[window].emplace(key_holder, it, inserted, hash_value);
updated_buckets[window] = true; /// updated
}
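
The emplace() change above relies on keyHolderGetKey() giving uniform access to the underlying key, which is why the SerializedKeyHolder-specific windowKey() overload could be dropped. A toy sketch of that pattern follows (stand-in types, not the real hash-table code; the window encoding here is invented for the example):

#include <cstdint>

/// Stand-in key holder and accessors: keyHolderGetKey() returns the wrapped key
/// regardless of whether the caller passes a raw key or a holder.
struct ToyKeyHolder { int64_t key; };
inline int64_t keyHolderGetKey(const ToyKeyHolder & holder) { return holder.key; }
inline int64_t keyHolderGetKey(int64_t key) { return key; }

/// Toy window extraction: pretend the low 16 bits of the key carry the bucket id.
inline int64_t windowKey(int64_t key) { return key & 0xFFFF; }

/// One template works for every key-holder type - no per-holder overload needed.
template <typename KeyHolder>
int64_t windowOf(const KeyHolder & key_holder)
{
    return windowKey(keyHolderGetKey(key_holder));
}

int main()
{
    ToyKeyHolder holder{(42LL << 16) | 7};
    return (windowOf(holder) == 7 && windowOf(int64_t{7}) == 7) ? 0 : 1;
}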
7 changes: 6 additions & 1 deletion src/Interpreters/InterpreterSelectQuery.cpp
@@ -3237,6 +3237,10 @@ void InterpreterSelectQuery::executeStreamingAggregation(
"Streaming aggregatation group by overflow mode '{}' is not implemented",
magic_enum::enum_name(settings.group_by_overflow_mode.value));

auto tracking_updates_type = Streaming::TrackingUpdatesType::None;
if (data_stream_semantic_pair.isChangelogOutput())
tracking_updates_type = Streaming::TrackingUpdatesType::UpdatesWithRetract;

Streaming::Aggregator::Params params(
header_before_aggregation,
keys,
@@ -3261,7 +3265,8 @@
streaming_group_by,
delta_col_pos,
window_keys_num,
query_info.streaming_window_params);
query_info.streaming_window_params,
tracking_updates_type);

auto merge_threads = max_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
551 changes: 285 additions & 266 deletions src/Interpreters/Streaming/Aggregator.cpp

Large diffs are not rendered by default.

28 changes: 16 additions & 12 deletions src/Interpreters/Streaming/Aggregator.h
@@ -80,6 +80,7 @@ enum class ConvertType : uint8_t
{
Normal = 0,
Updates = 1,
Retract = 2,
};

/// using TimeBucketAggregatedDataWithUInt16Key = TimeBucketHashMap<FixedImplicitZeroHashMap<UInt16, AggregateDataPtr>>;
@@ -128,6 +129,7 @@ SERDE struct AggregatedDataVariants : private boost::noncopyable
/// Pools for states of aggregate functions. Ownership will be later transferred to ColumnAggregateFunction.
Arenas aggregates_pools;
Arena * aggregates_pool{}; /// The pool that is currently used for allocation.
std::unique_ptr<Arena> retract_pool; /// Use separate pool to manage retract data, which will be cleared after each finalization

/** Specialization for the case when there are no keys, and for keys not fitted into max_rows_to_group_by.
*/
@@ -380,6 +382,8 @@ SERDE struct AggregatedDataVariants : private boost::noncopyable
aggregates_pool->enableRecycle(true);
}

void resetAndCreateRetractPool() { retract_pool = std::make_unique<Arena>(); }

/// Number of rows (different keys).
size_t size() const
{
@@ -779,7 +783,6 @@ class Aggregator final
size_t row_begin,
size_t row_end,
AggregatedDataVariants & result,
AggregatedDataVariants & retracted_result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns /// Passed to not create them anew for each block
) const;
@@ -825,14 +828,16 @@
/// \return: merged updated data if exists, when there is no update data, return nullptr
AggregatedDataVariantsPtr mergeUpdateGroups(ManyAggregatedDataVariants & data_variants) const;

/// Only convert the retract states of update groups tracked
BlocksList convertRetractToBlocks(AggregatedDataVariants & data_variants) const;

/// \return: merged retract data if exists, when there is no retract data, return nullptr
AggregatedDataVariantsPtr mergeRetractGroups(ManyAggregatedDataVariants & data_variants) const;

/// For some streaming queries with `emit on update` or `emit changelog`, we need to track updates (with retract)
bool needTrackUpdates() const { return params.tracking_updates_type != TrackingUpdatesType::None; }
TrackingUpdatesType trackingUpdatesType() const { return params.tracking_updates_type; }

/// Used for merge changed groups and return the <retracted_state, aggregated_state> of changed groups
std::pair<AggregatedDataVariantsPtr, AggregatedDataVariantsPtr>
mergeRetractedGroups(ManyAggregatedDataVariants & aggregated_data, ManyAggregatedDataVariants & retracted_data) const;

std::vector<Int64> bucketsBefore(const AggregatedDataVariants & result, Int64 max_bucket) const;
void removeBucketsBefore(AggregatedDataVariants & result, Int64 max_bucket) const;

@@ -1068,18 +1073,17 @@ class Aggregator final
bool executeAndRetractImpl(
Method & method,
Arena * aggregates_pool,
Method & retracted_method,
Arena * retracted_pool,
Arena * retract_pool,
size_t row_begin,
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions) const;

template <typename Method>
void mergeRetractedGroupsImpl(ManyAggregatedDataVariants & aggregated_data, ManyAggregatedDataVariants & retracted_data) const;

template <typename Method, bool is_two_level>
void mergeUpdateGroupsImpl(ManyAggregatedDataVariants & non_empty_data, Arena * arena) const;

template <typename Method>
void mergeRetractGroupsImpl(ManyAggregatedDataVariants & non_empty_data, Arena * arena) const;
/// proton: ends.

Block prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_variants, bool final, bool clear_states, ConvertType type = ConvertType::Normal) const;
@@ -1137,10 +1141,10 @@ class Aggregator final
public:
/// Existed versions:
/// STATE V1 - Legacy version (REVISION 1)
/// STATE V2 - REVISION 1 (Enable revision)
/// STATE V2 - REVISION 1 (Enable revision increment)
/// STATE V3 - REVISION 3 (Add updates tracking state)
static constexpr UInt64 STATE_V2_MIN_REVISION = 1;
// static constexpr UInt64 STATE_V3_MIN_REVISION = 3; /// will enable it later
static constexpr UInt64 STATE_V3_MIN_REVISION = 3;

VersionType getVersionFromRevision(UInt64 revision) const;
VersionType getVersion() const;
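
The revision-gated state versioning above is what makes this change backward incompatible: once the peer revision is high enough, the serialized aggregation state also carries the updates-tracking data. A minimal sketch of deriving a state version from the negotiated revision, using hypothetical standalone names (the real getVersionFromRevision() lives in Aggregator.cpp and may differ):

#include <cstdint>

using VersionType = uint16_t;

static constexpr uint64_t STATE_V2_MIN_REVISION = 1;
static constexpr uint64_t STATE_V3_MIN_REVISION = 3;

/// Hypothetical helper mirroring the constants above: pick the newest state
/// version the given revision can understand. V3 additionally serializes the
/// updates-tracking state.
VersionType versionForRevision(uint64_t revision)
{
    if (revision >= STATE_V3_MIN_REVISION)
        return 3;
    if (revision >= STATE_V2_MIN_REVISION)
        return 2;
    return 1; /// legacy state format
}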
17 changes: 15 additions & 2 deletions src/Interpreters/Streaming/TrackingUpdatesData.h
@@ -14,7 +14,10 @@ namespace Streaming
SERDE struct TrackingUpdates
{
static ALWAYS_INLINE TrackingUpdates & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<TrackingUpdates *>(place); }
static ALWAYS_INLINE const TrackingUpdates & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const TrackingUpdates *>(place); }
static ALWAYS_INLINE const TrackingUpdates & data(ConstAggregateDataPtr __restrict place)
{
return *reinterpret_cast<const TrackingUpdates *>(place);
}

static ALWAYS_INLINE bool empty(ConstAggregateDataPtr __restrict place) { return data(place).updates == 0; }
static ALWAYS_INLINE bool updated(ConstAggregateDataPtr __restrict place) { return data(place).updated_since_last_finalization; }
@@ -91,14 +94,24 @@ SERDE struct TrackingUpdates
/// Used to track if the target to be tracked has zero sum changes
UInt64 updates = 0;

/// Used to track if the target group tracked has updates since last finalization
/// Used to track if the target group tracked has updates since last finalization
bool updated_since_last_finalization = true;
};

SERDE struct TrackingUpdatesWithRetract : TrackingUpdates
{
static ALWAYS_INLINE AggregateDataPtr & getRetract(AggregateDataPtr & place) { return reinterpret_cast<TrackingUpdatesWithRetract *>(place)->retract_data; }
static ALWAYS_INLINE bool hasRetract(ConstAggregateDataPtr __restrict place) { return reinterpret_cast<const TrackingUpdatesWithRetract *>(place)->retract_data; }

/// Used to track changes for the target to be tracked
AggregateDataPtr retract_data = nullptr;
};

enum class TrackingUpdatesType : uint8_t
{
None = 0,
Updates = 1,
UpdatesWithRetract = 2,
};

}
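
For illustration, a minimal self-contained sketch (simplified types, not the actual proton code) of the idea behind TrackingUpdatesWithRetract: each group keeps its current aggregate state plus a snapshot of the last emitted value, so a changelog finalization can emit a retract row for the old value followed by an update row for the new one, then clear the retract data.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

struct GroupState
{
    int64_t current = 0;                /// current aggregate value, e.g. a running sum
    std::optional<int64_t> retract;     /// last emitted value, captured on the first change after an emit
    bool updated_since_last_finalization = false;
    bool emitted_once = false;
};

int main()
{
    std::unordered_map<std::string, GroupState> groups;

    auto consume = [&](const std::string & key, int64_t value)
    {
        auto & g = groups[key];
        if (!g.updated_since_last_finalization && g.emitted_once)
            g.retract = g.current;      /// remember what downstream has already seen
        g.current += value;
        g.updated_since_last_finalization = true;
    };

    auto finalize = [&]()
    {
        for (auto & [key, g] : groups)
        {
            if (!g.updated_since_last_finalization)
                continue;               /// only changed groups are emitted
            if (g.retract)
                std::cout << key << ' ' << *g.retract << " delta=-1\n";
            std::cout << key << ' ' << g.current << " delta=+1\n";
            g.retract.reset();          /// retract data is cleared after each finalization
            g.updated_since_last_finalization = false;
            g.emitted_once = true;
        }
    };

    consume("a", 1);
    consume("a", 2);
    finalize();   /// emits: a 3 delta=+1
    consume("a", 4);
    finalize();   /// emits: a 3 delta=-1 and a 7 delta=+1
    return 0;
}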
113 changes: 74 additions & 39 deletions src/Processors/Transforms/Streaming/AggregatingHelper.cpp
@@ -30,12 +30,30 @@ Chunk mergeBlocksToChunk(BlocksList && blocks)
return merged_chunk;
}

Chunk convertToChunkImpl(AggregatedDataVariants & data, const AggregatingTransformParams & params)
Chunk convertToChunkImpl(AggregatedDataVariants & data, const AggregatingTransformParams & params, ConvertType type)
{
if (data.empty())
return {};

BlocksList blocks = params.aggregator.convertToBlocks(data, params.final, params.params.max_threads);
BlocksList blocks;
switch (type)
{
case ConvertType::Updates:
{
blocks = params.aggregator.convertUpdatesToBlocks(data);
break;
}
case ConvertType::Retract:
{
blocks = params.aggregator.convertRetractToBlocks(data);
break;
}
case ConvertType::Normal:
{
blocks = params.aggregator.convertToBlocks(data, params.final, params.params.max_threads);
break;
}
}

/// FIXME: When the global aggregation state has been converted to a two-level hash table, the merged chunk may be too large
return mergeBlocksToChunk(std::move(blocks));
@@ -46,7 +64,7 @@ namespace AggregatingHelper
{
Chunk convertToChunk(AggregatedDataVariants & data, const AggregatingTransformParams & params)
{
return convertToChunkImpl(data, params);
return convertToChunkImpl(data, params, ConvertType::Normal);
}

Chunk mergeAndConvertToChunk(ManyAggregatedDataVariants & data, const AggregatingTransformParams & params)
@@ -68,42 +86,59 @@ Chunk mergeAndSpliceAndConvertBucketsToChunk(
return convertToChunk(params.aggregator.mergeAndSpliceAndConvertBucketsToBlock(data, params.final, buckets));
}

ChunkPair
convertToChangelogChunk(AggregatedDataVariants & data, RetractedDataVariants & retracted_data, const AggregatingTransformParams & params)
{
if (data.empty())
return {};

assert(!retracted_data.empty());

auto retracted_chunk = convertToChunk(retracted_data, params);
if (retracted_chunk)
{
auto retracted_delta_col = ColumnInt8::create(retracted_chunk.rows(), Int8(-1));
retracted_chunk.addColumn(std::move(retracted_delta_col));
retracted_chunk.setConsecutiveDataFlag();
}
retracted_data.reset(); /// Clean up retract data after finalized

auto chunk = convertToChunk(data, params);
if (chunk)
{
auto delta_col = ColumnInt8::create(chunk.rows(), Int8(1));
chunk.addColumn(std::move(delta_col));
}

return {std::move(retracted_chunk), std::move(chunk)};
}

ChunkPair mergeAndConvertToChangelogChunk(
ManyAggregatedDataVariants & data, ManyRetractedDataVariants & retracted_data, const AggregatingTransformParams & params)
{
auto [merged_data, merged_retracted_data] = params.aggregator.mergeRetractedGroups(data, retracted_data);
if (!merged_data)
return {};

assert(merged_retracted_data);
return convertToChangelogChunk(*merged_data, *merged_retracted_data, params);
ChunkPair convertToChangelogChunk(AggregatedDataVariants & data, const AggregatingTransformParams & params)
{
if (data.empty())
return {};

auto retracted_chunk = convertToChunkImpl(data, params, ConvertType::Retract);
if (retracted_chunk)
{
auto retracted_delta_col = ColumnInt8::create(retracted_chunk.rows(), Int8(-1));
retracted_chunk.addColumn(std::move(retracted_delta_col));
retracted_chunk.setConsecutiveDataFlag();
}

auto chunk = convertToChunkImpl(data, params, ConvertType::Updates);
if (chunk)
{
auto delta_col = ColumnInt8::create(chunk.rows(), Int8(1));
chunk.addColumn(std::move(delta_col));
}
return {std::move(retracted_chunk), std::move(chunk)};
}

ChunkPair mergeAndConvertToChangelogChunk(ManyAggregatedDataVariants & data, const AggregatingTransformParams & params)
{
if (data.size() == 1)
return convertToChangelogChunk(*data[0], params);

ChunkPair results;
auto & [retracted_chunk, chunk] = results;

auto merged_retracted_data = params.aggregator.mergeRetractGroups(data);
if (merged_retracted_data)
{
retracted_chunk = convertToChunk(*merged_retracted_data, params);
if (retracted_chunk)
{
auto retracted_delta_col = ColumnInt8::create(retracted_chunk.rows(), Int8(-1));
retracted_chunk.addColumn(std::move(retracted_delta_col));
retracted_chunk.setConsecutiveDataFlag();
}
}

auto merged_updated_data = params.aggregator.mergeUpdateGroups(data);
if (merged_updated_data)
{
chunk = convertToChunk(*merged_updated_data, params);
if (chunk)
{
auto delta_col = ColumnInt8::create(chunk.rows(), Int8(1));
chunk.addColumn(std::move(delta_col));
}
}
return results;
}
}
}
21 changes: 10 additions & 11 deletions src/Processors/Transforms/Streaming/AggregatingHelper.h
@@ -37,17 +37,16 @@ Chunk spliceAndConvertBucketsToChunk(
Chunk mergeAndSpliceAndConvertBucketsToChunk(
ManyAggregatedDataVariants & data, const AggregatingTransformParams & params, const std::vector<Int64> & buckets);

/// Only used for emit changelog
/// @brief Based on new/updated groups @p retracted_data , only convert the state of changed groups (retracted: last state, aggregated: current state)
/// \data: current aggregated state of all groups
/// \retracted_data: only have last state of changed groups (i.e. new/updated/deleted)
/// @returns <retracted_chunk, aggregated_chunk>
/// retracted_chunk: just contains retracted data of changed groups
/// aggregated_chunk: just contains aggregated data of changed groups
ChunkPair
convertToChangelogChunk(AggregatedDataVariants & data, RetractedDataVariants & retracted_data, const AggregatingTransformParams & params);
ChunkPair mergeAndConvertToChangelogChunk(
ManyAggregatedDataVariants & data, ManyRetractedDataVariants & retracted_data, const AggregatingTransformParams & params);
/// Changelog chunk converters are used for changelog emit. They return a pair of chunks: one
/// for retraction and one for updates. The two chunks are expected to be passed downstream
/// consecutively; otherwise the downstream aggregation may produce incorrect results or emit
/// incorrect intermediate results. To facilitate downstream processing, the `consecutive`
/// flag is usually set on these chunks.
/// \return a {retract_chunk, update_chunk} pair. retract_chunk, if not empty, contains the data
///         retracted because of the current updates; update_chunk, if not empty, contains the
///         result for the latest update data.
ChunkPair convertToChangelogChunk(AggregatedDataVariants & data, const AggregatingTransformParams & params);
ChunkPair mergeAndConvertToChangelogChunk(ManyAggregatedDataVariants & data, const AggregatingTransformParams & params);
}

}
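
A short usage sketch of the contract described in the comment above, with a simplified stand-in Chunk type (not proton's actual Chunk API): when both chunks of the pair are present, the retract chunk must be pushed downstream immediately before the update chunk so the -1/+1 rows travel as one consecutive changelog unit.

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

/// Simplified stand-in for a chunk: rows of (aggregated value, delta) plus the
/// `consecutive` flag mentioned in the comment above.
struct Chunk
{
    std::vector<std::pair<int64_t, int8_t>> rows;
    bool consecutive = false;
    bool empty() const { return rows.empty(); }
};

using ChunkPair = std::pair<Chunk, Chunk>;

void pushDownstream(const Chunk & chunk)
{
    for (const auto & [value, delta] : chunk.rows)
        std::cout << "value=" << value << " delta=" << int(delta) << '\n';
}

/// Retractions first (they cancel previously emitted results), then the new
/// results for the same groups, with nothing interleaved in between.
void emitChangelog(ChunkPair && chunks)
{
    auto & [retract_chunk, update_chunk] = chunks;
    if (!retract_chunk.empty())
        pushDownstream(retract_chunk);
    if (!update_chunk.empty())
        pushDownstream(update_chunk);
}

int main()
{
    ChunkPair pair;
    pair.first.rows = {{3, -1}};       /// previously emitted count of some group
    pair.first.consecutive = true;     /// mark the retract chunk as part of a consecutive pair
    pair.second.rows = {{7, +1}};      /// the group's new count
    emitChangelog(std::move(pair));
    return 0;
}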