From 2ad4e786fe1869aa8d88bef47db72fb6a20f71fe Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 26 Aug 2024 17:47:57 +0300 Subject: [PATCH] Async replication: configurable default retention period (#8270) --- ydb/core/base/appdata.cpp | 3 +++ ydb/core/base/appdata_fwd.h | 5 +++++ ydb/core/driver_lib/run/run.cpp | 4 ++++ ydb/core/protos/config.proto | 2 ++ ydb/core/protos/console_config.proto | 1 + ydb/core/protos/replication.proto | 4 ++++ .../replication/controller/stream_creator.cpp | 20 +++++++++++++------ .../replication/controller/stream_creator.h | 3 ++- 8 files changed, 35 insertions(+), 7 deletions(-) diff --git a/ydb/core/base/appdata.cpp b/ydb/core/base/appdata.cpp index ca40e8537151..97737c602414 100644 --- a/ydb/core/base/appdata.cpp +++ b/ydb/core/base/appdata.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -61,6 +62,7 @@ struct TAppData::TImpl { NKikimrConfig::TGraphConfig GraphConfig; NKikimrSharedCache::TSharedCacheConfig SharedCacheConfig; NKikimrConfig::TMetadataCacheConfig MetadataCacheConfig; + NKikimrReplication::TReplicationDefaults ReplicationConfig; }; TAppData::TAppData( @@ -113,6 +115,7 @@ TAppData::TAppData( , GraphConfig(Impl->GraphConfig) , SharedCacheConfig(Impl->SharedCacheConfig) , MetadataCacheConfig(Impl->MetadataCacheConfig) + , ReplicationConfig(Impl->ReplicationConfig) , KikimrShouldContinue(kikimrShouldContinue) {} diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index 78c78590ef15..f22a246d2cd1 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -65,6 +65,10 @@ namespace NKikimrConfig { class TMetadataCacheConfig; } +namespace NKikimrReplication { + class TReplicationDefaults; +} + namespace NKikimrNetClassifier { class TNetClassifierDistributableConfig; class TNetClassifierConfig; @@ -213,6 +217,7 @@ struct TAppData { NKikimrConfig::TGraphConfig& GraphConfig; NKikimrSharedCache::TSharedCacheConfig& SharedCacheConfig; NKikimrConfig::TMetadataCacheConfig& MetadataCacheConfig; + NKikimrReplication::TReplicationDefaults& ReplicationConfig; bool EnforceUserTokenRequirement = false; bool EnforceUserTokenCheckRequirement = false; // check token if it was specified bool AllowHugeKeyValueDeletes = true; // delete when all clients limit deletes per request diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 5996acf0166d..5e9c23d634e5 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1126,6 +1126,10 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) AppData->MetadataCacheConfig.CopyFrom(runConfig.AppConfig.GetMetadataCacheConfig()); } + if (runConfig.AppConfig.HasReplicationConfig()) { + AppData->ReplicationConfig = runConfig.AppConfig.GetReplicationConfig(); + } + // setup resource profiles AppData->ResourceProfiles = new TResourceProfiles; if (runConfig.AppConfig.GetBootstrapConfig().ResourceProfilesSize()) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e80d14c4a5df..cf823c864767 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -24,6 +24,7 @@ import "ydb/core/protos/local.proto"; import "ydb/core/protos/netclassifier.proto"; import "ydb/core/protos/node_broker.proto"; import "ydb/core/protos/pqconfig.proto"; +import "ydb/core/protos/replication.proto"; import "ydb/core/protos/resource_broker.proto"; import "ydb/core/protos/shared_cache.proto"; import "ydb/core/protos/stream.proto"; @@ -1902,6 +1903,7 @@ message TAppConfig { optional TBlobCacheConfig BlobCacheConfig = 78; optional TLimiterConfig CompDiskLimiterConfig = 79; optional TMetadataCacheConfig MetadataCacheConfig = 80; + optional NKikimrReplication.TReplicationDefaults ReplicationConfig = 83; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/console_config.proto b/ydb/core/protos/console_config.proto index f1751c5a5072..7c73e760f3fa 100644 --- a/ydb/core/protos/console_config.proto +++ b/ydb/core/protos/console_config.proto @@ -139,6 +139,7 @@ message TConfigItem { S3ProxyResolverConfigItem = 76; BackgroundCleaningConfigItem = 77; MetadataCacheConfigItem = 80; + ReplicationConfigItem = 83; NamedConfigsItem = 100; ClusterYamlConfigItem = 101; diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 63571ce6fa55..3764b0511c53 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -5,6 +5,10 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; package NKikimrReplication; option java_package = "ru.yandex.kikimr.proto"; +message TReplicationDefaults { + optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d +} + message TStaticCredentials { optional string User = 1; optional string Password = 2 [(Ydb.sensitive) = true]; diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 086c2892a381..7603154d5443 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -4,7 +4,9 @@ #include "target_with_stream.h" #include "util.h" +#include #include +#include #include #include #include @@ -16,9 +18,12 @@ namespace NKikimr::NReplication::NController { class TStreamCreator: public TActorBootstrapped { - static NYdb::NTable::TChangefeedDescription MakeChangefeed(const TString& name, const NJson::TJsonMap& attrs) { + static NYdb::NTable::TChangefeedDescription MakeChangefeed( + const TString& name, const TDuration& retentionPeriod, const NJson::TJsonMap& attrs) + { using namespace NYdb::NTable; return TChangefeedDescription(name, EChangefeedMode::Updates, EChangefeedFormat::Json) + .WithRetentionPeriod(retentionPeriod) .WithInitialScan() .AddAttribute("__async_replication", NJson::WriteJson(attrs, false)); } @@ -133,14 +138,15 @@ class TStreamCreator: public TActorBootstrapped { TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, - const TString& streamName) + const TString& streamName, + const TDuration& streamRetentionPeriod) : Parent(parent) , YdbProxy(proxy) , ReplicationId(rid) , TargetId(tid) , Kind(kind) , SrcPath(srcPath) - , Changefeed(MakeChangefeed(streamName, NJson::TJsonMap{ + , Changefeed(MakeChangefeed(streamName, streamRetentionPeriod, NJson::TJsonMap{ {"path", dstPath}, {"id", ToString(rid)}, })) @@ -175,13 +181,15 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct Y_ABORT_UNLESS(target); return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(), replication->GetId(), target->GetId(), target->GetKind(), - target->GetSrcPath(), target->GetDstPath(), target->GetStreamName()); + target->GetSrcPath(), target->GetDstPath(), target->GetStreamName(), + TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds())); } IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, - TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, const TString& streamName) + TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, + const TString& streamName, const TDuration& streamRetentionPeriod) { - return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName); + return new TStreamCreator(parent, proxy, rid, tid, kind, srcPath, dstPath, streamName, streamRetentionPeriod); } } diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h index 94eca13b4554..1eca930efb6b 100644 --- a/ydb/core/tx/replication/controller/stream_creator.h +++ b/ydb/core/tx/replication/controller/stream_creator.h @@ -6,6 +6,7 @@ namespace NKikimr::NReplication::NController { IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TActorContext& ctx); IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, - TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, const TString& streamName); + TReplication::ETargetKind kind, const TString& srcPath, const TString& dstPath, + const TString& streamName, const TDuration& streamRetentionPeriod); }