Skip to content

Commit

Permalink
24-1-async-replication: Sync with main (#6618)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jul 14, 2024
1 parent 3f6c7e3 commit f2c77f7
Show file tree
Hide file tree
Showing 49 changed files with 1,287 additions and 189 deletions.
2 changes: 1 addition & 1 deletion ydb/core/base/services_assert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
#include <ydb/library/services/services.pb.h>

static_assert(static_cast<ui32>(NKikimrServices::EServiceKikimr_MIN) > static_cast<ui32>(NActorsServices::EServiceCommon_MAX), "KIKIMR SERVICES IDs SHOULD BE GREATER THAN COMMON ONES");
static_assert(NKikimrServices::TActivity::EType_ARRAYSIZE < 640, "ACTOR ACTIVITY TYPES MUST BE NOT VERY BIG TO BE ARRAY INDICES"); // If we would have many different actor activities, it is OK to increase this value.
static_assert(NKikimrServices::TActivity::EType_ARRAYSIZE < 768, "ACTOR ACTIVITY TYPES MUST BE NOT VERY BIG TO BE ARRAY INDICES"); // If we would have many different actor activities, it is OK to increase this value.
4 changes: 2 additions & 2 deletions ydb/core/change_exchange/change_sender_common_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void TBaseChangeSender::SendRecords() {
EraseNodesIf(broadcast.PendingPartitions, [&](ui64 partitionId) {
if (Senders.contains(partitionId)) {
auto& sender = Senders.at(partitionId);
sender.Prepared.push_back(std::move(it->second));
sender.Prepared.push_back(it->second);
if (!sender.ActorId) {
Y_ABORT_UNLESS(!sender.Ready);
registrations.insert(partitionId);
Expand Down Expand Up @@ -351,7 +351,7 @@ bool TBaseChangeSender::AddBroadcastPartition(ui64 order, ui64 partitionId) {
Y_ABORT_UNLESS(it != Broadcasting.end());

auto& broadcast = it->second;
if (broadcast.Partitions.contains(partitionId)) {
if (broadcast.CompletedPartitions.contains(partitionId)) {
return false;
}

Expand Down
13 changes: 12 additions & 1 deletion ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio

auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
PathIdFromPathId(pathId, ev->Record.MutablePathId());
ev->Record.SetIncludeStats(GetProtoRequest()->include_stats());

NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
Become(&TDescribeReplicationRPC::StateDescribeReplication);
Expand Down Expand Up @@ -167,16 +168,26 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
if (from.HasSrcStreamName()) {
to.set_source_changefeed_name(from.GetSrcStreamName());
}
if (from.HasLagMilliSeconds()) {
*to.mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
from.GetLagMilliSeconds());
}
if (from.HasInitialScanProgress()) {
to.mutable_stats()->set_initial_scan_progress(from.GetInitialScanProgress());
}
}

static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
if (from.GetStandBy().HasLagMilliSeconds()) {
*to.mutable_running()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
from.GetStandBy().GetLagMilliSeconds());
}
if (from.GetStandBy().HasInitialScanProgress()) {
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
}
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
Expand Down
31 changes: 31 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5275,7 +5275,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}

Y_UNIT_TEST(CreateAsyncReplicationWithSecret) {
using namespace NReplication;

TKikimrRunner kikimr("root@builtin");
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -5319,6 +5322,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

Sleep(TDuration::Seconds(1));
}

while (true) {
auto settings = TDescribeReplicationSettings().IncludeStats(true);
const auto result = repl.DescribeReplication("/Root/replication", settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& desc = result.GetReplicationDescription();
UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Running);

const auto& total = desc.GetRunningState().GetStats();
if (!total.GetInitialScanProgress() || *total.GetInitialScanProgress() < 100) {
Sleep(TDuration::Seconds(1));
continue;
}

UNIT_ASSERT(total.GetInitialScanProgress());
UNIT_ASSERT_DOUBLES_EQUAL(*total.GetInitialScanProgress(), 100.0, 0.01);

const auto& items = desc.GetItems();
UNIT_ASSERT_VALUES_EQUAL(items.size(), 1);
const auto& item = items.at(0).Stats;

UNIT_ASSERT(item.GetInitialScanProgress());
UNIT_ASSERT_DOUBLES_EQUAL(*item.GetInitialScanProgress(), *total.GetInitialScanProgress(), 0.01);

// TODO: check lag too
break;
}
}

Y_UNIT_TEST(AlterAsyncReplication) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() {
}

bool TPartitionSourceManager::TModificationBatch::HasModifications() const {
return !SourceIdWriter.GetSourceIdsToWrite().empty();
return !SourceIdWriter.GetSourceIdsToWrite().empty()
|| !SourceIdWriter.GetSourceIdsToDelete().empty();
}

void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class TSourceIdWriter {
return Registrations;
}

const THashSet<TString>& GetSourceIdsToDelete() const {
return Deregistrations;
}

template <typename... Args>
void RegisterSourceId(const TString& sourceId, Args&&... args) {
Registrations[sourceId] = TSourceIdInfo(std::forward<Args>(args)...);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,5 @@ enum ETxTypes {
TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}];
TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}];
}
6 changes: 6 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,11 @@ enum ECdcStreamFormat {
ECdcStreamFormatDebeziumJson = 4;
}

message TCdcStreamScanProgress {
optional uint32 ShardsTotal = 1;
optional uint32 ShardsCompleted = 2;
}

message TCdcStreamDescription {
optional string Name = 1;
optional ECdcStreamMode Mode = 2;
Expand All @@ -820,6 +825,7 @@ message TCdcStreamDescription {
optional string AwsRegion = 9;
// Set to '0' to disable resolved timestamps
optional uint64 ResolvedTimestampsIntervalMs = 10;
optional TCdcStreamScanProgress ScanProgress = 11;
}

message TCreateCdcStream {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ message TReplicationConfig {

message TTargetSpecific {
message TTarget {
// in/out
optional string SrcPath = 1;
optional string DstPath = 2;
optional string SrcStreamName = 3;
// out
optional uint64 Id = 4;
optional uint32 LagMilliSeconds = 5;
optional float InitialScanProgress = 6; // pencentage
}

repeated TTarget Targets = 1;
Expand All @@ -59,6 +63,7 @@ message TReplicationConfig {
message TReplicationState {
message TStandBy {
optional uint32 LagMilliSeconds = 1;
optional float InitialScanProgress = 2; // pencentage
}

message TPaused {
Expand Down Expand Up @@ -146,6 +151,7 @@ message TEvDropReplicationResult {

message TEvDescribeReplication {
optional NKikimrProto.TPathID PathId = 1;
optional bool IncludeStats = 2;
}

message TEvDescribeReplicationResult {
Expand Down
Loading

0 comments on commit f2c77f7

Please sign in to comment.