Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable proton revision for checkpoint #495

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions base/base/SerdeTag.h

This file was deleted.

36 changes: 36 additions & 0 deletions src/Common/serde.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <IO/VarInt.h>
#include <base/types.h>
#include <Common/VersionRevision.h>

namespace DB
{
/// REQUIRES: The object must support versioned serialization/deserialization
template <typename S, typename WB, typename... Args>
concept VersionedSerializable
= requires(const S & s, WB & wb, VersionType version, Args &&... args) { s.serialize(wb, version, std::forward<Args>(args)...); };

template <typename S, typename RB, typename... Args>
concept VersionedDeserializable
= requires(S & s, RB & rb, VersionType version, Args &&... args) { s.deserialize(rb, version, std::forward<Args>(args)...); };

template <typename WB, typename... Args, VersionedSerializable<WB, Args...> S>
void ALWAYS_INLINE serialize(const S & s, WB & wb, VersionType version, Args &&... args)
{
s.serialize(wb, version, std::forward<Args>(args)...);
}

template <typename RB, typename... Args, VersionedDeserializable<RB, Args...> S>
void ALWAYS_INLINE deserialize(S & s, RB & rb, VersionType version, Args &&... args)
{
s.deserialize(rb, version, std::forward<Args>(args)...);
}

/// macro tag to indicate the data members or struct or class will
/// be serialized / deserialized via network or file system IO.
/// Hence, data structure versioning / backward / forward compatibility
/// are concerns
#define SERDE
#define NO_SERDE
}
18 changes: 12 additions & 6 deletions src/Interpreters/Streaming/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace ErrorCodes
extern const int RECOVER_CHECKPOINT_FAILED;
extern const int AGGREGATE_FUNCTION_NOT_APPLICABLE;
extern const int NOT_IMPLEMENTED;
extern const int SERVER_REVISION_IS_TOO_OLD;
}

namespace Streaming
Expand Down Expand Up @@ -3336,13 +3337,15 @@ void Aggregator::recover(AggregatedDataVariants & data_variants, ReadBuffer & rb
{
/// Serialization layout
/// [version] + [states layout]
VersionType version = 0;
readIntBinary(version, rb);
VersionType recovered_version = 0;
readIntBinary(recovered_version, rb);
assert(recovered_version <= getVersion());

/// FIXME: Legacy layout needs to be cleaned after no use
if (version <= 1)
if (recovered_version <= 1)
return doRecoverLegacy(data_variants, rb);

/// Recover STATE V2
return doRecover(data_variants, rb);
}

Expand Down Expand Up @@ -3735,10 +3738,13 @@ bool Aggregator::shouldClearStates(ConvertAction action, bool final_) const
}
}

VersionType Aggregator::getVersionFromRevision(UInt64 revision [[maybe_unused]]) const
VersionType Aggregator::getVersionFromRevision(UInt64 revision) const
{
/// FIXME: Enable @p revision ? always 1 for now
return static_cast<VersionType>(2);
if (revision >= STATE_V2_MIN_REVISION)
return static_cast<VersionType>(2);
else
throw Exception(
ErrorCodes::SERVER_REVISION_IS_TOO_OLD, "State of AggregatedDataVariants is not yet implemented in revision {}", revision);
}

VersionType Aggregator::getVersion() const
Expand Down
5 changes: 5 additions & 0 deletions src/Interpreters/Streaming/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,11 @@ class Aggregator final
VersionType getVersion() const;

public:
/// Existed versions:
/// STATE VERSION 1 - Legacy version
/// STATE VERSION 2 - REVISION 1 (Enable revision)
static constexpr UInt64 STATE_V2_MIN_REVISION = 1;

void checkpoint(const AggregatedDataVariants & data_variants, WriteBuffer & wb);
void recover(AggregatedDataVariants & data_variants, ReadBuffer & rb);

Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <Interpreters/AggregationCommon.h>
#include <Interpreters/TableJoin.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>
#include <Common/ColumnUtils.h>
#include <Common/HashMapSizes.h>
#include <Common/HashMapsTemplate.h>
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/RefCountDataBlockList.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <Interpreters/Streaming/CachedBlockMetrics.h>
#include <Interpreters/Streaming/RefCountDataBlock.h>
#include <Interpreters/Streaming/joinSerder_fwd.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>
#include <base/defines.h>

#include <list>
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/joinData.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <Core/Block.h>
#include <Core/BlockRangeSplitter.h>
#include <Core/LightChunk.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>
#include <Common/Arena.h>
#include <Common/HashMapSizes.h>

Expand Down
1 change: 1 addition & 0 deletions src/Processors/IProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ class IProcessor
virtual void recover(CheckpointContextPtr) { }

VersionType getVersion() const;
void setVersion(VersionType version_) { version = version_; }

