From a5e87b4efa5de9ecebfa3a9d6a1f4d506546632d Mon Sep 17 00:00:00 2001
From: Lisen <38773813+yl-lisen@users.noreply.github.com>
Date: Wed, 10 Apr 2024 17:56:01 +0800
Subject: [PATCH] skip checkpointing if there is no new data processed (#649)

Co-authored-by: Ken Chen
Co-authored-by: yoko
---
 .clang-format                                 |   1 -
 src/Checkpoint/CheckpointCoordinator.cpp      | 151 ++++++++++--------
 src/Checkpoint/CheckpointCoordinator.h        |   8 +-
 src/Processors/Executors/ExecutingGraph.cpp   |  12 ++
 src/Processors/Executors/ExecutingGraph.h     |   2 +
 src/Processors/Executors/PipelineExecutor.cpp |   5 +
 src/Processors/Executors/PipelineExecutor.h   |   2 +
 src/Processors/Streaming/ISource.cpp          |  12 +-
 src/Processors/Streaming/ISource.h            |  17 +-
 .../Streaming/AggregatingTransform.cpp        |   4 +
 .../AggregatingTransformWithSubstream.cpp     |   9 ++
 .../ExternalStream/Kafka/KafkaSource.cpp      |  65 ++++----
 .../ExternalStream/Kafka/KafkaSource.h        |  18 ---
 .../Streaming/StorageMaterializedView.cpp     |   6 +-
 .../Streaming/StreamingStoreSource.cpp        |  10 +-
 .../Streaming/StreamingStoreSourceBase.cpp    |  16 +-
 .../Streaming/StreamingStoreSourceBase.h      |   6 +-
 .../0013_changelog_stream13.yaml              |   2 +
 .../0018_query_state5_substream.json          |   8 +-
 .../0018_query_state6_view.json               |   4 +-
 .../0030_two_level_global_aggr.yaml           |   1 +
 21 files changed, 194 insertions(+), 165 deletions(-)

diff --git a/.clang-format b/.clang-format
index 12acbce80c9..aed3c7c13f8 100644
--- a/.clang-format
+++ b/.clang-format
@@ -74,7 +74,6 @@ ConstructorInitializerIndentWidth: 4
 ContinuationIndentWidth: 4
 DerivePointerAlignment: false
 DisableFormat: false
-IndentWidth: 4
 IndentWrappedFunctionNames: false
 MacroBlockBegin: ''
 MacroBlockEnd: ''
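The heart of this change: every streaming source now tracks both its last processed and its last checkpointed sequence number, and the checkpoint coordinator asks the pipeline whether any source has moved past its last checkpoint before opening a new epoch. A self-contained toy model of that decision (illustrative stand-ins only, not the actual proton classes):

// Toy model of the skip decision: data arrives only before the first timer
// fire, so the second checkpoint attempt is skipped.
#include <atomic>
#include <cstdint>
#include <iostream>

struct ToySource
{
    std::atomic<int64_t> last_processed_sn{-1};
    std::atomic<int64_t> last_ckpt_sn{-1};

    void process(int64_t sn) { last_processed_sn.store(sn, std::memory_order_relaxed); }

    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept
    {
        return last_processed_sn.load(std::memory_order_relaxed) > last_ckpt_sn.load(std::memory_order_relaxed);
    }

    void checkpoint() { last_ckpt_sn.store(last_processed_sn.load(std::memory_order_relaxed), std::memory_order_relaxed); }
};

int main()
{
    ToySource source;
    source.process(100);

    for (int64_t epoch : {1, 2})
    {
        if (source.hasProcessedNewDataSinceLastCheckpoint())
        {
            source.checkpoint();
            std::cout << "epoch " << epoch << ": checkpointed sn=" << source.last_ckpt_sn.load() << '\n';
        }
        else
            std::cout << "epoch " << epoch << ": skipped, no new data\n";
    }
}

Running it prints one checkpointed epoch followed by one skipped epoch, which is the exact scenario the updated tests at the end of this patch exercise.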
diff --git a/src/Checkpoint/CheckpointCoordinator.cpp b/src/Checkpoint/CheckpointCoordinator.cpp
index 5cf0f766487..1969393ff48 100644
--- a/src/Checkpoint/CheckpointCoordinator.cpp
+++ b/src/Checkpoint/CheckpointCoordinator.cpp
@@ -248,10 +248,8 @@ void CheckpointCoordinator::removeCheckpoint(const String & qid)
 
 void CheckpointCoordinator::triggerCheckpoint(const String & qid, UInt64 checkpoint_interval)
 {
-    Int64 next_epoch = 0;
-
-    String node_desc;
-    std::optional<std::weak_ptr<PipelineExecutor>> executor;
+    std::weak_ptr<PipelineExecutor> executor;
+    CheckpointContextPtr ckpt_ctx;
     {
         std::scoped_lock lock(mutex);
 
@@ -262,22 +260,23 @@
 
         if (iter->second->current_epoch != 0)
         {
-            next_epoch = iter->second->current_epoch;
-            node_desc = iter->second->outstandingAckNodeDescriptions();
+            ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->current_epoch, qid, this);
        }
         else
         {
             /// Notify the source processor to start checkpoint in a new epoch
-            next_epoch = iter->second->last_epoch + 1;
             executor = iter->second->executor;
+            ckpt_ctx = std::make_shared<CheckpointContext>(iter->second->last_epoch + 1, qid, this);
+            iter->second->current_epoch = ckpt_ctx->epoch; /// new epoch in process
         }
     }
 
-    if (doTriggerCheckpoint(executor, qid, next_epoch, node_desc))
+    if (doTriggerCheckpoint(executor, std::move(ckpt_ctx)))
         timer_service.runAfter(
             checkpoint_interval, [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
     else
-        timer_service.runAfter(15, [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
+        /// Retry after min(15s, checkpoint_interval)
+        timer_service.runAfter(std::min<UInt64>(15, checkpoint_interval), [query_id = qid, interval = checkpoint_interval, this]() { triggerCheckpoint(query_id, interval); });
 }
 
 void CheckpointCoordinator::preCheckpoint(DB::CheckpointContextPtr ckpt_ctx)
@@ -309,9 +308,7 @@ void CheckpointCoordinator::checkpointed(VersionType /*version*/, UInt32 node_id
         /// Unregistered
         return;
 
-    assert(ckpt_ctx->epoch == iter->second->current_epoch || iter->second->current_epoch == 0);
-
-    iter->second->current_epoch = ckpt_ctx->epoch;
+    assert(ckpt_ctx->epoch == iter->second->current_epoch);
 
     if (iter->second->ack(node_id))
     {
@@ -387,58 +384,60 @@ void CheckpointCoordinator::removeExpiredCheckpoints(bool delete_marked)
     timer_service.runAfter(last_access_check_interval, [this]() { removeExpiredCheckpoints(false); });
 }
 
-bool CheckpointCoordinator::doTriggerCheckpoint(
-    const std::optional<std::weak_ptr<PipelineExecutor>> & executor, std::string_view qid, Int64 next_epoch, std::string_view node_desc)
+bool CheckpointCoordinator::doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx)
 {
-    if (executor.has_value())
-    {
-        /// Create directory before hand. Then all other processors don't need
-        /// check and create target epoch ckpt directory.
-        try
-        {
-            auto ckpt_ctx = std::make_shared<CheckpointContext>(next_epoch, qid, this);
-            preCheckpoint(ckpt_ctx);
-
-            {
-                auto exec = executor.value().lock();
-                if (!exec)
-                {
-                    LOG_ERROR(
-                        logger,
-                        "Failed to trigger checkpointing state for query={} epoch={}, since it's already cancelled",
-                        qid,
-                        next_epoch);
-                    return false;
-                }
-
-                exec->triggerCheckpoint(std::move(ckpt_ctx));
-            }
+    bool triggered = false;
+    SCOPE_EXIT({
+        if (!triggered)
+            resetCurrentCheckpointEpoch(ckpt_ctx->qid);
+    });
 
-            LOG_INFO(logger, "Triggered checkpointing state for query={} epoch={}", qid, next_epoch);
-            return true;
-        }
-        catch (const Exception & e)
-        {
-            LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", qid, next_epoch, e.message());
-        }
-        catch (const std::exception & ex)
+    /// Create the directory beforehand. Then all other processors don't need to
+    /// check and create the target epoch ckpt directory.
+    try
+    {
+        auto exec = executor.lock();
+        if (!exec)
         {
-            LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", qid, next_epoch, ex.what());
+            LOG_ERROR(
+                logger,
+                "Failed to trigger checkpointing state for query={} epoch={}, since prev checkpoint is still in-progress or it was "
+                "already cancelled",
+                ckpt_ctx->qid,
+                ckpt_ctx->epoch);
+            return false;
         }
-        catch (...)
+
+        if (!exec->hasProcessedNewDataSinceLastCheckpoint())
         {
-            tryLogCurrentException(logger, fmt::format("Failed to trigger checkpointing state for query={} epoch={}", qid, next_epoch));
+            LOG_INFO(
+                logger,
+                "Skipped checkpointing state for query={} epoch={}, since there is no new data processed",
+                ckpt_ctx->qid,
+                ckpt_ctx->epoch);
+            return false;
         }
+
+        preCheckpoint(ckpt_ctx);
+
+        exec->triggerCheckpoint(ckpt_ctx);
+
+        triggered = true;
+
+        LOG_INFO(logger, "Triggered checkpointing state for query={} epoch={}", ckpt_ctx->qid, ckpt_ctx->epoch);
+        return true;
     }
-    else
+    catch (const Exception & e)
+    {
+        LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", ckpt_ctx->qid, ckpt_ctx->epoch, e.message());
+    }
+    catch (const std::exception & ex)
+    {
+        LOG_ERROR(logger, "Failed to trigger checkpointing state for query={} epoch={} error={}", ckpt_ctx->qid, ckpt_ctx->epoch, ex.what());
+    }
+    catch (...)
     {
-        /// prev ckpt is still in progress, reschedule to check later
-        LOG_INFO(
-            logger,
-            "Prev checkpoint for query={} epoch={} is still in-progress, outstanding_ack_node_descriptions={}",
-            qid,
-            next_epoch,
-            node_desc);
+        tryLogCurrentException(logger, fmt::format("Failed to trigger checkpointing state for query={} epoch={}", ckpt_ctx->qid, ckpt_ctx->epoch));
     }
     return false;
 }
@@ -448,41 +447,41 @@ void CheckpointCoordinator::triggerLastCheckpointAndFlush()
     LOG_INFO(logger, "Trigger last checkpoint and flush begin");
     Stopwatch stopwatch;
 
-    Strings qids;
-    std::vector<Int64> next_epochs;
-    Strings node_desces;
-    std::vector<std::optional<std::weak_ptr<PipelineExecutor>>> executors;
+    std::vector<std::weak_ptr<PipelineExecutor>> executors;
+    std::vector<CheckpointContextPtr> ckpt_ctxes;
     {
         std::scoped_lock lock(mutex);
+
+        executors.reserve(queries.size());
+        ckpt_ctxes.reserve(queries.size());
 
         for (auto & [qid, query] : queries)
         {
-            qids.emplace_back(qid);
-
             if (query->current_epoch != 0)
             {
-                next_epochs.emplace_back(query->current_epoch);
-                node_desces.emplace_back(query->outstandingAckNodeDescriptions());
                 executors.emplace_back();
+                ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->current_epoch, qid, this));
             }
             else
             {
                 /// Notify the source processor to start checkpoint in a new epoch
-                next_epochs.emplace_back(query->last_epoch + 1);
-                node_desces.emplace_back();
                 executors.emplace_back(query->executor);
+                ckpt_ctxes.emplace_back(std::make_shared<CheckpointContext>(query->last_epoch + 1, qid, this));
+                query->current_epoch = ckpt_ctxes.back()->epoch; /// new epoch in process
             }
         }
     }
 
+    assert(executors.size() == ckpt_ctxes.size());
+
     /// <query_id, epoch>
     std::vector<std::pair<String, Int64>> triggered_queries;
-    triggered_queries.reserve(qids.size());
-    for (size_t i = 0; i < qids.size(); ++i)
+    triggered_queries.reserve(executors.size());
+    for (size_t i = 0; i < executors.size(); ++i)
     {
         /// FIXME: So far we've only enforced a simple flush strategy by triggering new checkpoint once (regardless of success)
-        if (doTriggerCheckpoint(executors[i], qids[i], next_epochs[i], node_desces[i]))
-            triggered_queries.emplace_back(qids[i], next_epochs[i]);
+        if (doTriggerCheckpoint(executors[i], ckpt_ctxes[i]))
+            triggered_queries.emplace_back(ckpt_ctxes[i]->qid, ckpt_ctxes[i]->epoch);
     }
 
     // Wait for last checkpoint flush completed
@@ -513,4 +512,16 @@ void CheckpointCoordinator::triggerLastCheckpointAndFlush()
     stopwatch.stop();
     LOG_INFO(logger, "Trigger last checkpoint and flush end (elapsed {} milliseconds)", stopwatch.elapsedMilliseconds());
 }
+
+void CheckpointCoordinator::resetCurrentCheckpointEpoch(const String & qid)
+{
+    std::scoped_lock lock(mutex);
+    auto iter = queries.find(qid);
+    if (iter == queries.end())
+        /// Already canceled the query
+        return;
+
+    iter->second->current_epoch = 0;
+}
+
 }
diff --git a/src/Checkpoint/CheckpointCoordinator.h b/src/Checkpoint/CheckpointCoordinator.h
index 2bdc4d56cd4..4058748b355 100644
--- a/src/Checkpoint/CheckpointCoordinator.h
+++ b/src/Checkpoint/CheckpointCoordinator.h
@@ -73,11 +73,9 @@ class CheckpointCoordinator final
     void triggerCheckpoint(const String & qid, UInt64 checkpoint_interval);
     void removeExpiredCheckpoints(bool delete_marked);
 
-    bool doTriggerCheckpoint(
-        const std::optional<std::weak_ptr<PipelineExecutor>> & executor,
-        std::string_view qid,
-        Int64 next_epoch,
-        std::string_view node_desc);
+    bool doTriggerCheckpoint(const std::weak_ptr<PipelineExecutor> & executor, CheckpointContextPtr ckpt_ctx);
+
+    void resetCurrentCheckpointEpoch(const String & qid);
 
 private:
     std::unique_ptr ckpt;
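doTriggerCheckpoint now reserves the new epoch (current_epoch) under the coordinator mutex before doing any work, so every early exit (an expired executor, no new data, or a thrown exception) has to release that reservation; that is what the SCOPE_EXIT/resetCurrentCheckpointEpoch pairing above does. A minimal sketch of the idiom, with a hand-rolled guard standing in for the SCOPE_EXIT macro used in the codebase:

#include <functional>
#include <iostream>

// Hand-rolled stand-in for SCOPE_EXIT: runs a callback when the scope unwinds.
template <typename F>
struct ScopeGuard
{
    F on_exit;
    ~ScopeGuard() { on_exit(); }
};
template <typename F>
ScopeGuard(F) -> ScopeGuard<F>;

bool doTrigger(bool executor_alive, bool has_new_data, const std::function<void()> & reset_epoch)
{
    bool triggered = false;
    ScopeGuard guard{[&] {
        if (!triggered)
            reset_epoch(); /// roll back the reserved epoch on any failed or skipped path
    }};

    if (!executor_alive)
        return false; /// executor already cancelled (or previous epoch stuck)
    if (!has_new_data)
        return false; /// nothing processed since the last checkpoint: skip
    /// preCheckpoint() + executor->triggerCheckpoint() would run here
    triggered = true;
    return true;
}

int main()
{
    int rollbacks = 0;
    doTrigger(true, false, [&] { ++rollbacks; }); /// skipped -> reservation rolled back
    doTrigger(true, true, [&] { ++rollbacks; });  /// triggered -> reservation kept
    std::cout << "rollbacks: " << rollbacks << '\n'; /// prints: rollbacks: 1
}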
diff --git a/src/Processors/Executors/ExecutingGraph.cpp b/src/Processors/Executors/ExecutingGraph.cpp
index f2e73283079..a9a2a4ef45c 100644
--- a/src/Processors/Executors/ExecutingGraph.cpp
+++ b/src/Processors/Executors/ExecutingGraph.cpp
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -543,6 +544,17 @@ void ExecutingGraph::initCheckpointNodes()
     assert(!checkpoint_trigger_nodes.empty() && !checkpoint_ack_nodes.empty());
 }
 
+bool ExecutingGraph::hasProcessedNewDataSinceLastCheckpoint() const noexcept
+{
+    for (const auto * node : checkpoint_trigger_nodes)
+    {
+        const auto * streaming_source = dynamic_cast<const Streaming::ISource *>(node->processor);
+        if (streaming_source->hasProcessedNewDataSinceLastCheckpoint())
+            return true;
+    }
+    return false;
+}
+
 void ExecutingGraph::triggerCheckpoint(CheckpointContextPtr ckpt_ctx)
 {
     for (auto * node : checkpoint_trigger_nodes)
diff --git a/src/Processors/Executors/ExecutingGraph.h b/src/Processors/Executors/ExecutingGraph.h
index 8a5eb26510e..f4ad0c1295e 100644
--- a/src/Processors/Executors/ExecutingGraph.h
+++ b/src/Processors/Executors/ExecutingGraph.h
@@ -173,6 +173,8 @@ class ExecutingGraph
     /// Init Checkpoint trigger nodes and ack nodes
     void initCheckpointNodes();
 
+    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept;
+
     void triggerCheckpoint(CheckpointContextPtr ckpt_ctx);
 
     template
diff --git a/src/Processors/Executors/PipelineExecutor.cpp b/src/Processors/Executors/PipelineExecutor.cpp
index c0580c25a28..c29b0b5e119 100644
--- a/src/Processors/Executors/PipelineExecutor.cpp
+++ b/src/Processors/Executors/PipelineExecutor.cpp
@@ -497,6 +497,11 @@ void PipelineExecutor::serialize(CheckpointContextPtr ckpt_ctx) const
     }
 }
 
+bool PipelineExecutor::hasProcessedNewDataSinceLastCheckpoint() const noexcept
+{
+    return graph->hasProcessedNewDataSinceLastCheckpoint();
+}
+
 void PipelineExecutor::triggerCheckpoint(CheckpointContextPtr ckpt_ctx)
 {
     graph->triggerCheckpoint(std::move(ckpt_ctx));
diff --git a/src/Processors/Executors/PipelineExecutor.h b/src/Processors/Executors/PipelineExecutor.h
index 9cc50cc1e7f..14661c3d150 100644
--- a/src/Processors/Executors/PipelineExecutor.h
+++ b/src/Processors/Executors/PipelineExecutor.h
@@ -68,6 +68,8 @@ class PipelineExecutor final : public std::enable_shared_from_this<PipelineExecutor>
     void serialize(CheckpointContextPtr ckpt_ctx) const;
 
+    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept;
+
     void triggerCheckpoint(CheckpointContextPtr ckpt_ctx);
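The executor side is a thin delegation chain: PipelineExecutor forwards to ExecutingGraph, which walks only the checkpoint-trigger nodes (the streaming sources) and answers true as soon as one of them has advanced. A condensed sketch of that any-of walk over a plain vector (simplified stand-ins; the real code casts each node's processor to Streaming::ISource):

#include <atomic>
#include <cstdint>
#include <memory>
#include <vector>

struct Source
{
    std::atomic<int64_t> last_processed_sn{-1};
    std::atomic<int64_t> last_ckpt_sn{-1};

    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept
    {
        return last_processed_sn.load(std::memory_order_relaxed)
            > last_ckpt_sn.load(std::memory_order_relaxed);
    }
};

/// One progressed source is enough: the checkpoint covers the whole pipeline,
/// so a partial advance still requires a new epoch.
bool anyProgress(const std::vector<std::shared_ptr<Source>> & sources) noexcept
{
    for (const auto & source : sources)
        if (source->hasProcessedNewDataSinceLastCheckpoint())
            return true;
    return false;
}

int main()
{
    std::vector<std::shared_ptr<Source>> sources{std::make_shared<Source>(), std::make_shared<Source>()};
    sources[1]->last_processed_sn.store(5, std::memory_order_relaxed);
    return anyProgress(sources) ? 0 : 1; /// returns 0: one source progressed
}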
diff --git a/src/Processors/Streaming/ISource.cpp b/src/Processors/Streaming/ISource.cpp
--- a/src/Processors/Streaming/ISource.cpp
+++ b/src/Processors/Streaming/ISource.cpp
@@ ... @@ void ISource::recover(CheckpointContextPtr ckpt_ctx_)
-    if (!reseted_start_sn.has_value() && last_checkpointed_sn >= 0)
-        doResetStartSN(last_checkpointed_sn + 1);
+    /// Reset the consume offset to start from the next of the last checkpoint sn (if not manually reset before recovery)
+    if (!reset_start_sn.has_value() && lastCheckpointSN() >= 0)
+        doResetStartSN(lastCheckpointSN() + 1);
 }
 
 void ISource::resetStartSN(Int64 sn)
 {
-    reseted_start_sn = sn;
+    reset_start_sn = sn;
     doResetStartSN(sn);
 }
 
@@ -32,7 +32,7 @@ std::optional<Chunk> ISource::tryGenerate()
     if (auto current_ckpt_ctx = ckpt_request.poll(); current_ckpt_ctx)
     {
         auto chunk = doCheckpoint(std::move(current_ckpt_ctx));
-        last_checkpointed_sn = lastProcessedSN();
+        setLastCheckpointSN(lastProcessedSN());
         return std::move(chunk);
     }
 
diff --git a/src/Processors/Streaming/ISource.h b/src/Processors/Streaming/ISource.h
index 4359749aa90..e1dd3bbee3d 100644
--- a/src/Processors/Streaming/ISource.h
+++ b/src/Processors/Streaming/ISource.h
@@ -23,18 +23,24 @@ class ISource : public DB::ISource
 
     virtual String description() const { return ""; }
 
-    /// \brief Get the last progressed sequence number of the source, it shouldn't be called during pipeline execution (thread-unsafe)
-    virtual Int64 lastProcessedSN() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for lastProcessedSN() of {}", getName()); }
+    /// \brief Get/Set the last processed sequence number of the source
+    Int64 lastProcessedSN() const noexcept { return last_processed_sn.load(std::memory_order_relaxed); }
+    void setLastProcessedSN(Int64 sn) noexcept { last_processed_sn.store(sn, std::memory_order_relaxed); }
 
     /// \brief Reset the start sequence number of the source, it must be called before the pipeline execution (thread-unsafe)
     void resetStartSN(Int64 sn);
 
-    Int64 lastCheckpointedSN() const noexcept { return last_checkpointed_sn; }
+    /// \brief Get the last checkpoint sequence number of the source
+    Int64 lastCheckpointSN() const noexcept { return last_ckpt_sn.load(std::memory_order_relaxed); }
+
+    bool hasProcessedNewDataSinceLastCheckpoint() const noexcept { return lastProcessedSN() > lastCheckpointSN(); }
 
     void checkpoint(CheckpointContextPtr ckpt_ctx_) override final;
     void recover(CheckpointContextPtr ckpt_ctx_) override final;
 
 private:
+    void setLastCheckpointSN(Int64 ckpt_sn) noexcept { last_ckpt_sn.store(ckpt_sn, std::memory_order_relaxed); }
+
     std::optional<Chunk> tryGenerate() override final;
 
     /// \brief Checkpointing the source state (include lastProcessedSN())
@@ -58,8 +64,9 @@ class ISource : public DB::ISource
 private:
     /// For checkpoint
     CheckpointRequest ckpt_request;
-    NO_SERDE std::optional<Int64> reseted_start_sn;
-    NO_SERDE Int64 last_checkpointed_sn = -1;
+    NO_SERDE std::optional<Int64> reset_start_sn;
+    NO_SERDE std::atomic<Int64> last_ckpt_sn = -1;
+    SERDE std::atomic<Int64> last_processed_sn = -1;
 };
 
 using StreamingSourcePtr = std::shared_ptr<ISource>;
diff --git a/src/Processors/Transforms/Streaming/AggregatingTransform.cpp b/src/Processors/Transforms/Streaming/AggregatingTransform.cpp
index 7f0404993a9..765a440813b 100644
--- a/src/Processors/Transforms/Streaming/AggregatingTransform.cpp
+++ b/src/Processors/Transforms/Streaming/AggregatingTransform.cpp
@@ -549,6 +549,10 @@ void AggregatingTransform::recover(CheckpointContextPtr ckpt_ctx)
         {
             UInt64 last_rows = 0;
             DB::readIntBinary(last_rows, rb);
+            /// In case we had globally aggregated some data, but the checkpoint request was served before finalizing
+            if (last_rows > 0) [[unlikely]]
+                LOG_WARNING(log, "Last checkpoint state was not finalized, rows_since_last_finalization={}", last_rows);
+
             *rows_since_last_finalization = last_rows;
         }
 
diff --git a/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp
index 53c1d906fc5..9328b8d42e3 100644
--- a/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp
+++ b/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp
@@ -303,6 +303,15 @@ void AggregatingTransformWithSubstream::recover(CheckpointContextPtr ckpt_ctx)
         {
             auto substream_ctx = std::make_shared<SubstreamContext>(this);
             substream_ctx->deserialize(rb, version_);
+
+            /// In case we had globally aggregated some data, but the checkpoint request was served before finalizing
+            if (substream_ctx->rows_since_last_finalization > 0) [[unlikely]]
+                LOG_WARNING(
+                    log,
+                    "Last checkpoint state was not finalized in substream id={}, rows_since_last_finalization={}",
+                    substream_ctx->id,
+                    substream_ctx->rows_since_last_finalization);
+
             substream_contexts.emplace(substream_ctx->id, std::move(substream_ctx));
         }
     });
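With the sequence-number bookkeeping hoisted into Streaming::ISource, recovery follows one rule: an explicit resetStartSN() call wins; otherwise a recovered checkpoint resumes consumption at lastCheckpointSN() + 1. A small sketch of that decision; resumeFrom() is a hypothetical helper for illustration, and its final fallback value is an assumption of the sketch, not proton's actual default:

#include <cstdint>
#include <iostream>
#include <optional>

struct ResumeState
{
    std::optional<int64_t> reset_start_sn; /// set by an explicit resetStartSN() call
    int64_t last_ckpt_sn = -1;             /// recovered from the checkpoint, -1 if none

    int64_t resumeFrom() const
    {
        if (reset_start_sn.has_value())
            return *reset_start_sn;  /// manual reset takes precedence over recovery
        if (last_ckpt_sn >= 0)
            return last_ckpt_sn + 1; /// next SN after the last checkpointed one
        return 0;                    /// assumed fallback, for this sketch only
    }
};

int main()
{
    ResumeState state;
    state.last_ckpt_sn = 41;
    std::cout << state.resumeFrom() << '\n'; /// 42
    state.reset_start_sn = 7;
    std::cout << state.resumeFrom() << '\n'; /// 7
}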
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
index 213f69cb9f4..549d8bffd5e 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
+++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp
@@ -47,13 +47,12 @@ KafkaSource::KafkaSource(
     , topic(topic_)
     , shard(shard_)
     , offset(offset_)
-    , ckpt_data(kafka.topicName(), shard)
     , external_stream_counter(external_stream_counter_)
     , query_context(std::move(query_context_))
     , logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), consumer->name())))
 {
     if (offset > 0)
-        ckpt_data.last_sn = offset - 1;
+        setLastProcessedSN(offset - 1);
 
     assert(external_stream_counter);
 
@@ -109,7 +108,7 @@ Chunk KafkaSource::generate()
         /// result_blocks is not empty, fallthrough
     }
 
-    ckpt_data.last_sn = iter->second;
+    setLastProcessedSN(iter->second);
     return std::move((iter++)->first);
 }
 
@@ -352,8 +351,13 @@ Chunk KafkaSource::doCheckpoint(CheckpointContextPtr ckpt_ctx_)
     auto result = header_chunk.clone();
     result.setCheckpointContext(ckpt_ctx_);
 
-    ckpt_ctx_->coordinator->checkpoint(State::VERSION, getLogicID(), ckpt_ctx_, [&](WriteBuffer & wb) { ckpt_data.serialize(wb); });
-    LOG_INFO(logger, "Saved checkpoint topic={} parition={} offset={}", ckpt_data.topic, ckpt_data.partition, ckpt_data.last_sn);
+    ckpt_ctx_->coordinator->checkpoint(getVersion(), getLogicID(), ckpt_ctx_, [&](WriteBuffer & wb) {
+        writeStringBinary(kafka.topicName(), wb);
+        writeIntBinary(shard, wb);
+        writeIntBinary(lastProcessedSN(), wb);
+    });
+
+    LOG_INFO(logger, "Saved checkpoint topic={} partition={} offset={}", kafka.topicName(), shard, lastProcessedSN());
 
     /// FIXME, if commit failed ?
     /// Propagate checkpoint barriers
@@ -362,10 +366,27 @@ Chunk KafkaSource::doCheckpoint(CheckpointContextPtr ckpt_ctx_)
 
 void KafkaSource::doRecover(CheckpointContextPtr ckpt_ctx_)
 {
-    ckpt_ctx_->coordinator->recover(
-        getLogicID(), ckpt_ctx_, [&](VersionType version, ReadBuffer & rb) { ckpt_data.deserialize(version, rb); });
-
-    LOG_INFO(logger, "Recovered last_sn={}", ckpt_data.last_sn);
+    ckpt_ctx_->coordinator->recover(getLogicID(), ckpt_ctx_, [&](VersionType, ReadBuffer & rb) {
+        String recovered_topic;
+        Int32 recovered_partition;
+        readStringBinary(recovered_topic, rb);
+        readIntBinary(recovered_partition, rb);
+
+        if (recovered_topic != kafka.topicName() || recovered_partition != shard)
+            throw Exception(
+                ErrorCodes::RECOVER_CHECKPOINT_FAILED,
+                "Found mismatched kafka topic-partition. recovered={}-{}, current={}-{}",
+                recovered_topic,
+                recovered_partition,
+                kafka.topicName(),
+                shard);
+
+        Int64 recovered_last_sn;
+        readIntBinary(recovered_last_sn, rb);
+        setLastProcessedSN(recovered_last_sn);
+    });
+
+    LOG_INFO(logger, "Recovered last_sn={}", lastProcessedSN());
 }
 
 void KafkaSource::doResetStartSN(Int64 sn)
@@ -377,30 +398,4 @@ void KafkaSource::doResetStartSN(Int64 sn)
     }
 }
 
-void KafkaSource::State::serialize(WriteBuffer & wb) const
-{
-    writeStringBinary(topic, wb);
-    writeIntBinary(partition, wb);
-    writeIntBinary(last_sn, wb);
-}
-
-void KafkaSource::State::deserialize(VersionType /*version*/, ReadBuffer & rb)
-{
-    String recovered_topic;
-    Int32 recovered_partition;
-    readStringBinary(recovered_topic, rb);
-    readIntBinary(recovered_partition, rb);
-
-    if (recovered_topic != topic || recovered_partition != partition)
-        throw Exception(
-            ErrorCodes::RECOVER_CHECKPOINT_FAILED,
-            "Found mismatched kafka topic-partition. recovered={}-{}, current={}-{}",
-            recovered_topic,
-            recovered_partition,
-            topic,
-            partition);
-
-    readIntBinary(last_sn, rb);
-}
-
 }
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h
index 3d30816e640..5cc34ad5a73 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSource.h
+++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h
@@ -39,8 +39,6 @@ class KafkaSource final : public Streaming::ISource
 
     Chunk generate() override;
 
-    Int64 lastProcessedSN() const override { return ckpt_data.last_sn; }
-
 private:
     void calculateColumnPositions();
     void initFormatExecutor();
@@ -89,22 +87,6 @@ class KafkaSource final : public Streaming::ISource
 
     bool consume_started = false;
 
-    /// For checkpoint
-    struct State
-    {
-        void serialize(WriteBuffer & wb) const;
-        void deserialize(VersionType version, ReadBuffer & rb);
-
-        static constexpr VersionType VERSION = 0; /// Current State Version
-
-        /// For VERSION-0
-        const String & topic;
-        Int32 partition;
-        Int64 last_sn = -1;
-
-        State(const String & topic_, Int32 partition_) : topic(topic_), partition(partition_) { }
-    } ckpt_data;
-
     ExternalStreamCounterPtr external_stream_counter;
 
     ContextPtr query_context;
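The Kafka source now serializes its checkpoint payload inline rather than through the removed State struct; logically the payload is still topic, partition, and last processed SN, and recovery refuses a payload written for a different topic-partition. A round-trip sketch of that layout, using plain iostreams as stand-ins for the proton WriteBuffer/ReadBuffer helpers:

#include <cstdint>
#include <sstream>
#include <stdexcept>
#include <string>

void saveCheckpoint(std::ostream & out, const std::string & topic, int32_t partition, int64_t last_sn)
{
    out << topic << '\n' << partition << '\n' << last_sn << '\n';
}

int64_t loadCheckpoint(std::istream & in, const std::string & topic, int32_t partition)
{
    std::string recovered_topic;
    int32_t recovered_partition = 0;
    int64_t recovered_last_sn = -1;
    in >> recovered_topic >> recovered_partition >> recovered_last_sn;

    /// Mirror of the RECOVER_CHECKPOINT_FAILED guard: never resume another shard's state.
    if (recovered_topic != topic || recovered_partition != partition)
        throw std::runtime_error("mismatched kafka topic-partition");
    return recovered_last_sn;
}

int main()
{
    std::stringstream buffer;
    saveCheckpoint(buffer, "events", 3, 12345);
    return loadCheckpoint(buffer, "events", 3) == 12345 ? 0 : 1;
}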
diff --git a/src/Storages/Streaming/StorageMaterializedView.cpp b/src/Storages/Streaming/StorageMaterializedView.cpp
index a0900d07500..3d3820bcff7 100644
--- a/src/Storages/Streaming/StorageMaterializedView.cpp
+++ b/src/Storages/Streaming/StorageMaterializedView.cpp
@@ -99,7 +99,7 @@ std::vector<Int64> getCheckpointedSNOfStreamingSources(const Streaming::Streamin
     std::vector<Int64> sns;
     sns.reserve(streaming_sources.size());
     for (const auto & streaming_source : streaming_sources)
-        sns.emplace_back(streaming_source->lastCheckpointedSN());
+        sns.emplace_back(streaming_source->lastCheckpointSN());
 
     return sns;
 }
@@ -111,7 +111,7 @@ String dumpStreamingSourcesWithSNs(const Streaming::StreamingSourcePtrs & stream
     if (sns.empty())
     {
         for (const auto & streaming_source : streaming_sources)
-            wb << streaming_source->getName() << "(" << streaming_source->description() << ",sn=" << streaming_source->lastCheckpointedSN()
+            wb << streaming_source->getName() << "(" << streaming_source->description() << ",sn=" << streaming_source->lastCheckpointSN()
                << ");";
     }
     else
@@ -122,7 +122,7 @@ String dumpStreamingSourcesWithSNs(const Streaming::StreamingSourcePtrs & stream
         /// Use last checkpointed sn instead of invalid sn
         for (auto && [streaming_source, sn] : std::views::zip(streaming_sources, sns))
             wb << streaming_source->getName() << "(" << streaming_source->description()
-               << ",sn=" << (sn >= 0 ? sn : streaming_source->lastCheckpointedSN()) << ");";
+               << ",sn=" << (sn >= 0 ? sn : streaming_source->lastCheckpointSN()) << ");";
 
     return wb.str();
diff --git a/src/Storages/Streaming/StreamingStoreSource.cpp b/src/Storages/Streaming/StreamingStoreSource.cpp
index 67cf239f754..078b93e4504 100644
--- a/src/Storages/Streaming/StreamingStoreSource.cpp
+++ b/src/Storages/Streaming/StreamingStoreSource.cpp
@@ -19,8 +19,8 @@ StreamingStoreSource::StreamingStoreSource(
     Poco::Logger * log_)
     : StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
 {
-    if (sn >= ProtonConsts::LogStartSN)
-        last_sn = sn - 1;
+    if (sn > 0)
+        setLastProcessedSN(sn - 1);
 
     const auto & settings = query_context->getSettingsRef();
     if (settings.record_consume_batch_count.value != 0)
@@ -36,7 +36,7 @@
         auto consumer = kpool.getOrCreateStreaming(stream_shard_->logStoreClusterId());
         assert(consumer);
         kafka_reader = std::make_unique(
-            std::move(stream_shard_), sn, columns_desc.physical_column_positions_to_read, std::move(consumer), log);
+            std::move(stream_shard_), sn, columns_desc.physical_column_positions_to_read, std::move(consumer), logger);
     }
     else
     {
@@ -50,7 +50,7 @@
             /*schema_provider*/ nullptr,
             /*schema_version*/ 0,
             columns_desc.physical_column_positions_to_read,
-            log);
+            logger);
     }
 }
 
@@ -152,7 +152,7 @@ void StreamingStoreSource::doResetStartSN(Int64 sn)
         else
             kafka_reader->resetOffset(sn);
 
-        LOG_INFO(log, "Reset start sn={}", sn);
+        LOG_INFO(logger, "Reset start sn={}", sn);
     }
 }
 }
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.cpp b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
index 9495b70aa4b..c9c47be60bf 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.cpp
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.cpp
@@ -16,12 +16,12 @@ extern const int RECOVER_CHECKPOINT_FAILED;
 }
 
 StreamingStoreSourceBase::StreamingStoreSourceBase(
-    const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr query_context_, Poco::Logger * log_, ProcessorID pid_)
+    const Block & header, const StorageSnapshotPtr & storage_snapshot_, ContextPtr query_context_, Poco::Logger * logger_, ProcessorID pid_)
     : Streaming::ISource(header, true, pid_)
     , storage_snapshot(
           std::make_shared<StorageSnapshot>(*storage_snapshot_)) /// We like to make a copy of it since we will mutate the snapshot
     , query_context(std::move(query_context_))
-    , log(log_)
+    , logger(logger_)
     , header_chunk(header.getColumns(), 0)
     , columns_desc(header.getNames(), storage_snapshot)
 {
@@ -151,7 +151,7 @@ Chunk StreamingStoreSourceBase::generate()
         /// result_blocks is not empty, fallthrough
     }
 
-    last_sn = iter->second;
+    setLastProcessedSN(iter->second);
     return std::move((iter++)->first);
 }
 
@@ -172,9 +172,11 @@ Chunk StreamingStoreSourceBase::doCheckpoint(CheckpointContextPtr current_ckpt_c
         writeIntBinary(processor_id, wb);
         writeStringBinary(stream_shard.first, wb);
         writeIntBinary(stream_shard.second, wb);
-        writeIntBinary(last_sn, wb);
+        writeIntBinary(lastProcessedSN(), wb);
     });
 
+    LOG_INFO(logger, "Saved checkpoint sn={}", lastProcessedSN());
+
     /// FIXME, if commit failed ?
     /// Propagate checkpoint barriers
 
     return result;
 }
 
@@ -206,10 +208,12 @@ void StreamingStoreSourceBase::doRecover(CheckpointContextPtr ckpt_ctx_)
                 current_stream_shard.first,
                 current_stream_shard.second);
 
-        readIntBinary(last_sn, rb);
+        Int64 recovered_last_sn = 0;
+        readIntBinary(recovered_last_sn, rb);
+        setLastProcessedSN(recovered_last_sn);
     });
 
-    LOG_INFO(log, "Recovered last_sn={}", last_sn);
+    LOG_INFO(logger, "Recovered last_sn={}", lastProcessedSN());
 }
 
 }
diff --git a/src/Storages/Streaming/StreamingStoreSourceBase.h b/src/Storages/Streaming/StreamingStoreSourceBase.h
index f54b1968684..5c413cc1f4f 100644
--- a/src/Storages/Streaming/StreamingStoreSourceBase.h
+++ b/src/Storages/Streaming/StreamingStoreSourceBase.h
@@ -21,8 +21,6 @@ class StreamingStoreSourceBase : public Streaming::ISource
 
     Chunk generate() override;
 
-    Int64 lastProcessedSN() const override { return last_sn; }
-
 private:
     virtual void readAndProcess() = 0;
     virtual std::pair<String, Int32> getStreamShard() const = 0;
@@ -35,7 +33,7 @@ class StreamingStoreSourceBase : public Streaming::ISource
 
     ContextPtr query_context;
 
-    Poco::Logger * log;
+    Poco::Logger * logger;
 
     Chunk header_chunk;
 
@@ -47,7 +45,5 @@ class StreamingStoreSourceBase : public Streaming::ISource
     std::vector<std::pair<Chunk, Int64>> result_chunks_with_sns;
     std::vector<std::pair<Chunk, Int64>>::iterator iter;
-
-    Int64 last_sn = -1;
 };
 }
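The test updates below follow from the skip behavior: checkpoint_interval rises from 1s to 6s and the kill/recover waits from 3s to 8s, so each subscription has time to ingest rows and then complete at least one non-skipped checkpoint before the query is killed. A compilable sanity sketch of the constraint the new values satisfy (the 6 and 8 come from the updated tests; the inequality itself is this sketch's framing, not something asserted anywhere in the repo):

#include <cstdint>

constexpr uint64_t checkpoint_interval_s = 6; /// settings checkpoint_interval=6
constexpr uint64_t kill_wait_s = 8;           /// "kill_wait":8 / "wait":8

/// The kill must come after at least one full checkpoint interval has elapsed
/// past the inserts; otherwise the only scheduled checkpoint may be skipped
/// (no new data yet) or may never fire before shutdown.
static_assert(kill_wait_s > checkpoint_interval_s, "leave room for one post-insert checkpoint");

int main() { return 0; }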
diff --git a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
index 024f40dada1..00e384f1870 100644
--- a/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
+++ b/tests/stream/test_stream_smoke/0013_changelog_stream13.yaml
@@ -24,6 +24,7 @@ tests:
       - single shard
       - checkpoint
       - emit changelog
+      - bug
     name: "global aggr on changelog_kv stream with single shard"
     description: sum_distinct and count_distinct for changelog_kv stream with single shard.
     steps:
@@ -275,6 +276,7 @@ tests:
       - changelog_kv
       - single shard
       - checkpoint
+      - bug
     name: "global aggr on changelog_kv stream with single shard"
     description: sum_distinct and count_distinct for changelog_kv stream with single shard.
     steps:
diff --git a/tests/stream/test_stream_smoke/0018_query_state5_substream.json b/tests/stream/test_stream_smoke/0018_query_state5_substream.json
index 7f1de24665c..ca1495d25dd 100644
--- a/tests/stream/test_stream_smoke/0018_query_state5_substream.json
+++ b/tests/stream/test_stream_smoke/0018_query_state5_substream.json
@@ -85,7 +85,7 @@
         },
         {
             "id": 102,
-            "tags": ["query_state"],
+            "tags": ["query_state", "bug"],
             "name": "multiple_recover_global_aggr_in_substream",
             "description": "multiple recover global aggregation in substream from state checkpoint",
             "steps":[
@@ -93,9 +93,9 @@
                     "statements": [
                         {"client":"python", "query_type": "table", "query":"drop stream if exists test19_state_stream5"},
                         {"client":"python", "query_type": "table", "exist":"test19_state_stream5", "exist_wait":2, "wait":1, "query":"create stream test19_state_stream5 (id string, location string, value float, timestamp datetime64(3) default now64(3))"},
-                        {"client":"python", "query_type": "stream", "query_id":"19102", "wait":1, "terminate":"manual", "query":"subscribe to select id, count(*) over (partition by id), min(value) over (partition by id), max(value) over (partition by id) from test19_state_stream5 settings checkpoint_interval=1"},
+                        {"client":"python", "query_type": "stream", "query_id":"19102", "wait":1, "terminate":"manual", "query":"subscribe to select id, count(*) over (partition by id), min(value) over (partition by id), max(value) over (partition by id) from test19_state_stream5 settings checkpoint_interval=6"},
                        {"client":"python", "query_type": "table", "depends_on": "19102", "wait":1, "query": "insert into test19_state_stream5(id, location, value, timestamp) values ('dev1', 'ca', 57.3, '2020-02-02 20:00:00')"},
-                        {"client":"python", "query_type": "table", "kill":"19102", "kill_wait":3, "query": "insert into test19_state_stream5(id, location, value, timestamp) values ('dev2', 'ca', 58.3, '2020-02-02 20:00:02')"}
+                        {"client":"python", "query_type": "table", "kill":"19102", "kill_wait":8, "query": "insert into test19_state_stream5(id, location, value, timestamp) values ('dev2', 'ca', 58.3, '2020-02-02 20:00:02')"}
                     ]
                 },
                 {
                     "statements": [
                         {"client":"python", "query_type": "table", "wait":1, "query": "insert into test19_state_stream5(id, location, value, timestamp) values ('dev1', 'ca', 68, '2020-02-02 20:00:03')"},
                         {"client":"python", "query_type": "table", "query": "insert into test19_state_stream5(id, location, value, timestamp) values ('dev2', 'ca', 70, '2020-02-02 20:00:04')"},
                         {"client":"python", "query_type": "stream","query_id":"19102-1", "terminate": "manual", "query":"recover from '19102'"},
-                        {"client":"python", "query_type": "table", "wait":3, "query":"kill query where query_id='19102' sync"}
+                        {"client":"python", "query_type": "table", "wait":8, "query":"kill query where query_id='19102' sync"}
                     ]
                 },
                 {
diff --git a/tests/stream/test_stream_smoke/0018_query_state6_view.json b/tests/stream/test_stream_smoke/0018_query_state6_view.json
index 145a773ec01..ffe3d11c6d8 100644
--- a/tests/stream/test_stream_smoke/0018_query_state6_view.json
+++ b/tests/stream/test_stream_smoke/0018_query_state6_view.json
@@ -500,8 +500,8 @@
                         {"client":"python", "query_type": "table", "wait":1, "query":"drop stream if exists test19_state_stream6"},
                         {"client":"python", "query_type": "table", "exist":"test19_state_stream6", "exist_wait":2, "wait":1, "query":"create stream test19_state_stream6 (id string, value_str string)"},
                         {"client":"python", "query_type": "table", "exist":"test19_state_view6", "exist_wait":2, "wait":1, "query":"create view if not exists test19_state_view6 as select emit_version(), group_array(id) as ids, group_array(latest_value_str) as latest_value_strs, map_cast(ids, latest_value_strs) as id_value_strs, id_value_strs['a'] as id_value_a, to_float(id_value_a) from (select id, latest(value_str) as latest_value_str from test19_state_stream6 group by id order by id)"},
-                        {"client":"python", "query_type": "stream", "query_id":"19163", "wait":1, "terminate":"manual", "query":"subscribe to select * from test19_state_view6 limit 2 settings checkpoint_interval=1"},
-                        {"client":"python", "query_type": "table", "depends_on":"19163", "kill":"19163", "kill_wait":3, "wait":1,"query":"insert into test19_state_stream6 (id, value_str) values ('a', '100.1') ('b', '200.1') ('c', '300.1')"}
+                        {"client":"python", "query_type": "stream", "query_id":"19163", "wait":1, "terminate":"manual", "query":"subscribe to select * from test19_state_view6 limit 2 settings checkpoint_interval=6"},
+                        {"client":"python", "query_type": "table", "depends_on":"19163", "kill":"19163", "kill_wait":8, "wait":1,"query":"insert into test19_state_stream6 (id, value_str) values ('a', '100.1') ('b', '200.1') ('c', '300.1')"}
                     ]
                 },
                 {
diff --git a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
index d1c3335b8cc..bad29114c43 100644
--- a/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
+++ b/tests/stream/test_stream_smoke/0030_two_level_global_aggr.yaml
@@ -176,6 +176,7 @@ tests:
       - "emit changelog"
       - "changelog aggr"
       - "query_state"
+      - "bug"
     name: subscribe-and-recover-two-level-global-aggr
     description: subscribe to two level global aggregation query, then recover from checkpointed.
     steps: