From 78c574af88f169b53c302a47add06a742bd50893 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 28 Jun 2024 21:05:29 +0300 Subject: [PATCH 1/9] CDC Initial Scan progress (#6028) --- ydb/core/protos/flat_scheme_op.proto | 6 ++ .../schemeshard_path_describer.cpp | 6 ++ .../ut_cdc_stream/ut_cdc_stream.cpp | 72 +++++++++++++++++++ .../tx/schemeshard/ut_helpers/ls_checks.cpp | 8 +++ .../tx/schemeshard/ut_helpers/ls_checks.h | 1 + ydb/core/ydb_convert/table_description.cpp | 6 ++ ydb/public/api/protos/ydb_table.proto | 7 ++ .../ydb_cli/commands/ydb_service_scheme.cpp | 10 ++- ydb/public/sdk/cpp/client/ydb_table/table.cpp | 36 ++++++++++ ydb/public/sdk/cpp/client/ydb_table/table.h | 18 ++++- 10 files changed, 167 insertions(+), 3 deletions(-) diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index f821aef9958a..3641adeec5a8 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -807,6 +807,11 @@ enum ECdcStreamFormat { ECdcStreamFormatDebeziumJson = 4; } +message TCdcStreamScanProgress { + optional uint32 ShardsTotal = 1; + optional uint32 ShardsCompleted = 2; +} + message TCdcStreamDescription { optional string Name = 1; optional ECdcStreamMode Mode = 2; @@ -820,6 +825,7 @@ message TCdcStreamDescription { optional string AwsRegion = 9; // Set to '0' to disable resolved timestamps optional uint64 ResolvedTimestampsIntervalMs = 10; + optional TCdcStreamScanProgress ScanProgress = 11; } message TCreateCdcStream { diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index e24a1f59f8dd..cff2c84a617f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1230,6 +1230,12 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, desc.SetState(info->State); desc.SetSchemaVersion(info->AlterVersion); + if 
(info->ScanShards) { + auto& scanProgress = *desc.MutableScanProgress(); + scanProgress.SetShardsTotal(info->ScanShards.size()); + scanProgress.SetShardsCompleted(info->DoneShards.size()); + } + Y_ABORT_UNLESS(PathsById.contains(pathId)); auto path = PathsById.at(pathId); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp index 61891cabada1..57b202cc3cfc 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream/ut_cdc_stream.cpp @@ -1263,6 +1263,78 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithInitialScanTests) { InitialScan(false); } + Y_UNIT_TEST(InitialScanProgress) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions() + .EnableProtoSourceIdInfo(true) + .EnableChangefeedInitialScan(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + THolder blockedScanRequest; + auto blockScanRequest = runtime.AddObserver( + [&](TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev) { + blockedScanRequest.Reset(ev.Release()); + } + ); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + State: ECdcStreamStateScan + } + )"); + env.TestWaitNotification(runtime, txId); + + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedScanRequest](IEventHandle&) { + return bool(blockedScanRequest); + }); + runtime.DispatchEvents(opts); + } + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamInitialScanProgress(1, 0), + }); + blockScanRequest.Remove(); + + THolder blockedAlterStream; + auto blockAlterStream = runtime.AddObserver( + 
[&](TEvSchemeShard::TEvModifySchemeTransaction::TPtr& ev) { + if (ev->Get()->Record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpAlterCdcStream) { + blockedAlterStream.Reset(ev.Release()); + } + } + ); + + runtime.Send(blockedScanRequest.Release(), 0, true); + { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&blockedAlterStream](IEventHandle&) { + return bool(blockedAlterStream); + }); + runtime.DispatchEvents(opts); + } + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::StreamInitialScanProgress(1, 1), + }); + blockAlterStream.Remove(); + + runtime.Send(blockedAlterStream.Release(), 0, true); + } + Y_UNIT_TEST(AlterStream) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions() diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp index 524048ff8cf3..e3554770b4c2 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp @@ -850,6 +850,14 @@ TCheckFunc StreamAwsRegion(const TString& value) { }; } +TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + const auto& scanProgress = record.GetPathDescription().GetCdcStreamDescription().GetScanProgress(); + UNIT_ASSERT_VALUES_EQUAL(total, scanProgress.GetShardsTotal()); + UNIT_ASSERT_VALUES_EQUAL(completed, scanProgress.GetShardsCompleted()); + }; +} + TCheckFunc RetentionPeriod(const TDuration& value) { return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { UNIT_ASSERT_VALUES_EQUAL(value.Seconds(), record.GetPathDescription().GetPersQueueGroup() diff --git a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h index 3f0cd290c1f6..37fe18611a36 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h +++ b/ydb/core/tx/schemeshard/ut_helpers/ls_checks.h @@ -141,6 +141,7 @@ namespace NLs { 
TCheckFunc StreamVirtualTimestamps(bool value); TCheckFunc StreamResolvedTimestamps(const TDuration& value); TCheckFunc StreamAwsRegion(const TString& value); + TCheckFunc StreamInitialScanProgress(ui32 total, ui32 completed); TCheckFunc RetentionPeriod(const TDuration& value); TCheckFunc HasBackupInFly(ui64 txId); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index e9350f2733f6..c37814a7b0be 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -927,6 +927,12 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, break; } + if (stream.HasScanProgress()) { + auto& scanProgress = *changefeed->mutable_initial_scan_progress(); + scanProgress.set_parts_total(stream.GetScanProgress().GetShardsTotal()); + scanProgress.set_parts_completed(stream.GetScanProgress().GetShardsCompleted()); + } + FillAttributesImpl(*changefeed, stream); } } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index e878cb5bdf17..b62b906dcd0d 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -191,6 +191,11 @@ message ChangefeedDescription { STATE_INITIAL_SCAN = 3; } + message InitialScanProgress { + uint32 parts_total = 1; + uint32 parts_completed = 2; + } + // Name of the feed string name = 1; // Mode specifies the information that will be written to the feed @@ -207,6 +212,8 @@ message ChangefeedDescription { string aws_region = 7; // Interval of emitting of resolved timestamps. If unspecified, resolved timestamps are not emitted. google.protobuf.Duration resolved_timestamps_interval = 8; + // Progress of initial scan. If unspecified, initial scan was not launched. 
+ InitialScanProgress initial_scan_progress = 9; } message StoragePool { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index 460a78de53f2..e3d225feda37 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -517,12 +517,18 @@ namespace { TPrettyTableConfig().WithoutRowDelimiters()); for (const auto& changefeed : changefeeds) { - table.AddRow() + auto& row = table.AddRow() .Column(0, changefeed.GetName()) .Column(1, changefeed.GetMode()) .Column(2, changefeed.GetFormat()) - .Column(3, changefeed.GetState()) .Column(4, changefeed.GetVirtualTimestamps() ? "on" : "off"); + if (changefeed.GetState() == NTable::EChangefeedState::InitialScan && changefeed.GetInitialScanProgress()) { + const float percentage = changefeed.GetInitialScanProgress()->GetProgress(); + row.Column(3, TStringBuilder() << changefeed.GetState() + << " (" << FloatToString(percentage, PREC_POINT_DIGITS, 2) << "%)"); + } else { + row.Column(3, changefeed.GetState()); + } } Cout << Endl << "Changefeeds:" << Endl << table; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 647cae8973d1..0c6543b0c013 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2340,6 +2340,27 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr : TChangefeedDescription(FromProto(proto)) {} +TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(ui32 total, ui32 completed) + : PartsTotal(total) + , PartsCompleted(completed) +{} + +ui32 TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const { + return PartsTotal; +} + +ui32 TChangefeedDescription::TInitialScanProgress::GetPartsCompleted() const { + return PartsCompleted; +} + +float TChangefeedDescription::TInitialScanProgress::GetProgress() const { + 
if (PartsTotal == 0) { + return 0; + } + + return 100 * float(PartsCompleted) / float(PartsTotal); +} + TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() { VirtualTimestamps_ = true; return *this; @@ -2416,6 +2437,10 @@ const TString& TChangefeedDescription::GetAwsRegion() const { return AwsRegion_; } +const std::optional& TChangefeedDescription::GetInitialScanProgress() const { + return InitialScanProgress_; +} + template TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { EChangefeedMode mode; @@ -2483,6 +2508,13 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { ret.State_ = EChangefeedState::Unknown; break; } + + if (proto.has_initial_scan_progress()) { + ret.InitialScanProgress_ = std::make_optional( + proto.initial_scan_progress().parts_total(), + proto.initial_scan_progress().parts_completed() + ); + } } for (const auto& [key, value] : proto.attributes()) { @@ -2570,6 +2602,10 @@ void TChangefeedDescription::Out(IOutputStream& o) const { o << ", aws_region: " << AwsRegion_; } + if (InitialScanProgress_) { + o << ", initial_scan_progress: " << InitialScanProgress_->GetProgress() << "%"; + } + o << " }"; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 21a721d6e503..369c025f1bba 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -213,10 +213,24 @@ class TBuildIndexOperation : public TOperation { //////////////////////////////////////////////////////////////////////////////// -//! Represents index description +//! 
Represents changefeed description class TChangefeedDescription { friend class NYdb::TProtoAccessor; +public: + class TInitialScanProgress { + public: + explicit TInitialScanProgress(ui32 total, ui32 completed); + + ui32 GetPartsTotal() const; + ui32 GetPartsCompleted() const; + float GetProgress() const; // percentage + + private: + ui32 PartsTotal; + ui32 PartsCompleted; + }; + public: TChangefeedDescription(const TString& name, EChangefeedMode mode, EChangefeedFormat format); @@ -244,6 +258,7 @@ class TChangefeedDescription { bool GetInitialScan() const; const THashMap& GetAttributes() const; const TString& GetAwsRegion() const; + const std::optional& GetInitialScanProgress() const; void SerializeTo(Ydb::Table::Changefeed& proto) const; TString ToString() const; @@ -267,6 +282,7 @@ class TChangefeedDescription { bool InitialScan_ = false; THashMap Attributes_; TString AwsRegion_; + std::optional InitialScanProgress_; }; bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs); From 8f88212d6fd947a5d18e461900e49918079c3386 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 1 Jul 2024 21:13:13 +0300 Subject: [PATCH 2/9] Replication stats (total & per item): lag, initial scan progress (#6092) --- ydb/core/base/services_assert.cpp | 2 +- ydb/core/grpc_services/rpc_replication.cpp | 13 +- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 31 +++ ydb/core/protos/replication.proto | 6 + .../tx/replication/controller/controller.cpp | 6 + .../replication/controller/controller_impl.h | 2 + .../replication/controller/private_events.cpp | 13 ++ .../replication/controller/private_events.h | 17 ++ .../tx/replication/controller/replication.h | 1 + .../tx/replication/controller/target_base.cpp | 6 +- .../tx/replication/controller/target_base.h | 1 + .../controller/tx_describe_replication.cpp | 218 +++++++++++++++++- ydb/library/services/services.proto | 1 + .../api/protos/draft/ydb_replication.proto | 10 +- 
.../ydb_cli/commands/ydb_service_scheme.cpp | 51 +++- .../sdk/cpp/client/draft/ydb_replication.cpp | 33 ++- .../sdk/cpp/client/draft/ydb_replication.h | 27 ++- ydb/public/sdk/cpp/client/ydb_table/table.cpp | 11 + ydb/public/sdk/cpp/client/ydb_table/table.h | 3 + 19 files changed, 417 insertions(+), 35 deletions(-) diff --git a/ydb/core/base/services_assert.cpp b/ydb/core/base/services_assert.cpp index 540ba664a238..891d9045c631 100644 --- a/ydb/core/base/services_assert.cpp +++ b/ydb/core/base/services_assert.cpp @@ -2,4 +2,4 @@ #include static_assert(static_cast(NKikimrServices::EServiceKikimr_MIN) > static_cast(NActorsServices::EServiceCommon_MAX), "KIKIMR SERVICES IDs SHOULD BE GREATER THAN COMMON ONES"); -static_assert(NKikimrServices::TActivity::EType_ARRAYSIZE < 640, "ACTOR ACTIVITY TYPES MUST BE NOT VERY BIG TO BE ARRAY INDICES"); // If we would have many different actor activities, it is OK to increase this value. +static_assert(NKikimrServices::TActivity::EType_ARRAYSIZE < 768, "ACTOR ACTIVITY TYPES MUST BE NOT VERY BIG TO BE ARRAY INDICES"); // If we would have many different actor activities, it is OK to increase this value. 
diff --git a/ydb/core/grpc_services/rpc_replication.cpp b/ydb/core/grpc_services/rpc_replication.cpp index 72021a4762e6..83c28cc04845 100644 --- a/ydb/core/grpc_services/rpc_replication.cpp +++ b/ydb/core/grpc_services/rpc_replication.cpp @@ -102,6 +102,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor(); PathIdFromPathId(pathId, ev->Record.MutablePathId()); + ev->Record.SetIncludeStats(GetProtoRequest()->include_stats()); NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release()); Become(&TDescribeReplicationRPC::StateDescribeReplication); @@ -167,6 +168,13 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActormutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( + from.GetLagMilliSeconds()); + } + if (from.HasInitialScanProgress()) { + to.mutable_stats()->set_initial_scan_progress(from.GetInitialScanProgress()); + } } static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) { @@ -174,9 +182,12 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActormutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( + *to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( from.GetStandBy().GetLagMilliSeconds()); } + if (from.GetStandBy().HasInitialScanProgress()) { + to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress()); + } break; case NKikimrReplication::TReplicationState::kError: *to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues()); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 60ad83907ff0..0d4e3e3dcd4b 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5275,7 +5275,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } Y_UNIT_TEST(CreateAsyncReplicationWithSecret) { + using namespace 
NReplication; + TKikimrRunner kikimr("root@builtin"); + auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root")); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -5319,6 +5322,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) { Sleep(TDuration::Seconds(1)); } + + while (true) { + auto settings = TDescribeReplicationSettings().IncludeStats(true); + const auto result = repl.DescribeReplication("/Root/replication", settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& desc = result.GetReplicationDescription(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Running); + + const auto& total = desc.GetRunningState().GetStats(); + if (!total.GetInitialScanProgress() || *total.GetInitialScanProgress() < 100) { + Sleep(TDuration::Seconds(1)); + continue; + } + + UNIT_ASSERT(total.GetInitialScanProgress()); + UNIT_ASSERT_DOUBLES_EQUAL(*total.GetInitialScanProgress(), 100.0, 0.01); + + const auto& items = desc.GetItems(); + UNIT_ASSERT_VALUES_EQUAL(items.size(), 1); + const auto& item = items.at(0).Stats; + + UNIT_ASSERT(item.GetInitialScanProgress()); + UNIT_ASSERT_DOUBLES_EQUAL(*item.GetInitialScanProgress(), *total.GetInitialScanProgress(), 0.01); + + // TODO: check lag too + break; + } } Y_UNIT_TEST(AlterAsyncReplication) { diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 69309d074aa1..3e9c0e1d080a 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -33,10 +33,14 @@ message TReplicationConfig { message TTargetSpecific { message TTarget { + // in/out optional string SrcPath = 1; optional string DstPath = 2; optional string SrcStreamName = 3; + // out optional uint64 Id = 4; + optional uint32 LagMilliSeconds = 5; + optional float InitialScanProgress = 6; // percentage } repeated TTarget Targets = 1; @@ 
-59,6 +63,7 @@ message TReplicationConfig { message TStandBy { optional uint32 LagMilliSeconds = 1; + optional float InitialScanProgress = 2; // percentage } message TPaused { @@ -146,6 +151,7 @@ message TEvDropReplicationResult { message TEvDescribeReplication { optional NKikimrProto.TPathID PathId = 1; + optional bool IncludeStats = 2; } message TEvDescribeReplicationResult { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index e49503a3fe85..4f73d84d68b0 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -59,6 +59,7 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); HFunc(TEvPrivate::TEvProcessQueues, Handle); HFunc(TEvPrivate::TEvRemoveWorker, Handle); + HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvService::TEvStatus, Handle); @@ -132,6 +133,11 @@ void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const RunTxDescribeReplication(ev, ctx); } +void TController::Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDescribeReplication(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); RunTxDiscoveryTargetsResult(ev, ctx); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 8663690a246d..927d2d5bf528 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -82,6 +82,7 @@ class TController void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); void 
Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); @@ -128,6 +129,7 @@ class TController void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx); + void RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx); void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx); void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index 30ce9caabe43..f562331cc26d 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -163,6 +163,19 @@ TString TEvPrivate::TEvRemoveWorker::ToString() const { << " }"; } +TEvPrivate::TEvDescribeTargetsResult::TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result) + : Sender(sender) + , ReplicationId(rid) + , Result(std::move(result)) +{ +} + +TString TEvPrivate::TEvDescribeTargetsResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " }"; +} + } Y_DECLARE_OUT_SPEC(, 
NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 8eb3631dd7b5..7383d6f7ffc0 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -8,6 +9,10 @@ #include #include +#include + +#include + namespace NKikimr::NReplication::NController { struct TEvPrivate { @@ -25,6 +30,7 @@ struct TEvPrivate { EvResolveSecretResult, EvAlterDstResult, EvRemoveWorker, + EvDescribeTargetsResult, EvEnd, }; @@ -191,6 +197,17 @@ struct TEvPrivate { TString ToString() const override; }; + struct TEvDescribeTargetsResult: public TEventLocal { + using TResult = THashMap>; + + const TActorId Sender; + const ui64 ReplicationId; + TResult Result; + + explicit TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result); + TString ToString() const override; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index f3478c4b0a96..71c5d794bbac 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -79,6 +79,7 @@ class TReplication: public TSimpleRefCount { virtual void AddWorker(ui64 id) = 0; virtual void RemoveWorker(ui64 id) = 0; virtual void UpdateLag(ui64 workerId, TDuration lag) = 0; + virtual const TMaybe GetLag() const = 0; virtual void Progress(const TActorContext& ctx) = 0; virtual void Shutdown(const TActorContext& ctx) = 0; diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 590cdd393884..93b2c59a5b45 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -117,10 +117,14 @@ 
void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) { } if (TLagProvider::UpdateLag(it->second, workerId, lag)) { - Replication->UpdateLag(GetId(), GetLag().GetRef()); + Replication->UpdateLag(GetId(), TLagProvider::GetLag().GetRef()); } } +const TMaybe TTargetBase::GetLag() const { + return TLagProvider::GetLag(); +} + void TTargetBase::Progress(const TActorContext& ctx) { switch (DstState) { case EDstState::Creating: diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index d1b71710c2bc..4626433cd8d0 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -50,6 +50,7 @@ class TTargetBase void AddWorker(ui64 id) override; void RemoveWorker(ui64 id) override; void UpdateLag(ui64 workerId, TDuration lag) override; + const TMaybe GetLag() const override; void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; diff --git a/ydb/core/tx/replication/controller/tx_describe_replication.cpp b/ydb/core/tx/replication/controller/tx_describe_replication.cpp index 65b6518c737b..f78f1516445f 100644 --- a/ydb/core/tx/replication/controller/tx_describe_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_describe_replication.cpp @@ -1,17 +1,125 @@ #include "controller_impl.h" +#include "logging.h" +#include "private_events.h" -#include +#include +#include +#include +#include + +#include +#include namespace NKikimr::NReplication::NController { +class TTargetDescriber: public TActorBootstrapped { + void DescribeTarget(ui64 id) { + Y_ABORT_UNLESS(Targets.contains(id)); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(Targets.at(id), {}), 0, id); + } + + void Handle(TEvYdbProxy::TEvDescribeTableResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + if (!Targets.contains(ev->Cookie)) { + LOG_W("Unknown describe response" + << ": cookie# " << ev->Cookie); + return; + } + + const auto 
id = ev->Cookie; + const auto& path = Targets.at(id); + + if (Result.contains(id)) { + LOG_W("Duplicate describe response" + << ": id# " << id + << ", path# " << path); + return; + } + + auto& result = ev->Get()->Result; + if (result.IsSuccess()) { + LOG_D("Describe succeeded" + << ": id# " << id + << ", path# " << path); + Result.emplace(id, std::move(result)); + } else { + LOG_E("Describe failed" + << ": id# " << id + << ", path# " << path + << ", status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + Result.emplace(id, std::nullopt); + } + + if (Result.size() == Targets.size()) { + Send(Parent, new TEvPrivate::TEvDescribeTargetsResult(Sender, ReplicationId, std::move(Result))); + PassAway(); + } + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TARGET_DESCRIBER; + } + + explicit TTargetDescriber(const TActorId& sender, const TActorId& parent, ui64 rid, const TActorId& proxy, THashMap&& targets) + : Sender(sender) + , Parent(parent) + , ReplicationId(rid) + , YdbProxy(proxy) + , Targets (std::move(targets)) + , LogPrefix("TargetDescriber", ReplicationId) + { + } + + void Bootstrap() { + for (const auto& [id, _] : Targets) { + DescribeTarget(id); + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvDescribeTableResponse, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Sender; + const TActorId Parent; + const ui64 ReplicationId; + const TActorId YdbProxy; + const THashMap Targets; + const TActorLogPrefix LogPrefix; + + TEvPrivate::TEvDescribeTargetsResult::TResult Result; + +}; // TTargetDescriber + class TController::TTxDescribeReplication: public TTxBase { - TEvController::TEvDescribeReplication::TPtr Ev; + const TActorId Sender; + TEvController::TEvDescribeReplication::TPtr PubEv; + 
TEvPrivate::TEvDescribeTargetsResult::TPtr PrivEv; + TReplication::TPtr Replication; THolder Result; + THashMap TargetsToDescribe; public: explicit TTxDescribeReplication(TController* self, TEvController::TEvDescribeReplication::TPtr& ev) : TTxBase("TxDescribeReplication", self) - , Ev(ev) + , Sender(ev->Sender) + , PubEv(ev) + { + } + + explicit TTxDescribeReplication(TController* self, TEvPrivate::TEvDescribeTargetsResult::TPtr& ev) + : TTxBase("TxDescribeReplication", self) + , Sender(ev->Get()->Sender) + , PrivEv(ev) { } @@ -19,23 +127,74 @@ class TController::TTxDescribeReplication: public TTxBase { return TXTYPE_DESCRIBE_REPLICATION; } - bool Execute(TTransactionContext&, const TActorContext& ctx) override { - CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + if (PubEv) { + return ExecutePub(txc, ctx); + } else if (PrivEv) { + return ExecutePriv(txc, ctx); + } else { + Y_ABORT("unreachable"); + } + } - const auto& record = Ev->Get()->Record; - Result = MakeHolder(); + bool ExecutePub(TTransactionContext&, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << PubEv->Get()->ToString()); + const auto& record = PubEv->Get()->Record; const auto pathId = PathIdFromPathId(record.GetPathId()); - auto replication = Self->Find(pathId); - if (!replication) { + Replication = Self->Find(pathId); + if (!Replication) { + Result = MakeHolder(); + Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND); + return true; + } + + if (record.GetIncludeStats()) { + for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) { + auto* target = Replication->FindTarget(tid); + if (!target) { + continue; + } + + TargetsToDescribe.emplace(tid, target->GetSrcPath()); + } + + if (TargetsToDescribe) { + return true; + } + } + + return DescribeReplication(Replication); + } + + bool ExecutePriv(TTransactionContext&, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << 
PrivEv->Get()->ToString()); + + const auto rid = PrivEv->Get()->ReplicationId; + + Replication = Self->Find(rid); + if (!Replication) { + Result = MakeHolder(); Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND); return true; } + return DescribeReplication(Replication); + } + + bool DescribeReplication(TReplication::TPtr replication) { + Result = MakeHolder(); Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::SUCCESS); Result->Record.MutableConnectionParams()->CopyFrom(replication->GetConfig().GetSrcConnectionParams()); + using TInitialScanProgress = NYdb::NTable::TChangefeedDescription::TInitialScanProgress; + std::optional totalScanProgress; + + if (PrivEv) { + totalScanProgress = std::make_optional(); + } + for (ui64 tid = 0; tid < replication->GetNextTargetId(); ++tid) { auto* target = replication->FindTarget(tid); if (!target) { @@ -49,6 +208,34 @@ class TController::TTxDescribeReplication: public TTxBase { if (target->GetStreamName()) { item.SetSrcStreamName(target->GetStreamName()); } + if (const auto lag = target->GetLag()) { + item.SetLagMilliSeconds(lag->MilliSeconds()); + } + + if (PrivEv) { + const auto& result = PrivEv->Get()->Result; + + auto it = result.find(tid); + if (it == result.end() || !it->second) { + totalScanProgress.reset(); + continue; + } + + const auto& changefeeds = it->second->GetTableDescription().GetChangefeedDescriptions(); + auto* cfPtr = FindIfPtr(changefeeds, [target](const NYdb::NTable::TChangefeedDescription& cf) { + return cf.GetName() == target->GetStreamName(); + }); + + if (!cfPtr || !cfPtr->GetInitialScanProgress()) { + totalScanProgress.reset(); + continue; + } + + item.SetInitialScanProgress(cfPtr->GetInitialScanProgress()->GetProgress()); + if (totalScanProgress) { + *totalScanProgress += *cfPtr->GetInitialScanProgress(); + } + } } auto& state = *Result->Record.MutableState(); @@ -59,6 +246,9 @@ class TController::TTxDescribeReplication: public TTxBase { if 
(const auto lag = replication->GetLag()) { state.MutableStandBy()->SetLagMilliSeconds(lag->MilliSeconds()); } + if (totalScanProgress) { + state.MutableStandBy()->SetInitialScanProgress(totalScanProgress->GetProgress()); + } break; case TReplication::EState::Done: state.MutableDone(); @@ -78,7 +268,11 @@ class TController::TTxDescribeReplication: public TTxBase { CLOG_D(ctx, "Complete"); if (Result) { - ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + ctx.Send(Sender, Result.Release()); + } else if (TargetsToDescribe) { + Y_ABORT_UNLESS(Replication); + ctx.Register(new TTargetDescriber(Sender, ctx.SelfID, + Replication->GetId(), Replication->GetYdbProxy(), std::move(TargetsToDescribe))); } } @@ -88,4 +282,8 @@ void TController::RunTxDescribeReplication(TEvController::TEvDescribeReplication Execute(new TTxDescribeReplication(this, ev), ctx); } +void TController::RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDescribeReplication(this, ev), ctx); +} + } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 7a6e9811a145..3315d84d9319 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1024,5 +1024,6 @@ message TActivity { REPLICATION_CONTROLLER_TABLE_WORKER_REGISTAR = 633; REPLICATION_CONTROLLER_SECRET_RESOLVER = 636; REPLICATION_CONTROLLER_DST_ALTERER = 637; + REPLICATION_CONTROLLER_TARGET_DESCRIBER = 642; }; }; diff --git a/ydb/public/api/protos/draft/ydb_replication.proto b/ydb/public/api/protos/draft/ydb_replication.proto index 08a78ca58621..c1421a05e2ab 100644 --- a/ydb/public/api/protos/draft/ydb_replication.proto +++ b/ydb/public/api/protos/draft/ydb_replication.proto @@ -15,6 +15,8 @@ message DescribeReplicationRequest { Ydb.Operations.OperationParams operation_params = 1; // Replication path. string path = 2 [(required) = true]; + // Include statistics. 
+ bool include_stats = 3; } message DescribeReplicationResponse { @@ -42,15 +44,21 @@ message ConnectionParams { } message DescribeReplicationResult { + message Stats { + optional google.protobuf.Duration lag = 1; + optional float initial_scan_progress = 2; + } + message Item { string source_path = 1; string destination_path = 2; optional string source_changefeed_name = 3; uint64 id = 4; + Stats stats = 5; } message RunningState { - optional google.protobuf.Duration lag = 1; + Stats stats = 1; } message ErrorState { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index e3d225feda37..90715d8e7ed1 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -136,7 +136,7 @@ void TCommandDescribe::Config(TConfig& config) { // Table options config.Opts->AddLongOption("partition-boundaries", "[Table] Show partition key boundaries").StoreTrue(&ShowKeyShardBoundaries) .AddLongName("shard-boundaries"); - config.Opts->AddLongOption("stats", "[Table|Topic] Show table/topic statistics").StoreTrue(&ShowStats); + config.Opts->AddLongOption("stats", "[Table|Topic|Replication] Show table/topic/replication statistics").StoreTrue(&ShowStats); config.Opts->AddLongOption("partition-stats", "[Table|Topic] Show partition statistics").StoreTrue(&ShowPartitionStats); AddDeprecatedJsonOption(config, "(Deprecated, will be removed soon. 
Use --format option instead) [Table] Output in json format"); @@ -402,14 +402,34 @@ int TCommandDescribe::DescribeCoordinationNode(const TDriver& driver) { return PrintDescription(this, OutputFormat, desc, &TCommandDescribe::PrintCoordinationNodeResponsePretty); } +template +static TString ValueOr(const std::optional& value, const U& orValue) { + if (value) { + return TStringBuilder() << *value; + } else { + return TStringBuilder() << orValue; + } +} + +template +static TString ProgressOr(const std::optional& value, const U& orValue) { + if (value) { + return TStringBuilder() << FloatToString(*value, PREC_POINT_DIGITS, 2) << "%"; + } else { + return TStringBuilder() << orValue; + } +} + int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::TDescribeReplicationResult& result) const { const auto& desc = result.GetReplicationDescription(); Cout << Endl << "State: " << desc.GetState(); switch (desc.GetState()) { case NReplication::TReplicationDescription::EState::Running: - if (const auto& lag = desc.GetRunningState().GetLag()) { - Cout << Endl << "Lag: " << *lag; + if (ShowStats) { + const auto& stats = desc.GetRunningState().GetStats(); + Cout << Endl << "Lag: " << ValueOr(stats.GetLag(), "n/a"); + Cout << Endl << "Initial Scan progress: " << ProgressOr(stats.GetInitialScanProgress(), "n/a"); } break; case NReplication::TReplicationDescription::EState::Error: @@ -434,13 +454,24 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T } if (const auto& items = desc.GetItems()) { - TPrettyTable table({ "#", "Source", "Changefeed", "Destination" }, TPrettyTableConfig().WithoutRowDelimiters()); + TVector columnNames = { "#", "Source", "Destination", "Changefeed" }; + if (ShowStats) { + columnNames.push_back("Lag"); + columnNames.push_back("Progress"); + } + + TPrettyTable table(columnNames, TPrettyTableConfig().WithoutRowDelimiters()); for (const auto& item : items) { - table.AddRow() + auto& row = table.AddRow() 
.Column(0, item.Id) .Column(1, item.SrcPath) - .Column(2, item.SrcChangefeedName.value_or("n/a")) - .Column(3, item.DstPath); + .Column(2, item.DstPath) + .Column(3, ValueOr(item.SrcChangefeedName, "n/a")); + if (ShowStats) { + row + .Column(4, ValueOr(item.Stats.GetLag(), "n/a")) + .Column(5, ProgressOr(item.Stats.GetInitialScanProgress(), "n/a")); + } } Cout << Endl << "Items:" << Endl << table; } @@ -451,8 +482,12 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T int TCommandDescribe::DescribeReplication(const TDriver& driver) { NReplication::TReplicationClient client(driver); - auto result = client.DescribeReplication(Path).ExtractValueSync(); + auto settings = NReplication::TDescribeReplicationSettings() + .IncludeStats(ShowStats); + + auto result = client.DescribeReplication(Path, settings).ExtractValueSync(); ThrowOnError(result); + return PrintDescription(this, OutputFormat, result, &TCommandDescribe::PrintReplicationResponsePretty); } diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp index acfb0484e077..181a3f2d4171 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp @@ -59,15 +59,33 @@ const TOAuthCredentials& TConnectionParams::GetOAuthCredentials() const { return std::get(Credentials_); } -TRunningState::TRunningState(const std::optional& lag) - : Lag_(lag) +static TDuration DurationToDuration(const google::protobuf::Duration& value) { + return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); +} + +TStats::TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats) + : Lag_(stats.has_lag() ? std::make_optional(DurationToDuration(stats.lag())) : std::nullopt) + , InitialScanProgress_(stats.has_initial_scan_progress() ? 
std::make_optional(stats.initial_scan_progress()) : std::nullopt) { } -const std::optional& TRunningState::GetLag() const { +const std::optional& TStats::GetLag() const { return Lag_; } +const std::optional& TStats::GetInitialScanProgress() const { + return InitialScanProgress_; +} + +TRunningState::TRunningState(const TStats& stats) + : Stats_(stats) +{ +} + +const TStats& TRunningState::GetStats() const { + return Stats_; +} + class TErrorState::TImpl { public: NYql::TIssues Issues; @@ -87,10 +105,6 @@ const NYql::TIssues& TErrorState::GetIssues() const { return Impl_->Issues; } -TDuration DurationToDuration(const google::protobuf::Duration& value) { - return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); -} - template NYql::TIssues IssuesFromMessage(const ::google::protobuf::RepeatedPtrField& message) { NYql::TIssues issues; @@ -107,6 +121,7 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ .Id = item.id(), .SrcPath = item.source_path(), .DstPath = item.destination_path(), + .Stats = TStats(item.stats()), .SrcChangefeedName = item.has_source_changefeed_name() ? std::make_optional(item.source_changefeed_name()) : std::nullopt, }); @@ -114,8 +129,7 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ switch (desc.state_case()) { case Ydb::Replication::DescribeReplicationResult::kRunning: - State_ = TRunningState(desc.running().has_lag() - ? 
std::make_optional(DurationToDuration(desc.running().lag())) : std::nullopt); + State_ = TRunningState(desc.running().stats()); break; case Ydb::Replication::DescribeReplicationResult::kError: @@ -183,6 +197,7 @@ class TReplicationClient::TImpl: public TClientImplCommon(settings); request.set_path(path); + request.set_include_stats(settings.IncludeStats_); auto promise = NThreading::NewPromise(); diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.h b/ydb/public/sdk/cpp/client/draft/ydb_replication.h index a5b732dedac5..ee7db99f197b 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.h @@ -11,6 +11,7 @@ namespace Ydb::Replication { class ConnectionParams; class DescribeReplicationResult; + class DescribeReplicationResult_Stats; } namespace NYdb { @@ -25,7 +26,11 @@ namespace NYdb::NReplication { class TDescribeReplicationResult; using TAsyncDescribeReplicationResult = NThreading::TFuture; -struct TDescribeReplicationSettings: public TOperationRequestSettings {}; + +struct TDescribeReplicationSettings: public TOperationRequestSettings { + using TSelf = TDescribeReplicationSettings; + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); +}; struct TStaticCredentials { TString User; @@ -59,15 +64,28 @@ class TConnectionParams: private TCommonClientSettings { > Credentials_; }; -struct TRunningState { +class TStats { public: - TRunningState() = default; - explicit TRunningState(const std::optional& lag); + TStats() = default; + TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats); const std::optional& GetLag() const; + const std::optional& GetInitialScanProgress() const; private: std::optional Lag_; + std::optional InitialScanProgress_; +}; + +class TRunningState { +public: + TRunningState() = default; + explicit TRunningState(const TStats& stats); + + const TStats& GetStats() const; + +private: + TStats Stats_; }; struct TDoneState {}; @@ -90,6 +108,7 @@ class 
TReplicationDescription { ui64 Id; TString SrcPath; TString DstPath; + TStats Stats; std::optional SrcChangefeedName; }; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 0c6543b0c013..679c34579790 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2340,11 +2340,22 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr : TChangefeedDescription(FromProto(proto)) {} +TChangefeedDescription::TInitialScanProgress::TInitialScanProgress() + : PartsTotal(0) + , PartsCompleted(0) +{} + TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(ui32 total, ui32 completed) : PartsTotal(total) , PartsCompleted(completed) {} +TChangefeedDescription::TInitialScanProgress& TChangefeedDescription::TInitialScanProgress::operator+=(const TInitialScanProgress& other) { + PartsTotal += other.PartsTotal; + PartsCompleted += other.PartsCompleted; + return *this; +} + ui32 TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const { return PartsTotal; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 369c025f1bba..6c7fdc0b4e09 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -220,8 +220,11 @@ class TChangefeedDescription { public: class TInitialScanProgress { public: + TInitialScanProgress(); explicit TInitialScanProgress(ui32 total, ui32 completed); + TInitialScanProgress& operator+=(const TInitialScanProgress& other); + ui32 GetPartsTotal() const; ui32 GetPartsCompleted() const; float GetProgress() const; // percentage From 52e6a4e3badc18e11ab350ef5a9fec5ed3863299 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 5 Jul 2024 21:38:50 +0300 Subject: [PATCH 3/9] Fix alter replication config (#6348) --- ydb/core/tx/datashard/datashard_user_table.cpp | 2 ++ 
ydb/core/tx/datashard/datashard_user_table.h | 5 +++++ .../tx/datashard/datashard_ut_replication.cpp | 3 +++ .../datashard/ut_common/datashard_ut_common.cpp | 16 ++++++++++++++++ .../tx/datashard/ut_common/datashard_ut_common.h | 5 +++++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 6 ++++++ 6 files changed, 37 insertions(+) diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 8584c716597c..a75c630bd58e 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -369,6 +369,8 @@ void TUserTable::AlterSchema() { schema.SetPartitionRangeEnd(Range.To.GetBuffer()); schema.SetPartitionRangeEndIsInclusive(Range.ToInclusive); + ReplicationConfig.Serialize(*schema.MutableReplicationConfig()); + schema.SetName(Name); schema.SetPath(Path); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index bca8c9107cc6..493ce38bcdb9 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -337,6 +337,11 @@ struct TUserTable : public TThrRefBase { bool HasStrongConsistency() const { return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG; } + + void Serialize(NKikimrSchemeOp::TTableReplicationConfig& proto) const { + proto.SetMode(Mode); + proto.SetConsistency(Consistency); + } }; struct TStats { diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index b0395077e915..30267d6537b4 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -244,6 +244,9 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`"); ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true, Ydb::StatusIds::GENERIC_ERROR); + + WaitTxNotification(server, 
sender, AsyncAlterDropReplicationConfig(server, "/Root", "table-1")); + ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"); } Y_UNIT_TEST(ApplyChangesToReplicatedTable) { diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index f79a3de14c25..c400bae4a014 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1729,6 +1729,22 @@ ui64 AsyncAlterDropStream( return RunSchemeTx(*server->GetRuntime(), std::move(request)); } +ui64 AsyncAlterDropReplicationConfig( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName) +{ + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetInternal(true); + + auto& desc = *tx.MutableAlterTable(); + desc.SetName(tableName); + desc.MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE); + + return RunSchemeTx(*server->GetRuntime(), std::move(request)); +} + void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId) { auto &runtime = *server->GetRuntime(); auto &settings = server->GetSettings(); diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 450164a88a21..f5e493f232aa 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -662,6 +662,11 @@ ui64 AsyncAlterDropStream( const TString& tableName, const TString& streamName); +ui64 AsyncAlterDropReplicationConfig( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName); + struct TReadShardedTableState { TActorId Sender; TActorId Worker; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp 
b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4edcd7070f2d..ae2baf61e4dc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6428,6 +6428,12 @@ TString TSchemeShard::FillAlterTableTxBody(TPathId pathId, TShardIdx shardIdx, T *patch); } + if (alterData->TableDescriptionFull.Defined() && alterData->TableDescriptionFull->HasReplicationConfig()) { + proto->MutableReplicationConfig()->CopyFrom(alterData->TableDescriptionFull->GetReplicationConfig()); + } else if (tableInfo->HasReplicationConfig()) { + proto->MutableReplicationConfig()->CopyFrom(tableInfo->ReplicationConfig()); + } + TString txBody; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); return txBody; From b247517482e5e701757369c55f224f7b4b074437 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 5 Jul 2024 13:00:08 +0300 Subject: [PATCH 4/9] Fill change exchange split & activation lists just before commit; unify checks (#6281) --- .../tx/datashard/datashard_change_sending.cpp | 20 +++++----- ydb/core/tx/datashard/datashard_split_src.cpp | 40 +++++++++++-------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index d70874dd64aa..17bd8eccf591 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { ChangeExchangeSplit = true; } else { for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) { - if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) { + if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) { ActivationList.insert(dstTabletId); } } @@ -340,13 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { Self->RemoveChangeRecordsInFly = false; } - if 
(ChangeExchangeSplit) { - Self->KillChangeSender(ctx); - Self->ChangeExchangeSplitter.DoSplit(ctx); - } + if (!Self->ChangesQueue) { // double check queue + if (ChangeExchangeSplit) { + Self->KillChangeSender(ctx); + Self->ChangeExchangeSplitter.DoSplit(ctx); + } - for (const auto dstTabletId : ActivationList) { - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { + for (const auto dstTabletId : ActivationList) { Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } } @@ -383,7 +383,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done()); for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) { - if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) { + if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) { ActivationList.insert(dstTabletId); } } @@ -396,9 +396,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase << ", at tablet# " << Self->TabletID()); for (const auto dstTabletId : ActivationList) { - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } } diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index 6777b666f4b1..0f6874ed43b8 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -238,6 +238,8 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa private: TIntrusivePtr SnapContext; bool ChangeExchangeSplit; + THashSet ActivationList; + THashSet SplitList; public: TTxSplitSnapshotComplete(TDataShard* ds, TIntrusivePtr snapContext) @@ -372,13 +374,11 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa proto->SetTimeoutMs(kv.second.Timeout.MilliSeconds()); } - if (Self->ChangesQueue || 
tableInfo.HasCdcStreams()) { + if (tableInfo.HasAsyncIndexes() || tableInfo.HasCdcStreams()) { snapshot->SetWaitForActivation(true); - Self->ChangeSenderActivator.AddDst(dstTablet); - db.Table().Key(dstTablet).Update(); - + ActivationList.insert(dstTablet); if (tableInfo.HasCdcStreams()) { - Self->ChangeExchangeSplitter.AddDst(dstTablet); + SplitList.insert(dstTablet); } } @@ -397,14 +397,23 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa } } - ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done(); - if (needToReadPages) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " BorrowSnapshot is restarting for split OpId " << opId); return false; } else { txc.Env.DropSnapshot(SnapContext); + for (ui64 dstTabletId : ActivationList) { + Self->ChangeSenderActivator.AddDst(dstTabletId); + db.Table().Key(dstTabletId).Update(); + } + + for (ui64 dstTabletId : SplitList) { + Self->ChangeExchangeSplitter.AddDst(dstTabletId); + } + + ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done(); + Self->State = TShardState::SplitSrcSendingSnapshot; Self->PersistSys(db, Schema::Sys_State, Self->State); @@ -415,7 +424,7 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa void Complete(const TActorContext &ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId); Self->SplitSrcSnapshotSender.DoSend(ctx); - if (ChangeExchangeSplit) { + if (ChangeExchangeSplit && !Self->ChangesQueue) { // double check queue Self->KillChangeSender(ctx); Self->ChangeExchangeSplitter.DoSplit(ctx); } @@ -432,14 +441,14 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra private: TEvDataShard::TEvSplitTransferSnapshotAck::TPtr Ev; bool AllDstAcksReceived; - bool Activate; + ui64 ActivateTabletId; public: TTxSplitTransferSnapshotAck(TDataShard* ds, 
TEvDataShard::TEvSplitTransferSnapshotAck::TPtr& ev) : NTabletFlatExecutor::TTransactionBase(ds) , Ev(ev) , AllDstAcksReceived(false) - , Activate(false) + , ActivateTabletId(0) {} TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT_ACK; } @@ -463,8 +472,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra // Remove the row for acked snapshot db.Table().Key(dstTabletId).Delete(); - if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done()) { - Activate = !Self->ChangeSenderActivator.Acked(dstTabletId); + if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done() && !Self->ChangeSenderActivator.Acked(dstTabletId)) { + ActivateTabletId = dstTabletId; } return true; @@ -479,11 +488,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra } } - if (Activate) { - const ui64 dstTabletId = Ev->Get()->Record.GetTabletId(); - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + if (ActivateTabletId && !Self->ChangesQueue) { // double check queue + Self->ChangeSenderActivator.DoSend(ActivateTabletId, ctx); } } }; From cc02834cf22700d7f8beaa0b0558a0ed3b6ff184 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 11 Jul 2024 16:44:43 +0300 Subject: [PATCH 5/9] Add records to change queue at Execute() stage (#6497) --- ydb/core/protos/counters_datashard.proto | 1 + ydb/core/tx/datashard/datashard.cpp | 187 ++++++++++++++---- ydb/core/tx/datashard/datashard__init.cpp | 13 +- .../tx/datashard/datashard_change_sending.cpp | 14 +- ydb/core/tx/datashard/datashard_impl.h | 46 ++++- .../datashard/datashard_schema_snapshots.cpp | 20 +- .../tx/datashard/datashard_schema_snapshots.h | 8 +- ydb/core/tx/datashard/datashard_split_src.cpp | 4 +- .../datashard_ut_change_exchange.cpp | 100 ++++++++++ ydb/core/tx/datashard/move_index_unit.cpp | 15 +- ydb/core/tx/datashard/move_table_unit.cpp | 15 +- 
.../tx/datashard/remove_schema_snapshots.cpp | 54 +++++ ydb/core/tx/datashard/ya.make | 1 + 13 files changed, 404 insertions(+), 74 deletions(-) create mode 100644 ydb/core/tx/datashard/remove_schema_snapshots.cpp diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index ae611dedf966..b394e82d8de7 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -486,4 +486,5 @@ enum ETxTypes { TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}]; TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}]; TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}]; + TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}]; } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 0d707cb49371..98cb91b97d29 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -750,6 +750,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate(record.GetKind()), NIceDb::TUpdate(record.GetBody()), NIceDb::TUpdate(record.GetSource())); + + auto res = ChangesQueue.emplace(record.GetOrder(), record); + Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder()); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + + if (CommittingChangeRecords.empty()) { + db.GetDatabase().OnCommit([this] { + CommittingChangeRecords.clear(); + }); + db.GetDatabase().OnRollback([this] { + for (const auto order : CommittingChangeRecords) { + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = 
TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + CommittingChangeRecords.clear(); + }); + } + + CommittingChangeRecords.push_back(record.GetOrder()); } else { auto& state = LockChangeRecords[lockId]; Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(), @@ -829,6 +862,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 committed.Step = rowVersion.Step; committed.TxId = rowVersion.TxId; collected.push_back(committed); + + auto res = ChangesQueue.emplace(committed.Order, committed); + Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once"); @@ -855,7 +896,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 LockChangeRecords.erase(it); }); db.GetDatabase().OnRollback([this, lockId]() { - CommittedLockChangeRecords.erase(lockId); + auto it = CommittedLockChangeRecords.find(lockId); + Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId); + + for (size_t i = 0; i < it->second.Count; ++i) { + const ui64 order = it->second.Order + i; + + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + 
ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + CommittedLockChangeRecords.erase(it); }); } @@ -889,7 +949,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { auto it = ChangesQueue.find(order); if (it == ChangesQueue.end()) { - Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order); return; } @@ -917,23 +976,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { ChangesQueueBytes -= record.BodySize; if (record.SchemaSnapshotAcquired) { - Y_ABORT_UNLESS(record.TableId); - auto tableIt = TableInfos.find(record.TableId.LocalPathId); - - if (tableIt != TableInfos.end()) { - const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); - const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey); - - if (last) { - const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey); - Y_ABORT_UNLESS(snapshot); - - if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) { - SchemaSnapshotManager.RemoveShapshot(db, snapshotKey); - } - } - } else { - Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); + if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); } } @@ -954,7 +999,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { CheckChangesQueueNoOverflow(); } -void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie) { +void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie, bool afterMove) { if (!records) { return; } @@ -974,27 +1019,24 @@ void TDataShard::EnqueueChangeRecords(TVectorTimeProvider->Now(); TVector forward(Reserve(records.size())); for (const auto& record : records) { - forward.emplace_back(record.Order, record.PathId, record.BodySize); + auto it = ChangesQueue.find(record.Order); + if 
(it == ChangesQueue.end()) { + Y_ABORT_UNLESS(afterMove); + continue; + } - auto res = ChangesQueue.emplace( - std::piecewise_construct, - std::forward_as_tuple(record.Order), - std::forward_as_tuple(record, now, cookie) - ); - if (res.second) { - ChangesList.PushBack(&res.first->second); + forward.emplace_back(record.Order, record.PathId, record.BodySize); - Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); - ChangesQueueBytes += record.BodySize; + it->second.EnqueuedAt = now; + it->second.ReservationCookie = cookie; + ChangesList.PushBack(&it->second); - if (record.SchemaVersion) { - res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( - TSchemaSnapshotKey(record.TableId, record.SchemaVersion)); - } - } + Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); + ChangesQueueBytes += record.BodySize; } - + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + Y_ABORT_UNLESS(!afterMove); ChangeQueueReservedCapacity -= it->second; ChangeQueueReservedCapacity += records.size(); } @@ -1160,6 +1202,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + if (!rowset.Next()) { return false; } @@ -1258,6 +1308,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } LockChangeRecords.erase(lockId); @@ -1316,6 +1374,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() { } } +void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) { + Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key)); + + const auto* snapshot = 
SchemaSnapshotManager.FindSnapshot(key); + Y_ABORT_UNLESS(snapshot); + + auto it = TableInfos.find(key.PathId); + if (it == TableInfos.end()) { + Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + return; + } + + if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) { + bool wasEmpty = PendingSchemaSnapshotsToGc.empty(); + PendingSchemaSnapshotsToGc.push_back(key); + if (wasEmpty) { + Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots); + } + } +} + +void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() { + bool wasEmpty = PendingSchemaSnapshotsToGc.empty(); + + for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) { + auto it = TableInfos.find(key.PathId); + if (it == TableInfos.end()) { + Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + break; + } + if (SchemaSnapshotManager.HasReference(key)) { + continue; + } + if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) { + continue; + } + + PendingSchemaSnapshotsToGc.push_back(key); + } + + if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) { + Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots); + } +} + void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) { db.Table().Key(op.TxId).Update( NIceDb::TUpdate(op.Success), @@ -1529,8 +1632,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId)); auto tableInfo = TableInfos[pathId.LocalPathId]; - const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion); + const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion); SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId)); + + const auto& snapshots = SchemaSnapshotManager.GetSnapshots(); + for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) { + if (it->first == key) { + break; + 
} + if (!SchemaSnapshotManager.HasReference(it->first)) { + ScheduleRemoveSchemaSnapshot(it->first); + } + } } void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) { diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 50d487ded64e..430ddca1c887 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { return false; } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { + if (!Self->SchemaSnapshotManager.Load(db)) { + return false; + } + } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) { if (!Self->LoadChangeRecords(db, ChangeRecords)) { return false; @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { - if (!Self->SchemaSnapshotManager.Load(db)) { - return false; - } - } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) { TDataShardLocksDb locksDb(*Self, txc); if (!Self->SysLocks.Load(locksDb)) { @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { Self->SubscribeNewLocks(); Self->ScheduleRemoveAbandonedLockChanges(); + Self->ScheduleRemoveAbandonedSchemaSnapshots(); return true; } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 17bd8eccf591..dd628ce2d6ce 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -340,15 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { Self->RemoveChangeRecordsInFly = false; } - if 
(!Self->ChangesQueue) { // double check queue - if (ChangeExchangeSplit) { - Self->KillChangeSender(ctx); - Self->ChangeExchangeSplitter.DoSplit(ctx); - } + if (ChangeExchangeSplit) { + Self->KillChangeSender(ctx); + Self->ChangeExchangeSplitter.DoSplit(ctx); + } - for (const auto dstTabletId : ActivationList) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + for (const auto dstTabletId : ActivationList) { + Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } Self->CheckStateChange(ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 6abecffbbe41..45f1da995d86 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -236,6 +236,7 @@ class TDataShard class TTxCdcStreamScanProgress; class TTxCdcStreamEmitHeartbeats; class TTxUpdateFollowerReadEdge; + class TTxRemoveSchemaSnapshots; template friend class TTxDirectBase; class TTxUploadRows; @@ -366,6 +367,7 @@ class TDataShard EvReadonlyLeaseConfirmation, EvPlanPredictedTxs, EvTableStatsError, + EvRemoveSchemaSnapshots, EvEnd }; @@ -585,6 +587,8 @@ class TDataShard struct TEvReadonlyLeaseConfirmation: public TEventLocal {}; struct TEvPlanPredictedTxs : public TEventLocal {}; + + struct TEvRemoveSchemaSnapshots : public TEventLocal {}; }; struct Schema : NIceDb::Schema { @@ -1368,6 +1372,8 @@ class TDataShard void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1864,7 +1870,8 @@ class TDataShard void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); - void EnqueueChangeRecords(TVector&& records, ui64 cookie 
= 0); + // TODO(ilnaz): remove 'afterMove' after #6541 + void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0, bool afterMove = false); ui32 GetFreeChangeQueueCapacity(ui64 cookie); ui64 ReserveChangeQueueCapacity(ui32 capacity); void UpdateChangeExchangeLag(TInstant now); @@ -1878,6 +1885,8 @@ class TDataShard bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector& records); void ScheduleRemoveLockChanges(ui64 lockId); void ScheduleRemoveAbandonedLockChanges(); + void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key); + void ScheduleRemoveAbandonedSchemaSnapshots(); static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, const TPathId& tablePathId, const TPathId& streamPathId); @@ -2740,24 +2749,29 @@ class TDataShard ui64 LockOffset; ui64 ReservationCookie; - explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, TInstant created, TInstant enqueued, - ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0) + explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion, + TInstant created, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) , CreatedAt(created) - , EnqueuedAt(enqueued) + , EnqueuedAt(TInstant::Zero()) , LockId(lockId) , LockOffset(lockOffset) - , ReservationCookie(cookie) + , ReservationCookie(0) + { + } + + explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record) + : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, + record.CreatedAt(), record.LockId, record.LockOffset) { } - explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie) - : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now, - record.LockId, record.LockOffset, cookie) + explicit TEnqueuedRecord(const TChangeRecord& record) + : 
TEnqueuedRecord(record.GetBody().size(), record.GetTableId(), record.GetSchemaVersion(), + record.GetApproximateCreationDateTime(), record.GetLockId(), record.GetLockOffset()) { } }; @@ -2798,9 +2812,11 @@ class TDataShard size_t Count = 0; }; + TVector CommittingChangeRecords; THashMap LockChangeRecords; // ui64 is lock id THashMap CommittedLockChangeRecords; // ui64 is lock id TVector PendingLockChangeRecordsToRemove; + TVector PendingSchemaSnapshotsToGc; // in THashMap InChangeSenders; // ui64 is shard id @@ -2896,6 +2912,16 @@ class TDataShard CommittedLockChangeRecords = std::move(committedLockChangeRecords); } + auto TakeChangesQueue() { + auto result = std::move(ChangesQueue); + ChangesQueue.clear(); + return result; + } + + void SetChangesQueue(THashMap&& changesQueue) { + ChangesQueue = std::move(changesQueue); + } + protected: // Redundant init state required by flat executor implementation void StateInit(TAutoPtr &ev) { @@ -2917,6 +2943,7 @@ class TDataShard HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite() @@ -3041,6 +3068,7 @@ class TDataShard HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle); HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp index 
77cad7bf4d78..71b600eda02e 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp @@ -20,6 +20,7 @@ TSchemaSnapshotManager::TSchemaSnapshotManager(const TDataShard* self) void TSchemaSnapshotManager::Reset() { Snapshots.clear(); + References.clear(); } bool TSchemaSnapshotManager::Load(NIceDb::TNiceDb& db) { @@ -79,14 +80,16 @@ const TSchemaSnapshot* TSchemaSnapshotManager::FindSnapshot(const TSchemaSnapsho return Snapshots.FindPtr(key); } -void TSchemaSnapshotManager::RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key) { +void TSchemaSnapshotManager::RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { return; } Snapshots.erase(it); - PersistRemoveSnapshot(db, key); + + NIceDb::TNiceDb nicedb(db); + PersistRemoveSnapshot(nicedb, key); } void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, @@ -119,6 +122,10 @@ void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, } } +const TSchemaSnapshotManager::TSnapshots& TSchemaSnapshotManager::GetSnapshots() const { + return Snapshots; +} + bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { @@ -152,6 +159,15 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) { return true; } +bool TSchemaSnapshotManager::HasReference(const TSchemaSnapshotKey& key) const { + auto refIt = References.find(key); + if (refIt != References.end()) { + return refIt->second; + } else { + return false; + } +} + void TSchemaSnapshotManager::PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot) { using Schema = TDataShard::Schema; db.Table() diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.h b/ydb/core/tx/datashard/datashard_schema_snapshots.h index db0d3b655b34..0bc80a628e2e 100644 --- 
a/ydb/core/tx/datashard/datashard_schema_snapshots.h +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.h @@ -23,6 +23,8 @@ struct TSchemaSnapshot { }; class TSchemaSnapshotManager { + using TSnapshots = TMap>; + public: explicit TSchemaSnapshotManager(const TDataShard* self); @@ -31,11 +33,13 @@ class TSchemaSnapshotManager { bool AddSnapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const; - void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key); + void RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key); void RenameSnapshots(NTable::TDatabase& db, const TPathId& prevTableId, const TPathId& newTableId); + const TSnapshots& GetSnapshots() const; bool AcquireReference(const TSchemaSnapshotKey& key); bool ReleaseReference(const TSchemaSnapshotKey& key); + bool HasReference(const TSchemaSnapshotKey& key) const; private: void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); @@ -43,7 +47,7 @@ class TSchemaSnapshotManager { private: const TDataShard* Self; - TMap> Snapshots; + TSnapshots Snapshots; THashMap References; }; // TSchemaSnapshotManager diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index 0f6874ed43b8..b08dd8c143aa 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -424,7 +424,7 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa void Complete(const TActorContext &ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId); Self->SplitSrcSnapshotSender.DoSend(ctx); - if (ChangeExchangeSplit && !Self->ChangesQueue) { // double check queue + if (ChangeExchangeSplit) { Self->KillChangeSender(ctx); 
Self->ChangeExchangeSplitter.DoSplit(ctx); } @@ -488,7 +488,7 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra } } - if (ActivateTabletId && !Self->ChangesQueue) { // double check queue + if (ActivateTabletId) { Self->ChangeSenderActivator.DoSend(ActivateTabletId, ctx); } } diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 1c5875930532..1b0d931a8719 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3334,6 +3334,106 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + void MustNotLoseSchemaSnapshot(bool enableVolatileTx) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableDataShardVolatileTransactions(enableVolatileTx) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + std::vector> blockedRemoveRecords; + auto blockRemoveRecords = runtime.AddObserver([&](auto& ev) { + Cerr << "... blocked remove record" << Endl; + blockedRemoveRecords.emplace_back(ev.Release()); + }); + + Cerr << "... 
execute first query" << Endl; + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10); + )"); + + WaitFor(runtime, [&]{ return blockedRemoveRecords.size() == 1; }, "blocked remove records"); + blockRemoveRecords.Remove(); + + std::vector> blockedPlans; + auto blockPlans = runtime.AddObserver([&](auto& ev) { + blockedPlans.emplace_back(ev.Release()); + }); + + Cerr << "... execute scheme query" << Endl; + const auto alterTxId = AsyncAlterAddExtraColumn(server, "/Root", "Table"); + + WaitFor(runtime, [&]{ return blockedPlans.size() > 0; }, "blocked plans"); + blockPlans.Remove(); + + std::vector> blockedPutResponses; + auto blockPutResponses = runtime.AddObserver([&](auto& ev) { + auto* msg = ev->Get(); + if (msg->Id.TabletID() == tabletIds[0]) { + Cerr << "... blocked put response:" << msg->Id << Endl; + blockedPutResponses.emplace_back(ev.Release()); + } + }); + + Cerr << "... execute second query" << Endl; + SendSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20); + )"); + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > 0; }, "blocked put responses"); + auto wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked plans" << Endl; + for (auto& ev : std::exchange(blockedPlans, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked remove records" << Endl; + for (auto& ev : std::exchange(blockedRemoveRecords, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + blockPutResponses.Remove(); + + Cerr << "... 
release blocked put responses" << Endl; + for (auto& ev : std::exchange(blockedPutResponses, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... finalize" << Endl; + WaitTxNotification(server, edgeActor, alterTxId); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshot) { + MustNotLoseSchemaSnapshot(false); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshotWithVolatileTx) { + MustNotLoseSchemaSnapshot(true); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp index 6b3a30be457a..73fa338d35e7 100644 --- a/ydb/core/tx/datashard/move_index_unit.cpp +++ b/ydb/core/tx/datashard/move_index_unit.cpp @@ -60,20 +60,27 @@ class TMoveIndexUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); 
DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class TMoveIndexUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp index 846f517ee10a..3e34394e15d6 100644 --- a/ydb/core/tx/datashard/move_table_unit.cpp +++ b/ydb/core/tx/datashard/move_table_unit.cpp @@ -60,20 +60,27 @@ class TMoveTableUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class 
TMoveTableUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/remove_schema_snapshots.cpp b/ydb/core/tx/datashard/remove_schema_snapshots.cpp new file mode 100644 index 000000000000..fe63f30be61d --- /dev/null +++ b/ydb/core/tx/datashard/remove_schema_snapshots.cpp @@ -0,0 +1,54 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class TDataShard::TTxRemoveSchemaSnapshots: public NTabletFlatExecutor::TTransactionBase { +public: + TTxRemoveSchemaSnapshots(TDataShard* self) + : TBase(self) + { } + + TTxType GetTxType() const override { return TXTYPE_REMOVE_SCHEMA_SNAPSHOTS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + while (!Self->PendingSchemaSnapshotsToGc.empty()) { + const auto key = Self->PendingSchemaSnapshotsToGc.back(); + const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(key); + + if (!snapshot) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (Self->GetSchemaSnapshotManager().HasReference(key)) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + auto table = Self->FindUserTable(TPathId(key.OwnerId, key.PathId)); + if (!table) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (snapshot->Schema->GetTableSchemaVersion() >= table->GetTableSchemaVersion()) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + Self->GetSchemaSnapshotManager().RemoveShapshot(txc.DB, key); + Self->PendingSchemaSnapshotsToGc.pop_back(); + } + + return true; + } + + void Complete(const TActorContext&) override { + } +}; + +void TDataShard::Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr&, const TActorContext& ctx) { + Execute(new 
TTxRemoveSchemaSnapshots(this), ctx); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 49541b974798..7ed4a725816c 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -185,6 +185,7 @@ SRCS( remove_lock_change_records.cpp remove_locks.cpp range_avl_tree.cpp + remove_schema_snapshots.cpp range_ops.cpp range_treap.cpp read_iterator.h From 5e5b06c44933fd7b504dc8e4424d83850ac5c268 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 12 Jul 2024 13:13:32 +0300 Subject: [PATCH 6/9] Fix bug with broadcasting records (#6585) --- .../change_sender_common_ops.cpp | 2 +- .../ut_cdc_stream_reboots.cpp | 168 +++++++++++------- 2 files changed, 109 insertions(+), 61 deletions(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index 8b10355730a7..89cfb4e5af30 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -351,7 +351,7 @@ bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) { Y_ABORT_UNLESS(it != Broadcasting.end()); auto& broadcast = it->second; - if (broadcast.Partitions.contains(partitionId)) { + if (broadcast.CompletedPartitions.contains(partitionId)) { return false; } diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 7310f0baeef1..6bf632ed83c8 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -518,68 +518,77 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState, + const google::protobuf::RepeatedPtrField& tablePartitions, + const 
google::protobuf::RepeatedPtrField& topicPartitions) + { + for (const auto& topicPartition : topicPartitions) { + auto request = MakeHolder(); + { + auto& record = *request->Record.MutablePartitionRequest(); + record.SetPartition(topicPartition.GetPartitionId()); + auto& cmd = *record.MutableCmdGetMaxSeqNo(); + for (const auto& tablePartition : tablePartitions) { + cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); + } + } + + const auto& sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release()); + + auto response = runtime.GrabEdgeEvent(sender); + { + const auto& record = response->Get()->Record.GetPartitionResponse(); + const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); + + UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size()); + for (const auto& item: result) { + if (item.GetState() != expectedState) { + return false; + } + } + } + } + + return true; + } + struct TItem { TString Path; - ui32 nPartitions; + ui32 ExpectedPartitionCount; }; - void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) { + void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic, + const google::protobuf::RepeatedPtrField* initialTablePartitions = nullptr) + { auto tableDesc = DescribePath(runtime, table.Path, true, true); const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); - UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount); auto topicDesc = DescribePrivatePath(runtime, topic.Path); const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount); while (true) { 
runtime.SimulateSleep(TDuration::Seconds(1)); - bool done = true; - - for (ui32 i = 0; i < topic.nPartitions; ++i) { - auto request = MakeHolder(); - { - auto& record = *request->Record.MutablePartitionRequest(); - record.SetPartition(topicPartitions[i].GetPartitionId()); - auto& cmd = *record.MutableCmdGetMaxSeqNo(); - for (const auto& tablePartition : tablePartitions) { - cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); - } - } - - const auto& sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release()); - - auto response = runtime.GrabEdgeEvent(sender); - { - const auto& record = response->Get()->Record.GetPartitionResponse(); - const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); - - UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions); - for (const auto& item: result) { - done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED; - if (!done) { - break; - } - } - } - - if (!done) { - break; - } - } - - if (done) { + if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) { break; } } + + if (initialTablePartitions) { + UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions)); + } } - Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + template + void SplitTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -587,15 +596,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { KeyColumnNames: ["key"] )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - 
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -613,16 +616,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1}); UploadRows(runtime, "/MyRoot/Table", 1, {1}, {2}, {Max()}); - CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); + CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } - Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + + template + void MergeTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -636,15 +666,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + 
TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -657,11 +681,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { { TInactiveZone inactive(activeZone); UploadRows(runtime, "/MyRoot/Table", 0, {1}, {2}, {1, Max()}); - CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); + CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } + Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { From f651117f01c981d362aab2eae1819b4fdf762492 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 12 Jul 2024 12:28:59 +0300 Subject: [PATCH 7/9] Continue to emit resolved timestamps after merge (#6594) --- .../persqueue/partition_sourcemanager.cpp | 3 +- ydb/core/persqueue/sourceid.h | 4 + .../datashard_ut_change_exchange.cpp | 88 +++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index b0770f1f901c..0038c6742db2 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() { } bool TPartitionSourceManager::TModificationBatch::HasModifications() const { - return !SourceIdWriter.GetSourceIdsToWrite().empty(); 
+ return !SourceIdWriter.GetSourceIdsToWrite().empty() + || !SourceIdWriter.GetSourceIdsToDelete().empty(); } void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) { diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 19d9b908f6db..ba19f1e02a56 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -122,6 +122,10 @@ class TSourceIdWriter { return Registrations; } + const THashSet& GetSourceIdsToDelete() const { + return Deregistrations; + } + template void RegisterSourceId(const TString& sourceId, Args&&... args) { Registrations[sourceId] = TSourceIdInfo(std::forward(args)...); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 1b0d931a8719..c960b77f4698 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3434,6 +3434,94 @@ Y_UNIT_TEST_SUITE(Cdc) { MustNotLoseSchemaSnapshot(true); } + Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + SetSplitMergePartCountLimit(&runtime, -1); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + Cerr << "... 
prepare" << Endl; + { + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + }); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2)); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + + auto initialTabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(initialTabletIds.size(), 2); + + std::vector> blockedSplitRequests; + auto blockSplitRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdSplitMessageGroup()) { + blockedSplitRequests.emplace_back(ev.Release()); + } + }); + + Cerr << "... merge table" << Endl; + const auto mergeTxId = AsyncMergeTable(server, edgeActor, "/Root/Table", initialTabletIds); + WaitFor(runtime, [&]{ return blockedSplitRequests.size() == initialTabletIds.size(); }, "blocked split requests"); + blockSplitRequests.Remove(); + + std::vector> blockedRegisterRequests; + auto blockRegisterRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdRegisterMessageGroup()) { + blockedRegisterRequests.emplace_back(ev.Release()); + } + }); + + ui32 splitResponses = 0; + auto countSplitResponses = runtime.AddObserver([&](auto& ev) { + ++splitResponses; + }); + + Cerr << "... release split requests" << Endl; + for (auto& ev : std::exchange(blockedSplitRequests, {})) { + runtime.Send(ev.release(), 0, true); + WaitFor(runtime, [prev = splitResponses, &splitResponses]{ return splitResponses > prev; }, "split response"); + } + + Cerr << "... reboot pq tablet" << Endl; + RebootTablet(runtime, ResolvePqTablet(runtime, edgeActor, "/Root/Table/Stream", 0), edgeActor); + countSplitResponses.Remove(); + + Cerr << "... 
release register requests" << Endl; + blockRegisterRequests.Remove(); + for (auto& ev : std::exchange(blockedRegisterRequests, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... wait for merge tx notification" << Endl; + WaitTxNotification(server, edgeActor, mergeTxId); + + Cerr << "... wait for final heartbeat" << Endl; + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + } // Cdc } // NKikimr From ebb52703a60b478b978b9b7e2835531fbfdfd16f Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 12 Jul 2024 15:39:32 +0300 Subject: [PATCH 8/9] Fix test build (#6621) --- ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index c960b77f4698..0be548fc63df 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3491,7 +3491,7 @@ Y_UNIT_TEST_SUITE(Cdc) { }); ui32 splitResponses = 0; - auto countSplitResponses = runtime.AddObserver([&](auto& ev) { + auto countSplitResponses = runtime.AddObserver([&](auto&) { ++splitResponses; }); From 651efd45d7edf899c544f02d7d3ea2933d436140 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 13 Jun 2024 20:47:01 +0300 Subject: [PATCH 9/9] Fix bug in records broadcasting (#5513) --- .../change_sender_common_ops.cpp | 2 +- .../datashard_ut_change_exchange.cpp | 40 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/ydb/core/change_exchange/change_sender_common_ops.cpp b/ydb/core/change_exchange/change_sender_common_ops.cpp index 89cfb4e5af30..3d189ee6b4d1 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.cpp +++ b/ydb/core/change_exchange/change_sender_common_ops.cpp @@ -197,7 +197,7 @@ void 
TBaseChangeSender::SendRecords() { EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) { if (Senders.contains(partitionId)) { auto& sender = Senders.at(partitionId); - sender.Prepared.push_back(std::move(it->second)); + sender.Prepared.push_back(it->second); if (!sender.ActorId) { Y_ABORT_UNLESS(!sender.Ready); registrations.insert(partitionId); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 0be548fc63df..da7f5f397cb2 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -3147,6 +3148,45 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampsMultiplePartitions) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", TShardedTableOptions().Shards(2)); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + TVector>> records(2); // partition to records + while (true) { + for (ui32 i = 0; i < records.size(); ++i) { + records[i] = GetRecords(*server->GetRuntime(), edgeActor, "/Root/Table/Stream", i); + } + + if (AllOf(records, [](const auto& x) { return !x.empty(); })) { + break; + } + + SimulateSleep(server, TDuration::Seconds(1)); + } + + UNIT_ASSERT(records.size() > 1); + UNIT_ASSERT(!records[0].empty()); + AssertJsonsEqual(records[0][0].second, R"({"resolved":"***"})"); + + for (ui32 i = 1; i < records.size(); ++i) { + 
UNIT_ASSERT_VALUES_EQUAL(records[i][0].second, records[0][0].second); + } + } + Y_UNIT_TEST(InitialScanAndResolvedTimestamps) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())