skip checkpointing if there is no new data processed (#649)
Co-authored-by: Ken Chen <zlchen.ken@gmail.com>
Co-authored-by: yoko <haohang.shi@timeplus.io>
3 people authored Apr 10, 2024
1 parent 996c5f3 commit a5e87b4
Showing 21 changed files with 194 additions and 165 deletions.
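In short: before triggering a new checkpoint epoch, the coordinator now asks the pipeline whether any streaming source has processed data past its last checkpoint sequence number (SN), and skips the epoch otherwise. A minimal sketch of that decision, using the member names from ISource.h below (simplified stand-in, not the actual classes):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>

// Simplified stand-in for Streaming::ISource's SN bookkeeping (see ISource.h below).
struct Source
{
    std::atomic<int64_t> last_processed_sn{-1};
    std::atomic<int64_t> last_ckpt_sn{-1};

    // New data exists iff the source advanced past the last checkpointed SN.
    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept
    {
        return last_processed_sn.load(std::memory_order_relaxed)
             > last_ckpt_sn.load(std::memory_order_relaxed);
    }
};

int main()
{
    Source s;
    std::cout << s.hasProcessedNewDataSinceLastCheckpoint() << '\n'; // 0: skip checkpoint

    s.last_processed_sn.store(42, std::memory_order_relaxed);        // data processed
    std::cout << s.hasProcessedNewDataSinceLastCheckpoint() << '\n'; // 1: trigger checkpoint

    s.last_ckpt_sn.store(42, std::memory_order_relaxed);             // checkpoint completed
    std::cout << s.hasProcessedNewDataSinceLastCheckpoint() << '\n'; // 0: skip again
    return 0;
}
```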
1 change: 0 additions & 1 deletion .clang-format
@@ -74,7 +74,6 @@ ConstructorInitializerIndentWidth: 4
ContinuationIndentWidth: 4
DerivePointerAlignment: false
DisableFormat: false
-IndentWidth: 4
IndentWrappedFunctionNames: false
MacroBlockBegin: ''
MacroBlockEnd: ''
151 changes: 81 additions & 70 deletions src/Checkpoint/CheckpointCoordinator.cpp
@@ -248,10 +248,8 @@ void CheckpointCoordinator::removeCheckpoint(const String & qid)

void CheckpointCoordinator::triggerCheckpoint(const String & qid, UInt64 checkpoint_interval)
{
-Int64 next_epoch = 0;
-
-String node_desc;
-std::optional<std::weak_ptr<PipelineExecutor>> executor;
+std::weak_ptr<PipelineExecutor> executor;
+CheckpointContextPtr ckpt_ctx;
{
std::scoped_lock lock(mutex);

@@ -262,22 +260,23 @@ void CheckpointCoordinator::triggerCheckpoint(const String & qid, UInt64 checkpoint_interval)

if (iter->second->current_epoch != 0)
{
-next_epoch = iter->second->current_epoch;
-node_desc = iter->second->outstandingAckNodeDescriptions();
+ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->current_epoch, qid, this);
}
else
{
/// Notify the source processor to start checkpoint in a new epoch
-next_epoch = iter->second->last_epoch + 1;
executor = iter->second->executor;
+ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->last_epoch + 1, qid, this);
+iter->second->current_epoch = ckpt_ctx->epoch; /// new epoch in process
}
}

-if (doTriggerCheckpoint(executor, qid, next_epoch, node_desc))
+if (doTriggerCheckpoint(executor, std::move(ckpt_ctx)))
timer_service.runAfter(
checkpoint_interval, [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
else
-timer_service.runAfter(15, [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
+/// Retry after min(15s, checkpoint_interval)
+timer_service.runAfter(std::min<double>(15, checkpoint_interval), [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
}

void CheckpointCoordinator::preCheckpoint(DB::CheckpointContextPtr ckpt_ctx)
@@ -309,9 +308,7 @@ void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id
/// Unregistered
return;

-assert(ckpt_ctx->epoch == iter->second->current_epoch || iter->second->current_epoch == 0);
-
-iter->second->current_epoch = ckpt_ctx->epoch;
+assert(ckpt_ctx->epoch == iter->second->current_epoch);

if (iter->second->ack(node_id))
{
@@ -387,58 +384,60 @@ void CheckpointCoordinator::removeExpiredCheckpoints(bool delete_marked)
timer_service.runAfter(last_access_check_interval, [this]() { removeExpiredCheckpoints(false); });
}

-bool CheckpointCoordinator::doTriggerCheckpoint(
-const std::optional<std::weak_ptr<PipelineExecutor>> & executor, std::string_view qid, Int64 next_epoch, std::string_view node_desc)
+bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx)
{
-if (executor.has_value())
-{
-/// Create directory before hand. Then all other processors don't need
-/// check and create target epoch ckpt directory.
-try
-{
-auto ckpt_ctx = std::make_shared<CheckpointContext>(next_epoch, qid, this);
-preCheckpoint(ckpt_ctx);
-
-{
-auto exec = executor.value().lock();
-if (!exec)
-{
-LOG_ERROR(
-logger,
-"Failed to trigger checkpointing state for query={} epoch={}, since it's already cancelled",
-qid,
-next_epoch);
-return false;
-}
-
-exec->triggerCheckpoint(std::move(ckpt_ctx));
-}
-
-LOG_INFO(logger, "Triggered checkpointing state for query={} epoch={}", qid, next_epoch);
-return true;
-}
-catch (const Exception & e)
-{
-LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", qid, next_epoch, e.message());
-}
-catch (const std::exception & ex)
-{
-LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", qid, next_epoch, ex.what());
-}
-catch (...)
-{
-tryLogCurrentException(logger, fmt::format("Failed to trigger checkpointing state for query={} epoch={}", qid, next_epoch));
-}
-}
-else
-{
-/// prev ckpt is still in progress, reschedule to check later
-LOG_INFO(
-logger,
-"Prev checkpoint for query={} epoch={} is still in-progress, outstanding_ack_node_descriptions={}",
-qid,
-next_epoch,
-node_desc);
-}
+bool triggered = false;
+SCOPE_EXIT({
+if (!triggered)
+resetCurrentCheckpointEpoch(ckpt_ctx->qid);
+});
+
+/// Create the directory beforehand. Then all other processors don't need
+/// to check and create the target epoch ckpt directory.
+try
+{
+auto exec = executor.lock();
+if (!exec)
+{
+LOG_ERROR(
+logger,
+"Failed to trigger checkpointing state for query={} epoch={}, since prev checkpoint is still in-progress or it was "
+"already cancelled",
+ckpt_ctx->qid,
+ckpt_ctx->epoch);
+return false;
+}
+
+if (!exec->hasProcessedNewDataSinceLastCheckpoint())
+{
+LOG_INFO(
+logger,
+"Skipped checkpointing state for query={} epoch={}, since there is no new data processed",
+ckpt_ctx->qid,
+ckpt_ctx->epoch);
+return false;
+}
+
+preCheckpoint(ckpt_ctx);
+
+exec->triggerCheckpoint(ckpt_ctx);
+
+triggered = true;
+
+LOG_INFO(logger, "Triggered checkpointing state for query={} epoch={}", ckpt_ctx->qid, ckpt_ctx->epoch);
+return true;
+}
+catch (const Exception & e)
+{
+LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", ckpt_ctx->qid, ckpt_ctx->epoch, e.message());
+}
+catch (const std::exception & ex)
+{
+LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", ckpt_ctx->qid, ckpt_ctx->epoch, ex.what());
+}
+catch (...)
+{
+tryLogCurrentException(logger, fmt::format("Failed to trigger checkpointing state for query={} epoch={}", ckpt_ctx->qid, ckpt_ctx->epoch));
+}
return false;
}
@@ -448,41 +447,41 @@ void CheckpointCoordinator::triggerLastCheckpointAndFlush()
LOG_INFO(logger, "Trigger last checkpoint and flush begin");
Stopwatch stopwatch;

-Strings qids;
-std::vector<Int64> next_epochs;
-Strings node_desces;
-std::vector<std::optional<std::weak_ptr<PipelineExecutor>>> executors;
+std::vector<std::weak_ptr<PipelineExecutor>> executors;
+std::vector<CheckpointContextPtr> ckpt_ctxes;

{
std::scoped_lock lock(mutex);

+executors.reserve(queries.size());
+ckpt_ctxes.reserve(queries.size());
for (auto & [qid, query] : queries)
{
-qids.emplace_back(qid);

if (query->current_epoch != 0)
{
-next_epochs.emplace_back(query->current_epoch);
-node_desces.emplace_back(query->outstandingAckNodeDescriptions());
+executors.emplace_back();
+ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->current_epoch, qid, this));
}
else
{
/// Notify the source processor to start checkpoint in a new epoch
-next_epochs.emplace_back(query->last_epoch + 1);
-node_desces.emplace_back();
executors.emplace_back(query->executor);
+ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->last_epoch + 1, qid, this));
+query->current_epoch = ckpt_ctxes.back()->epoch; /// new epoch in process
}
}
}

+assert(executors.size() == ckpt_ctxes.size());

/// <query_id, triggered_epoch>
std::vector<std::pair<std::string_view, Int64>> triggered_queries;
-triggered_queries.reserve(qids.size());
-for (size_t i = 0; i < qids.size(); ++i)
+triggered_queries.reserve(executors.size());
+for (size_t i = 0; i < executors.size(); ++i)
{
/// FIXME: So far we've only enforced a simple flush strategy by triggering new checkpoint once (regardless of success)
-if (doTriggerCheckpoint(executors[i], qids[i], next_epochs[i], node_desces[i]))
-triggered_queries.emplace_back(qids[i], next_epochs[i]);
+if (doTriggerCheckpoint(executors[i], ckpt_ctxes[i]))
+triggered_queries.emplace_back(ckpt_ctxes[i]->qid, ckpt_ctxes[i]->epoch);
}

// Wait for last checkpoint flush completed
@@ -513,4 +512,16 @@ void CheckpointCoordinator::triggerLastCheckpointAndFlush()
stopwatch.stop();
LOG_INFO(logger, "Trigger last checkpoint and flush end (elapsed {} milliseconds)", stopwatch.elapsedMilliseconds());
}

+void CheckpointCoordinator::resetCurrentCheckpointEpoch(const String & qid)
+{
+std::scoped_lock lock(mutex);
+auto iter = queries.find(qid);
+if (iter == queries.end())
+/// Already canceled the query
+return;
+
+iter->second->current_epoch = 0;
+}

}
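The SCOPE_EXIT guard in doTriggerCheckpoint pairs with the new resetCurrentCheckpointEpoch above: any early exit (dead executor, no new data, exception) clears current_epoch so the next timer tick can open a fresh epoch instead of waiting on one that was never triggered. A minimal illustration of the pattern, with a hand-rolled guard standing in for ClickHouse's SCOPE_EXIT macro and a print standing in for the real reset call:

```cpp
#include <functional>
#include <iostream>
#include <utility>

// Hand-rolled stand-in for SCOPE_EXIT: runs a callback when the scope ends.
class ScopeGuard
{
public:
    explicit ScopeGuard(std::function<void()> f) : fn(std::move(f)) {}
    ~ScopeGuard() { if (fn) fn(); }
    ScopeGuard(const ScopeGuard &) = delete;
    ScopeGuard & operator=(const ScopeGuard &) = delete;
private:
    std::function<void()> fn;
};

bool doTrigger(bool executor_alive, bool has_new_data)
{
    bool triggered = false;
    // On every path that does not set `triggered`, roll back the epoch claim.
    ScopeGuard reset([&] {
        if (!triggered)
            std::cout << "resetCurrentCheckpointEpoch(qid)\n";
    });

    if (!executor_alive || !has_new_data)
        return false;   // guard fires -> epoch reset, next tick retries

    triggered = true;   // success: guard becomes a no-op
    return true;
}

int main()
{
    doTrigger(true, false); // prints the reset
    doTrigger(true, true);  // prints nothing
    return 0;
}
```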
8 changes: 3 additions & 5 deletions src/Checkpoint/CheckpointCoordinator.h
@@ -73,11 +73,9 @@ class CheckpointCoordinator final
void triggerCheckpoint(const String & qid, UInt64 checkpoint_interval);
void removeExpiredCheckpoints(bool delete_marked);

-bool doTriggerCheckpoint(
-const std::optional<std::weak_ptr<PipelineExecutor>> & executor,
-std::string_view qid,
-Int64 next_epoch,
-std::string_view node_desc);
+bool doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx);
+
+void resetCurrentCheckpointEpoch(const String & qid);

private:
std::unique_ptr<CheckpointStorage> ckpt;
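For context, triggerCheckpoint reschedules itself through timer_service: a full checkpoint_interval after a successful trigger, or min(15s, checkpoint_interval) after a skip or failure. A hedged sketch of that cadence (TimerService here is a toy stand-in, not the real API; the real service schedules callbacks on a timer thread instead of blocking):

```cpp
#include <algorithm>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// Toy stand-in: blocks and runs the callback instead of using a timer thread.
struct TimerService
{
    void runAfter(double seconds, std::function<void()> fn)
    {
        std::this_thread::sleep_for(std::chrono::duration<double>(seconds));
        fn();
    }
};

// Mirrors the branch at the end of triggerCheckpoint(): on skip/failure,
// retry soon (at most 15s), but never wait longer than the interval itself.
void reschedule(TimerService & timer, bool triggered, double checkpoint_interval,
                std::function<void()> trigger_again)
{
    if (triggered)
        timer.runAfter(checkpoint_interval, trigger_again);
    else
        timer.runAfter(std::min(15.0, checkpoint_interval), trigger_again);
}

int main()
{
    TimerService timer;
    reschedule(timer, /*triggered=*/false, /*checkpoint_interval=*/0.1,
               [] { std::cout << "retrying trigger\n"; });
    return 0;
}
```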
12 changes: 12 additions & 0 deletions src/Processors/Executors/ExecutingGraph.cpp
@@ -6,6 +6,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Processors/PlaceholdProcessor.h>
+#include <Processors/Streaming/ISource.h>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>
@@ -543,6 +544,17 @@ void ExecutingGraph::initCheckpointNodes()
assert(!checkpoint_trigger_nodes.empty() && !checkpoint_ack_nodes.empty());
}

+bool ExecutingGraph::hasProcessedNewDataSinceLastCheckpoint() const noexcept
+{
+for (const auto * node : checkpoint_trigger_nodes)
+{
+const auto * streaming_source = dynamic_cast<const Streaming::ISource *>(node->processor);
+if (streaming_source->hasProcessedNewDataSinceLastCheckpoint())
+return true;
+}
+return false;
+}

void ExecutingGraph::triggerCheckpoint(CheckpointContextPtr ckpt_ctx)
{
for (auto * node : checkpoint_trigger_nodes)
2 changes: 2 additions & 0 deletions src/Processors/Executors/ExecutingGraph.h
@@ -173,6 +173,8 @@ class ExecutingGraph
/// Init Checkpoint trigger nodes and ack nodes
void initCheckpointNodes();

+bool hasProcessedNewDataSinceLastCheckpoint() const noexcept;
+
void triggerCheckpoint(CheckpointContextPtr ckpt_ctx);

template<typename Iterator>
5 changes: 5 additions & 0 deletions src/Processors/Executors/PipelineExecutor.cpp
@@ -497,6 +497,11 @@ void PipelineExecutor::serialize(CheckpointContextPtr ckpt_ctx) const
}
}

+bool PipelineExecutor::hasProcessedNewDataSinceLastCheckpoint() const noexcept
+{
+return graph->hasProcessedNewDataSinceLastCheckpoint();
+}

void PipelineExecutor::triggerCheckpoint(CheckpointContextPtr ckpt_ctx)
{
graph->triggerCheckpoint(std::move(ckpt_ctx));
2 changes: 2 additions & 0 deletions src/Processors/Executors/PipelineExecutor.h
@@ -68,6 +68,8 @@ class PipelineExecutor final : public std::enable_shared_from_this<PipelineExecutor>

String getStats() const;

+bool hasProcessedNewDataSinceLastCheckpoint() const noexcept;
+
/// Trigger checkpointing the states of operators in the graph
void triggerCheckpoint(CheckpointContextPtr ckpt_ctx);

12 changes: 6 additions & 6 deletions src/Processors/Streaming/ISource.cpp
@@ -14,16 +14,16 @@ void ISource::checkpoint(CheckpointContextPtr ckpt_ctx_)
void ISource::recover(CheckpointContextPtr ckpt_ctx_)
{
doRecover(std::move(ckpt_ctx_));
-last_checkpointed_sn = lastProcessedSN();
+setLastCheckpointSN(lastProcessedSN());

-/// Reset consume offset started from the next of last checkpointed sn (if not manually reset before recovery)
-if (!reseted_start_sn.has_value() && last_checkpointed_sn >= 0)
-doResetStartSN(last_checkpointed_sn + 1);
+/// Reset the consume offset to start from the next of the last checkpoint sn (if not manually reset before recovery)
+if (!reset_start_sn.has_value() && lastCheckpointSN() >= 0)
+doResetStartSN(lastCheckpointSN() + 1);
}

void ISource::resetStartSN(Int64 sn)
{
-reseted_start_sn = sn;
+reset_start_sn = sn;
doResetStartSN(sn);
}

@@ -32,7 +32,7 @@ std::optional<Chunk> ISource::tryGenerate()
if (auto current_ckpt_ctx = ckpt_request.poll(); current_ckpt_ctx)
{
auto chunk = doCheckpoint(std::move(current_ckpt_ctx));
-last_checkpointed_sn = lastProcessedSN();
+setLastCheckpointSN(lastProcessedSN());
return std::move(chunk);
}

17 changes: 12 additions & 5 deletions src/Processors/Streaming/ISource.h
@@ -23,18 +23,24 @@ class ISource : public DB::ISource

virtual String description() const { return ""; }

-/// \brief Get the last progressed sequence number of the source, it shouldn't be called during pipeline execution (thread-unsafe)
-virtual Int64 lastProcessedSN() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for lastProcessedSN() of {}", getName()); }
+/// \brief Get/Set the last processed sequence number of the source
+Int64 lastProcessedSN() const noexcept { return last_processed_sn.load(std::memory_order_relaxed); }
+void setLastProcessedSN(Int64 sn) noexcept { last_processed_sn.store(sn, std::memory_order_relaxed); }

/// \brief Reset the start sequence number of the source, it must be called before the pipeline execution (thread-unsafe)
void resetStartSN(Int64 sn);

-Int64 lastCheckpointedSN() const noexcept { return last_checkpointed_sn; }
+/// \brief Get the last checkpoint sequence number of the source
+Int64 lastCheckpointSN() const noexcept { return last_ckpt_sn.load(std::memory_order_relaxed); }

+bool hasProcessedNewDataSinceLastCheckpoint() const noexcept { return lastProcessedSN() > lastCheckpointSN(); }

void checkpoint(CheckpointContextPtr ckpt_ctx_) override final;
void recover(CheckpointContextPtr ckpt_ctx_) override final;

private:
+void setLastCheckpointSN(Int64 ckpt_sn) noexcept { last_ckpt_sn.store(ckpt_sn, std::memory_order_relaxed); }

std::optional<Chunk> tryGenerate() override final;

/// \brief Checkpointing the source state (include lastProcessedSN())
@@ -58,8 +64,9 @@ class ISource : public DB::ISource
private:
/// For checkpoint
CheckpointRequest ckpt_request;
-NO_SERDE std::optional<Int64> reseted_start_sn;
-NO_SERDE Int64 last_checkpointed_sn = -1;
+NO_SERDE std::optional<Int64> reset_start_sn;
+NO_SERDE std::atomic<Int64> last_ckpt_sn = -1;
+SERDE std::atomic<Int64> last_processed_sn = -1;
};

using StreamingSourcePtr = std::shared_ptr<ISource>;
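The SNs become atomics because the coordinator thread now reads them (via hasProcessedNewDataSinceLastCheckpoint) while the pipeline thread updates them; relaxed ordering is enough since each SN is an independent monotone counter feeding a skip heuristic, not a synchronization point for other data. A small two-thread sketch under those assumptions (names mirror ISource.h; this is not the real class):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

std::atomic<int64_t> last_processed_sn{-1}; // written by the pipeline thread
std::atomic<int64_t> last_ckpt_sn{-1};      // written when a checkpoint completes

// Coordinator-side check: a stale read can only delay a checkpoint by one
// timer tick, never corrupt state, so relaxed loads are sufficient.
bool hasNewData()
{
    return last_processed_sn.load(std::memory_order_relaxed)
         > last_ckpt_sn.load(std::memory_order_relaxed);
}

int main()
{
    std::thread pipeline([] {
        for (int64_t sn = 0; sn < 1000; ++sn)
            last_processed_sn.store(sn, std::memory_order_relaxed);
    });
    std::thread coordinator([] {
        for (int i = 0; i < 5; ++i)
            std::cout << "new data? " << hasNewData() << '\n';
    });
    pipeline.join();
    coordinator.join();
    return 0;
}
```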
4 changes: 4 additions & 0 deletions src/Processors/Transforms/Streaming/AggregatingTransform.cpp
@@ -549,6 +549,10 @@ void AggregatingTransform::recover(CheckpointContextPtr ckpt_ctx)
{
UInt64 last_rows = 0;
DB::readIntBinary<UInt64>(last_rows, rb);
+/// Covers the case where some data was globally aggregated but a checkpoint request arrived before finalization
+if (last_rows > 0) [[unlikely]]
+LOG_WARNING(log, "Last checkpointed state was not finalized, rows_since_last_finalization={}", last_rows);

*rows_since_last_finalization = last_rows;
}

(11 more changed files not shown)