protected:
bool is_streaming = false;
Expand Down
6 changes: 3 additions & 3 deletions src/Processors/Streaming/LimitTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ void LimitTransform::checkpoint(CheckpointContextPtr ckpt_ctx)
if (has_previous_row_chunk)
{
writeBoolText(true, wb);
SimpleNativeWriter<Chunk> writer(wb, getOutputPort().getHeader(), ProtonRevision::getVersionRevision());
SimpleNativeWriter<Chunk> writer(wb, getOutputPort().getHeader(), getVersion());
writer.write(previous_row_chunk);
}

Expand All @@ -460,12 +460,12 @@ void LimitTransform::checkpoint(CheckpointContextPtr ckpt_ctx)

void LimitTransform::recover(CheckpointContextPtr ckpt_ctx)
{
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType /*version*/, ReadBuffer & rb) {
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType version_, ReadBuffer & rb) {
bool has_previous_row_chunk;
readBoolText(has_previous_row_chunk, rb);
if (has_previous_row_chunk)
{
SimpleNativeReader<Chunk> reader(rb, getOutputPort().getHeader(), ProtonRevision::getVersionRevision());
SimpleNativeReader<Chunk> reader(rb, getOutputPort().getHeader(), version_);
previous_row_chunk = reader.read();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Streaming/LimitTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Processors/IProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Core/SortDescription.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>
namespace DB
{
namespace Streaming
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Streaming/OffsetTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Processors/IProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Core/SortDescription.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>

namespace DB
{
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Transforms/DistinctTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <Interpreters/SetVariants.h>

/// proton: starts.
#include <base/SerdeTag.h>
#include <Common/serde.h>
/// proton: ends.

namespace DB
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Transforms/ExpressionTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Processors/ISimpleTransform.h>

/// proton: starts.
#include <base/SerdeTag.h>
#include <Common/serde.h>
/// proton: ends.

namespace DB
Expand Down
14 changes: 7 additions & 7 deletions src/Processors/Transforms/Streaming/AggregatingTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ void AggregatingTransform::emitVersion(Chunk & chunk)
auto col = params->version_type->createColumn();
col->reserve(rows);
for (size_t i = 0; i < rows; i++)
col->insert(many_data->version++);
col->insert(many_data->emited_version++);
chunk.addColumn(std::move(col));
}
else
{
Int64 version = many_data->version++;
Int64 version = many_data->emited_version++;
chunk.addColumn(params->version_type->createColumnConst(rows, version)->convertToFullColumnIfConst());
}
}
Expand Down Expand Up @@ -432,7 +432,7 @@ void AggregatingTransform::checkpoint(CheckpointContextPtr ckpt_ctx)

DB::writeIntBinary(many_data->finalized_watermark.load(std::memory_order_relaxed), wb);
DB::writeIntBinary(many_data->finalized_window_end.load(std::memory_order_relaxed), wb);
DB::writeIntBinary(many_data->version.load(std::memory_order_relaxed), wb);
DB::writeIntBinary(many_data->emited_version.load(std::memory_order_relaxed), wb);

assert(num_variants == many_data->rows_since_last_finalizations.size());
for (const auto & last_row : many_data->rows_since_last_finalizations)
Expand All @@ -441,7 +441,7 @@ void AggregatingTransform::checkpoint(CheckpointContextPtr ckpt_ctx)
bool has_field = many_data->hasField();
DB::writeBoolText(has_field, wb);
if (has_field)
many_data->any_field.serializer(many_data->any_field.field, wb);
many_data->any_field.serializer(many_data->any_field.field, wb, getVersion());
}

/// Serializing no shared data
Expand All @@ -458,7 +458,7 @@ void AggregatingTransform::checkpoint(CheckpointContextPtr ckpt_ctx)

void AggregatingTransform::recover(CheckpointContextPtr ckpt_ctx)
{
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType /*version*/, ReadBuffer & rb) {
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType version_, ReadBuffer & rb) {
bool is_last_checkpointing_transform;
DB::readBoolText(is_last_checkpointing_transform, rb);

Expand All @@ -484,7 +484,7 @@ void AggregatingTransform::recover(CheckpointContextPtr ckpt_ctx)

Int64 last_version = 0;
DB::readIntBinary(last_version, rb);
many_data->version = last_version;
many_data->emited_version = last_version;

assert(num_variants == many_data->rows_since_last_finalizations.size());
for (auto & rows_since_last_finalization : many_data->rows_since_last_finalizations)
Expand All @@ -497,7 +497,7 @@ void AggregatingTransform::recover(CheckpointContextPtr ckpt_ctx)
bool has_field;
DB::readBoolText(has_field, rb);
if (has_field)
many_data->any_field.deserializer(many_data->any_field.field, rb);
many_data->any_field.deserializer(many_data->any_field.field, rb, version_);
}

/// Serializing local or stable data during checkpointing
Expand Down
8 changes: 4 additions & 4 deletions src/Processors/Transforms/Streaming/AggregatingTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <Processors/IProcessor.h>
#include <Common/Stopwatch.h>
#include <base/SerdeTag.h>
#include <Common/serde.h>

#include <any>

Expand Down Expand Up @@ -73,7 +73,7 @@ SERDE struct ManyAggregatedData
SERDE std::atomic<Int64> finalized_watermark = INVALID_WATERMARK;
SERDE std::atomic<Int64> finalized_window_end = INVALID_WATERMARK;

SERDE std::atomic<Int64> version = 0;
SERDE std::atomic<Int64> emited_version = 0;

SERDE std::vector<std::unique_ptr<std::atomic<UInt64>>> rows_since_last_finalizations;

Expand All @@ -84,8 +84,8 @@ SERDE struct ManyAggregatedData
SERDE struct AnyField
{
SERDE std::any field;
std::function<void(const std::any &, WriteBuffer &)> serializer;
std::function<void(std::any &, ReadBuffer &)> deserializer;
std::function<void(const std::any &, WriteBuffer &, VersionType)> serializer;
std::function<void(std::any &, ReadBuffer &, VersionType)> deserializer;
} any_field;

explicit ManyAggregatedData(size_t num_threads) : variants(num_threads), watermarks(num_threads, INVALID_WATERMARK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,17 @@ void AggregatingTransformWithSubstream::emitVersion(Chunk & chunk, const Substre
size_t rows = chunk.rows();
if (params->params.group_by == Aggregator::Params::GroupBy::USER_DEFINED)
{
/// For UDA with own emit strategy, possibly a block can trigger multiple emits for a substream, each emit cause version+1
/// For UDA with own emit strategy, possibly a block can trigger multiple emits for a substream, each emit cause emited_version+1
/// each emit only has one result, therefore we can count emit times by row number
auto col = params->version_type->createColumn();
col->reserve(rows);
for (size_t i = 0; i < rows; i++)
col->insert(substream_ctx->version++);
col->insert(substream_ctx->emited_version++);
chunk.addColumn(std::move(col));
}
else
{
Int64 version = substream_ctx->version++;
Int64 version = substream_ctx->emited_version++;
chunk.addColumn(params->version_type->createColumnConst(rows, version)->convertToFullColumnIfConst());
}
}
Expand Down Expand Up @@ -250,60 +250,60 @@ void AggregatingTransformWithSubstream::checkpoint(CheckpointContextPtr ckpt_ctx
for (const auto & [id, substream_ctx] : substream_contexts)
{
assert(id == substream_ctx->id);
substream_ctx->serialize(wb);
substream_ctx->serialize(wb, getVersion());
}
});
}

void AggregatingTransformWithSubstream::recover(CheckpointContextPtr ckpt_ctx)
{
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType /*version*/, ReadBuffer & rb) {
ckpt_ctx->coordinator->recover(getLogicID(), ckpt_ctx, [this](VersionType version_, ReadBuffer & rb) {
size_t num_substreams;
readIntBinary(num_substreams, rb);
substream_contexts.reserve(num_substreams);
for (size_t i = 0; i < num_substreams; ++i)
{
auto substream_ctx = std::make_shared<SubstreamContext>(this);
substream_ctx->deserialize(rb);
substream_ctx->deserialize(rb, version_);
substream_contexts.emplace(substream_ctx->id, std::move(substream_ctx));
}
});
}

void SubstreamContext::serialize(WriteBuffer & wb) const
void SubstreamContext::serialize(WriteBuffer & wb, VersionType version) const
{
DB::Streaming::serialize(id, wb);

aggregating_transform->params->aggregator.checkpoint(variants, wb);

DB::writeIntBinary(finalized_watermark, wb);

DB::writeIntBinary(version, wb);
DB::writeIntBinary(emited_version, wb);

DB::writeIntBinary(rows_since_last_finalization, wb);

bool has_field = hasField();
DB::writeBoolText(has_field, wb);
if (has_field)
any_field.serializer(any_field.field, wb);
any_field.serializer(any_field.field, wb, version);
}

void SubstreamContext::deserialize(ReadBuffer & rb)
void SubstreamContext::deserialize(ReadBuffer & rb, VersionType version)
{
DB::Streaming::deserialize(id, rb);

aggregating_transform->params->aggregator.recover(variants, rb);

DB::readIntBinary(finalized_watermark, rb);

DB::readIntBinary(version, rb);
DB::readIntBinary(emited_version, rb);

DB::readIntBinary(rows_since_last_finalization, rb);

bool has_field;
DB::readBoolText(has_field, rb);
if (has_field)
any_field.deserializer(any_field.field, rb);
any_field.deserializer(any_field.field, rb, version);
}

}
Expand Down
Loading
Loading