From abbb0dd211fe0242446aaa1f20db6b469f812d0b Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 27 Jun 2024 16:15:21 +0300 Subject: [PATCH] CDC Initial Scan progress --- ydb/core/protos/flat_scheme_op.proto | 6 ++ .../schemeshard_path_describer.cpp | 6 ++ .../ut_cdc_stream/ut_cdc_stream.cpp | 72 +++++++++++++++++++ .../tx/schemeshard/ut_helpers/ls_checks.cpp | 8 +++ .../tx/schemeshard/ut_helpers/ls_checks.h | 1 + ydb/core/ydb_convert/table_description.cpp | 6 ++ ydb/public/api/protos/ydb_table.proto | 7 ++ .../ydb_cli/commands/ydb_service_scheme.cpp | 10 ++- ydb/public/sdk/cpp/client/ydb_table/table.cpp | 32 +++++++++ ydb/public/sdk/cpp/client/ydb_table/table.h | 18 ++++- 10 files changed, 163 insertions(+), 3 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 864ec8e74d0d..f2f436dd0a6c 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -884,6 +884,11 @@ enum ECdcStreamFormat { ECdcStreamFormatDebeziumJson = 4; } +message TCdcStreamScanProgress { + optional uint32 ShardsTotal = 1; + optional uint32 ShardsCompleted = 2; +} + message TCdcStreamDescription { optional string Name = 1; optional ECdcStreamMode Mode = 2; @@ -897,6 +902,7 @@ message TCdcStreamDescription { optional string AwsRegion = 9; // Set to '0' to disable resolved timestamps optional uint64 ResolvedTimestampsIntervalMs = 10; + optional TCdcStreamScanProgress ScanProgress = 11; } message TCreateCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index af1413ee1000..d196d518b213 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1272,6 +1272,12 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, desc.SetState(info->State); desc.SetSchemaVersion(info->AlterVersion); + if (info->State == TCdcStreamInfo::EState::ECdcStreamStateScan && info->ScanShards) { + auto& scanProgress = *desc.MutableScanProgress(); + scanProgress.SetShardsTotal(info->ScanShards.size()); + scanProgress.SetShardsCompleted(info->DoneShards.size()); + } + Y_ABORT_UNLESS(PathsById.contains(pathId)); auto path = PathsById.at(pathId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index f2cf5f8d5fb0..a4e9f6a1b2c2 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1286,6 +1286,78 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { InitialScan(false); } + Y_UNIT_TEST(InitialScanProgress) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + THolder blockedScanRequest; + auto blockScanRequest = runtime.AddObserver( + [&](TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { + blockedScanRequest.Reset(ev.Release()); + } + ); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedScanRequest](IEventHandle&) { + return bool(blockedScanRequest); + }); + runtime.DispatchEvents(opts); + } + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamInitialScanProgress(1, 0), + }); + blockScanRequest.Remove(); + + THolder blockedAlterStream; + auto blockAlterStream = runtime.AddObserver( + [&](TEvSchemeShard::TEvModifySchemeTransaction::TPtr& ev) { + if (ev->Get()->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpAlterCdcStream) { + blockedAlterStream.Reset(ev.Release()); + } + } + ); + + runtime.Send(blockedScanRequest.Release(), 0, true); + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedAlterStream](IEventHandle&) { + return bool(blockedAlterStream); + }); + runtime.DispatchEvents(opts); + } + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamInitialScanProgress(1, 1), + }); + blockAlterStream.Remove(); + + runtime.Send(blockedAlterStream.Release(), 0, true); + } + Y_UNIT_TEST(AlterStream) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions() diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 439a0f0a3c39..8a415d0cad6c 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -908,6 +908,14 @@ TCheckFunc StreamAwsRegion(const TString& value) { }; } +TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + const auto& scanProgress = record.GetPathDescription().GetCdcStreamDescription().GetScanProgress(); + UNIT_ASSERT_VALUES_EQUAL(total, scanProgress.GetShardsTotal()); + UNIT_ASSERT_VALUES_EQUAL(completed, scanProgress.GetShardsCompleted()); + }; +} + TCheckFunc RetentionPeriod(const TDuration& value) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup() diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index cc79bcd6706b..3cb8f39c5754 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -153,6 +153,7 @@ namespace NLs { TCheckFunc StreamVirtualTimestamps(bool value); TCheckFunc StreamResolvedTimestamps(const TDuration& value); TCheckFunc StreamAwsRegion(const TString& value); + TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed); TCheckFunc RetentionPeriod(const TDuration& value); TCheckFunc HasBackupInFly(ui64 txId); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index ed18ba73f2f8..5964d2d7267a 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1002,6 +1002,12 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, break; } + if (stream.HasScanProgress()) { + auto& scanProgress = *changefeed->mutable_initial_scan_progress(); + scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal()); + scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted()); + } + FillAttributesImpl(*changefeed, stream); } } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index a84139216d32..a98b82a6dab6 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -207,6 +207,11 @@ message ChangefeedDescription { STATE_INITIAL_SCAN = 3; } + message InitialScanProgress { + uint32 parts_total = 1; + uint32 parts_completed = 2; + } + // Name of the feed string name = 1; // Mode specifies the information that will be written to the feed @@ -223,6 +228,8 @@ message ChangefeedDescription { string aws_region = 7; // Interval of emitting of resolved timestamps. If unspecified, resolved timestamps are not emitted. google.protobuf.Duration resolved_timestamps_interval = 8; + // Progress of initial scan. If unspecified, initial scan is not running (completed or never launched). + InitialScanProgress initial_scan_progress = 9; } message StoragePool { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index 14d9092fc152..e40fc3af1ea3 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -517,12 +517,18 @@ namespace { TPrettyTableConfig().WithoutRowDelimiters()); for (const auto& changefeed : changefeeds) { - table.AddRow() + auto& row = table.AddRow() .Column(0, changefeed.GetName()) .Column(1, changefeed.GetMode()) .Column(2, changefeed.GetFormat()) - .Column(3, changefeed.GetState()) .Column(4, changefeed.GetVirtualTimestamps() ? "on" : "off"); + if (const auto& scanProgress = changefeed.GetInitialScanProgress()) { + const float percentage = 100 * scanProgress->GetProgress(); + row.Column(3, TStringBuilder() << changefeed.GetState() + << " (" << FloatToString(percentage, PREC_POINT_DIGITS, 2) << "%)"); + } else { + row.Column(3, changefeed.GetState()); + } } Cout << Endl << "Changefeeds:" << Endl << table; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 523ad219d50f..35d0cb3b2984 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2397,6 +2397,23 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr : TChangefeedDescription(FromProto(proto)) {} +TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(ui32 total, ui32 completed) + : PartsTotal(total) + , PartsCompleted(completed) +{} + +ui32 TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const { + return PartsTotal; +} + +ui32 TChangefeedDescription::TInitialScanProgress::GetPartsCompleted() const { + return PartsCompleted; +} + +float TChangefeedDescription::TInitialScanProgress::GetProgress() const { + return float(PartsCompleted) / float(PartsTotal); +} + TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() { VirtualTimestamps_ = true; return *this; @@ -2473,6 +2490,10 @@ const TString& TChangefeedDescription::GetAwsRegion() const { return AwsRegion_; } +const std::optional& TChangefeedDescription::GetInitialScanProgress() const { + return InitialScanProgress_; +} + template TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { EChangefeedMode mode; @@ -2540,6 +2561,13 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { ret.State_ = EChangefeedState::Unknown; break; } + + if (proto.has_initial_scan_progress()) { + ret.InitialScanProgress_ = std::make_optional( + proto.initial_scan_progress().parts_total(), + proto.initial_scan_progress().parts_completed() + ); + } } for (const auto& [key, value] : proto.attributes()) { @@ -2627,6 +2655,10 @@ void TChangefeedDescription::Out(IOutputStream& o) const { o << ", aws_region: " << AwsRegion_; } + if (InitialScanProgress_) { + o << ", initial_scan_progress: " << 100 * InitialScanProgress_->GetProgress() << "%"; + } + o << " }"; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index d76c0ff2974b..ef0c6f822e21 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -266,10 +266,24 @@ class TBuildIndexOperation : public TOperation { //////////////////////////////////////////////////////////////////////////////// -//! Represents index description +//! Represents changefeed description class TChangefeedDescription { friend class NYdb::TProtoAccessor; +public: + class TInitialScanProgress { + public: + explicit TInitialScanProgress(ui32 total, ui32 completed); + + ui32 GetPartsTotal() const; + ui32 GetPartsCompleted() const; + float GetProgress() const; + + private: + ui32 PartsTotal; + ui32 PartsCompleted; + }; + public: TChangefeedDescription(const TString& name, EChangefeedMode mode, EChangefeedFormat format); @@ -297,6 +311,7 @@ class TChangefeedDescription { bool GetInitialScan() const; const THashMap& GetAttributes() const; const TString& GetAwsRegion() const; + const std::optional& GetInitialScanProgress() const; void SerializeTo(Ydb::Table::Changefeed& proto) const; TString ToString() const; @@ -320,6 +335,7 @@ class TChangefeedDescription { bool InitialScan_ = false; THashMap Attributes_; TString AwsRegion_; + std::optional InitialScanProgress_; }; bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs);