Skip to content

Commit

Permalink
CDC Initial Scan progress
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jun 27, 2024
1 parent e3b5d7c commit 03803ce
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 3 deletions.
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,11 @@ enum ECdcStreamFormat {
ECdcStreamFormatDebeziumJson = 4;
}

// Progress of a CDC stream's initial scan, counted in shards.
// Attached to TCdcStreamDescription.ScanProgress by the schemeshard's
// DescribeCdcStream while the stream is in the ECdcStreamStateScan state.
message TCdcStreamScanProgress {
// Number of shards in the scan (filled from info->ScanShards.size()).
optional uint32 ShardsTotal = 1;
// Number of shards already done (filled from info->DoneShards.size()).
optional uint32 ShardsCompleted = 2;
}

message TCdcStreamDescription {
optional string Name = 1;
optional ECdcStreamMode Mode = 2;
Expand All @@ -897,6 +902,7 @@ message TCdcStreamDescription {
optional string AwsRegion = 9;
// Set to '0' to disable resolved timestamps
optional uint64 ResolvedTimestampsIntervalMs = 10;
optional TCdcStreamScanProgress ScanProgress = 11;
}

message TCreateCdcStream {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,12 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
desc.SetState(info->State);
desc.SetSchemaVersion(info->AlterVersion);

if (info->State == TCdcStreamInfo::EState::ECdcStreamStateScan && info->ScanShards) {
auto& scanProgress = *desc.MutableScanProgress();
scanProgress.SetShardsTotal(info->ScanShards.size());
scanProgress.SetShardsCompleted(info->DoneShards.size());
}

Y_ABORT_UNLESS(PathsById.contains(pathId));
auto path = PathsById.at(pathId);

Expand Down
72 changes: 72 additions & 0 deletions ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,78 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) {
InitialScan(false);
}

// Verifies the scan progress reported by describing a CDC stream that was
// created in the initial-scan state on a single-shard table:
//  * (total = 1, completed = 0) while the datashard scan request is held back;
//  * (total = 1, completed = 1) after the scan request has been re-sent and
//    the resulting AlterCdcStream scheme transaction is held back.
Y_UNIT_TEST(InitialScanProgress) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions()
.EnableProtoSourceIdInfo(true)
.EnableChangefeedInitialScan(true));
ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Uint64" }
Columns { Name: "value" Type: "Uint64" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// Take ownership of the scan request as soon as it is observed so the
// stream stays in the "nothing scanned yet" state.
// NOTE(review): relies on the observer leaving `ev` empty to suppress
// delivery of the original event — confirm against the test runtime.
THolder<IEventHandle> blockedScanRequest;
auto blockScanRequest = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanRequest>(
[&](TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) {
blockedScanRequest.Reset(ev.Release());
}
);

TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
TableName: "Table"
StreamDescription {
Name: "Stream"
Mode: ECdcStreamModeKeysOnly
Format: ECdcStreamFormatProto
State: ECdcStreamStateScan
}
)");
env.TestWaitNotification(runtime, txId);

// Pump events until the scan request has been captured.
{
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&blockedScanRequest](IEventHandle&) {
return bool(blockedScanRequest);
});
runtime.DispatchEvents(opts);
}

// Scan request is held: one shard in total, none completed.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::StreamInitialScanProgress(1, 0),
});
blockScanRequest.Remove();

// Capture the AlterCdcStream transaction before re-sending the scan request,
// so the stream can still be described while it is in the scan state.
// NOTE(review): presumably this transaction is what moves the stream out of
// the scan state once scanning is done — confirm.
THolder<IEventHandle> blockedAlterStream;
auto blockAlterStream = runtime.AddObserver<TEvSchemeShard::TEvModifySchemeTransaction>(
[&](TEvSchemeShard::TEvModifySchemeTransaction::TPtr& ev) {
if (ev->Get()->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpAlterCdcStream) {
blockedAlterStream.Reset(ev.Release());
}
}
);

// Re-send the captured scan request and pump events until the alter
// transaction has been captured.
runtime.Send(blockedScanRequest.Release(), 0, true);
{
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&blockedAlterStream](IEventHandle&) {
return bool(blockedAlterStream);
});
runtime.DispatchEvents(opts);
}

// Alter transaction is held: the single shard is now reported as completed.
TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
NLs::StreamInitialScanProgress(1, 1),
});
blockAlterStream.Remove();

// Re-send the captured alter transaction.
runtime.Send(blockedAlterStream.Release(), 0, true);
}

Y_UNIT_TEST(AlterStream) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions()
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,14 @@ TCheckFunc StreamAwsRegion(const TString& value) {
};
}

// Builds a describe-result check asserting that the CDC stream description
// carries a scan progress with the given total and completed shard counts.
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed) {
    return [total, completed] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
        const auto& streamDescription = record.GetPathDescription().GetCdcStreamDescription();
        const auto& scanProgress = streamDescription.GetScanProgress();

        // Macro arguments are kept as-is: they are echoed in failure messages.
        UNIT_ASSERT_VALUES_EQUAL(total, scanProgress.GetShardsTotal());
        UNIT_ASSERT_VALUES_EQUAL(completed, scanProgress.GetShardsCompleted());
    };
}

TCheckFunc RetentionPeriod(const TDuration& value) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ namespace NLs {
TCheckFunc StreamVirtualTimestamps(bool value);
TCheckFunc StreamResolvedTimestamps(const TDuration& value);
TCheckFunc StreamAwsRegion(const TString& value);
TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed);
TCheckFunc RetentionPeriod(const TDuration& value);

TCheckFunc HasBackupInFly(ui64 txId);
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,12 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
break;
}

if (stream.HasScanProgress()) {
auto& scanProgress = *changefeed->mutable_initial_scan_progress();
scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal());
scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted());
}

FillAttributesImpl(*changefeed, stream);
}
}
Expand Down
7 changes: 7 additions & 0 deletions ydb/public/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ message ChangefeedDescription {
STATE_INITIAL_SCAN = 3;
}

// Progress of the changefeed's initial scan, counted in parts.
message InitialScanProgress {
// Total number of parts in the scan.
uint32 parts_total = 1;
// Number of parts already completed.
uint32 parts_completed = 2;
}

// Name of the feed
string name = 1;
// Mode specifies the information that will be written to the feed
Expand All @@ -223,6 +228,8 @@ message ChangefeedDescription {
string aws_region = 7;
// Interval at which resolved timestamps are emitted. If unspecified, resolved timestamps are not emitted.
google.protobuf.Duration resolved_timestamps_interval = 8;
// Progress of initial scan. If unspecified, initial scan is not running (completed or never launched).
InitialScanProgress initial_scan_progress = 9;
}

message StoragePool {
Expand Down
8 changes: 6 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,16 @@ namespace {
TPrettyTableConfig().WithoutRowDelimiters());

for (const auto& changefeed : changefeeds) {
table.AddRow()
auto& row = table.AddRow()
.Column(0, changefeed.GetName())
.Column(1, changefeed.GetMode())
.Column(2, changefeed.GetFormat())
.Column(3, changefeed.GetState())
.Column(4, changefeed.GetVirtualTimestamps() ? "on" : "off");
if (const auto& scanProgress = changefeed.GetInitialScanProgress()) {
row.Column(3, TStringBuilder() << changefeed.GetState() << " (" << 100 * scanProgress->GetProgress() << "%)");
} else {
row.Column(3, changefeed.GetState());
}
}

Cout << Endl << "Changefeeds:" << Endl << table;
Expand Down
32 changes: 32 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,23 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr
: TChangefeedDescription(FromProto(proto))
{}

TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(ui32 total, ui32 completed)
: PartsTotal(total)
, PartsCompleted(completed)
{}

//! Returns the total part count supplied at construction.
ui32 TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const {
    return this->PartsTotal;
}

//! Returns the completed part count supplied at construction.
ui32 TChangefeedDescription::TInitialScanProgress::GetPartsCompleted() const {
    return this->PartsCompleted;
}

//! Returns the fraction of completed parts (completed / total).
//! A progress with zero total parts reports 0 instead of dividing by zero,
//! which would produce a non-finite value (NaN or infinity) that callers
//! would then scale and print as a percentage.
float TChangefeedDescription::TInitialScanProgress::GetProgress() const {
    if (PartsTotal == 0) {
        return 0.0f;
    }

    return float(PartsCompleted) / float(PartsTotal);
}

TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() {
VirtualTimestamps_ = true;
return *this;
Expand Down Expand Up @@ -2473,6 +2490,10 @@ const TString& TChangefeedDescription::GetAwsRegion() const {
return AwsRegion_;
}

const std::optional<TChangefeedDescription::TInitialScanProgress>& TChangefeedDescription::GetInitialScanProgress() const {
return InitialScanProgress_;
}

template <typename TProto>
TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
EChangefeedMode mode;
Expand Down Expand Up @@ -2540,6 +2561,13 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
ret.State_ = EChangefeedState::Unknown;
break;
}

if (proto.has_initial_scan_progress()) {
ret.InitialScanProgress_ = std::make_optional<TInitialScanProgress>(
proto.initial_scan_progress().parts_total(),
proto.initial_scan_progress().parts_completed()
);
}
}

for (const auto& [key, value] : proto.attributes()) {
Expand Down Expand Up @@ -2627,6 +2655,10 @@ void TChangefeedDescription::Out(IOutputStream& o) const {
o << ", aws_region: " << AwsRegion_;
}

if (InitialScanProgress_) {
o << ", initial_scan_progress: " << 100 * InitialScanProgress_->GetProgress() << "%";
}

o << " }";
}

Expand Down
18 changes: 17 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,24 @@ class TBuildIndexOperation : public TOperation {

////////////////////////////////////////////////////////////////////////////////

//! Represents index description
//! Represents changefeed description
class TChangefeedDescription {
friend class NYdb::TProtoAccessor;

public:
//! Progress of a changefeed's initial scan, expressed in parts
class TInitialScanProgress {
public:
//! \param total total number of parts
//! \param completed number of completed parts
explicit TInitialScanProgress(ui32 total, ui32 completed);

//! Total number of parts
ui32 GetPartsTotal() const;
//! Number of completed parts
ui32 GetPartsCompleted() const;
//! Ratio of completed parts to total parts
float GetProgress() const;

private:
ui32 PartsTotal;
ui32 PartsCompleted;
};

public:
TChangefeedDescription(const TString& name, EChangefeedMode mode, EChangefeedFormat format);

Expand Down Expand Up @@ -297,6 +311,7 @@ class TChangefeedDescription {
bool GetInitialScan() const;
const THashMap<TString, TString>& GetAttributes() const;
const TString& GetAwsRegion() const;
const std::optional<TInitialScanProgress>& GetInitialScanProgress() const;

void SerializeTo(Ydb::Table::Changefeed& proto) const;
TString ToString() const;
Expand All @@ -320,6 +335,7 @@ class TChangefeedDescription {
bool InitialScan_ = false;
THashMap<TString, TString> Attributes_;
TString AwsRegion_;
std::optional<TInitialScanProgress> InitialScanProgress_;
};

bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs);
Expand Down

0 comments on commit 03803ce

Please sign in to comment.