diff --git a/ydb/core/change_exchange/change_sender.cpp b/ydb/core/change_exchange/change_sender.cpp index 183584e2bb26..78a8baf4881b 100644 --- a/ydb/core/change_exchange/change_sender.cpp +++ b/ydb/core/change_exchange/change_sender.cpp @@ -10,6 +10,7 @@ namespace NKikimr::NChangeExchange { void TChangeSender::LazyCreateSender(THashMap& senders, ui64 partitionId) { + ++UninitSenders; auto res = senders.emplace(partitionId, TSender{}); Y_ABORT_UNLESS(res.second); @@ -27,6 +28,7 @@ void TChangeSender::RegisterSender(ui64 partitionId) { Y_ABORT_UNLESS(!sender.ActorId); sender.ActorId = ActorOps->RegisterWithSameMailbox(SenderFactory->CreateSender(partitionId)); + --UninitSenders; } void TChangeSender::CreateMissingSenders(const TVector& partitionIds) { diff --git a/ydb/core/change_exchange/change_sender.h b/ydb/core/change_exchange/change_sender.h index 1fb3a50dfa56..fd8e1ad387bf 100644 --- a/ydb/core/change_exchange/change_sender.h +++ b/ydb/core/change_exchange/change_sender.h @@ -174,6 +174,10 @@ class TChangeSender { return ReadySenders == Senders.size(); } + inline bool IsAllSendersReadyOrUninit() { + return ReadySenders + UninitSenders == Senders.size(); + } + void SetPartitionResolver(IPartitionResolverVisitor* partitionResolver) { PartitionResolver.Reset(partitionResolver); } @@ -194,6 +198,7 @@ class TChangeSender { THashMap Senders; // ui64 is partition id ui64 ReadySenders = 0; + ui64 UninitSenders = 0; TSet Enqueued; TSet PendingBody; TMap PendingSent; // ui64 is order diff --git a/ydb/core/tx/datashard/change_exchange_impl.h b/ydb/core/tx/datashard/change_exchange_impl.h index b6af2aea263e..faac6f42d072 100644 --- a/ydb/core/tx/datashard/change_exchange_impl.h +++ b/ydb/core/tx/datashard/change_exchange_impl.h @@ -8,6 +8,7 @@ namespace NDataShard { IActor* CreateAsyncIndexChangeSender(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId); IActor* CreateCdcStreamChangeSender(const TDataShardId& dataShard, const TPathId& streamPathId); +IActor* CreateIncrRestoreChangeSender(const TActorId& changeServerActor, const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index d1ba342c3309..8e3fec8ce53b 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -1,371 +1,26 @@ #include "change_exchange.h" #include "change_exchange_impl.h" #include "change_record.h" +#include "change_sender_table_base.h" #include "datashard_impl.h" -#include #include -#include #include #include #include -#include -#include -#include #include #include #include +#include +#include #include namespace NKikimr::NDataShard { -using namespace NTable; using ESenderType = TEvChangeExchange::ESenderType; -class TBaseChangeSenderShard: public TActorBootstrapped { - TStringBuf GetLogPrefix() const { - if (!LogPrefix) { - LogPrefix = TStringBuilder() - << "[BaseChangeSenderShard]" - << "[" << DataShard.TabletId << ":" << DataShard.Generation << "]" - << "[" << ShardId << "]" - << SelfId() /* contains brackets */ << " "; - } - - return LogPrefix.GetRef(); - } - - /// GetProxyServices - - void GetProxyServices() { - Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest); - Become(&TThis::StateGetProxyServices); - } - - STATEFN(StateGetProxyServices) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle); - default: - return StateBase(ev); - } - } - - void Handle(TEvTxUserProxy::TEvGetProxyServicesResponse::TPtr& ev) { - LOG_D("Handle " << ev->Get()->ToString()); - - LeaderPipeCache = ev->Get()->Services.LeaderPipeCache; - Handshake(); - } - - /// Handshake - - void Handshake() { - Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie); - Become(&TThis::StateHandshake); - } - - STATEFN(StateHandshake) { - switch (ev->GetTypeRewrite()) { - hFunc(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation, Handle); - hFunc(TEvChangeExchange::TEvStatus, Handshake); - default: - return StateBase(ev); - } - } - - void Handle(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation::TPtr& ev) { - if (ev->Cookie != LeaseConfirmationCookie) { - LOG_W("Readonly lease confirmation cookie mismatch" - << ": expected# " << LeaseConfirmationCookie - << ", got# " << ev->Cookie); - return; - } - - auto handshake = MakeHolder(); - handshake->Record.SetOrigin(DataShard.TabletId); - handshake->Record.SetGeneration(DataShard.Generation); - - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true)); - } - - void Handshake(TEvChangeExchange::TEvStatus::TPtr& ev) { - LOG_D("Handshake " << ev->Get()->ToString()); - - const auto& record = ev->Get()->Record; - switch (record.GetStatus()) { - case NKikimrChangeExchange::TEvStatus::STATUS_OK: - LastRecordOrder = record.GetLastRecordOrder(); - return Ready(); - default: - LOG_E("Handshake status" - << ": status# " << static_cast(record.GetStatus()) - << ", reason# " << static_cast(record.GetReason())); - return Leave(); - } - } - - void Ready() { - Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(ShardId)); - Become(&TThis::StateWaitingRecords); - } - - /// WaitingRecords - - STATEFN(StateWaitingRecords) { - switch (ev->GetTypeRewrite()) { - hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); - default: - return StateBase(ev); - } - } - - class TSerializer: public NChangeExchange::TBaseVisitor { - NKikimrChangeExchange::TChangeRecord& Record; - - public: - explicit TSerializer(NKikimrChangeExchange::TChangeRecord& record) - : Record(record) - { - } - - void Visit(const TChangeRecord& record) override { - record.Serialize(Record); - } - }; - - void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { - LOG_D("Handle " << ev->Get()->ToString()); - - auto records = MakeHolder(); - records->Record.SetOrigin(DataShard.TabletId); - records->Record.SetGeneration(DataShard.Generation); - - for (auto recordPtr : ev->Get()->Records) { - const auto& record = *recordPtr; - - if (record.GetOrder() <= LastRecordOrder) { - continue; - } - - auto& proto = *records->Record.AddRecords(); - TSerializer serializer(proto); - record.Accept(serializer); - Adjust(proto); - } - - if (!records->Record.RecordsSize()) { - return Ready(); - } - - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(records.Release(), ShardId, false)); - Become(&TThis::StateWaitingStatus); - } - - void Adjust(NKikimrChangeExchange::TChangeRecord& record) const { - record.SetPathOwnerId(TargetTablePathId.OwnerId); - record.SetLocalPathId(TargetTablePathId.LocalPathId); - - Y_ABORT_UNLESS(record.HasAsyncIndex()); - AdjustTags(*record.MutableAsyncIndex()); - } - - void AdjustTags(NKikimrChangeExchange::TDataChange& record) const { - AdjustTags(*record.MutableKey()->MutableTags()); - - switch (record.GetRowOperationCase()) { - case NKikimrChangeExchange::TDataChange::kUpsert: - AdjustTags(*record.MutableUpsert()->MutableTags()); - break; - case NKikimrChangeExchange::TDataChange::kReset: - AdjustTags(*record.MutableReset()->MutableTags()); - break; - default: - break; - } - } - - void AdjustTags(google::protobuf::RepeatedField& tags) const { - for (int i = 0; i < tags.size(); ++i) { - auto it = TagMap.find(tags[i]); - Y_ABORT_UNLESS(it != TagMap.end()); - tags[i] = it->second; - } - } - - /// WaitingStatus - - STATEFN(StateWaitingStatus) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvChangeExchange::TEvStatus, Handle); - default: - return StateBase(ev); - } - } - - void Handle(TEvChangeExchange::TEvStatus::TPtr& ev) { - LOG_D("Handle " << ev->Get()->ToString()); - - const auto& record = ev->Get()->Record; - switch (record.GetStatus()) { - case NKikimrChangeExchange::TEvStatus::STATUS_OK: - LastRecordOrder = record.GetLastRecordOrder(); - return Ready(); - // TODO: REJECT? - default: - LOG_E("Apply status" - << ": status# " << static_cast(record.GetStatus()) - << ", reason# " << static_cast(record.GetReason())); - return Leave(); - } - } - - bool CanRetry() const { - if (CurrentStateFunc() != static_cast(&TThis::StateHandshake)) { - return false; - } - - return Attempt < MaxAttempts; - } - - void Retry() { - ++Attempt; - Delay = Min(2 * Delay, MaxDelay); - - LOG_N("Retry" - << ": attempt# " << Attempt - << ", delay# " << Delay); - - const auto random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); - Schedule(Delay + random, new TEvents::TEvWakeup()); - } - - void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { - if (ShardId != ev->Get()->TabletId) { - return; - } - - if (CanRetry()) { - Unlink(); - Retry(); - } else { - Leave(); - } - } - - void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { - using namespace NChangeExchange; - - TStringStream html; - - HTML(html) { - Header(html, "Base partition change sender", DataShard.TabletId); - - SimplePanel(html, "Info", [this](IOutputStream& html) { - HTML(html) { - DL_CLASS("dl-horizontal") { - TermDescLink(html, "ShardId", ShardId, TabletPath(ShardId)); - TermDesc(html, "TargetTablePathId", TargetTablePathId); - TermDesc(html, "LeaderPipeCache", LeaderPipeCache); - TermDesc(html, "LastRecordOrder", LastRecordOrder); - } - } - }); - } - - Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); - } - - void Leave() { - Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(ShardId)); - PassAway(); - } - - void Unlink() { - if (LeaderPipeCache) { - Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(ShardId)); - } - } - - void PassAway() override { - Unlink(); - TActorBootstrapped::PassAway(); - } - -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::CHANGE_SENDER_ASYNC_INDEX_ACTOR_PARTITION; - } - - TBaseChangeSenderShard(const TActorId& parent, const TDataShardId& dataShard, ui64 shardId, - const TPathId& indexTablePathId, const TMap& tagMap) - : Parent(parent) - , DataShard(dataShard) - , ShardId(shardId) - , TargetTablePathId(indexTablePathId) - , TagMap(tagMap) - , LeaseConfirmationCookie(0) - , LastRecordOrder(0) - { - } - - void Bootstrap() { - GetProxyServices(); - } - - STATEFN(StateBase) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); - hFunc(NMon::TEvRemoteHttpInfo, Handle); - sFunc(TEvents::TEvWakeup, Handshake); - sFunc(TEvents::TEvPoison, PassAway); - } - } - -private: - const TActorId Parent; - const TDataShardId DataShard; - const ui64 ShardId; - const TPathId TargetTablePathId; - const TMap TagMap; // from main to index - mutable TMaybe LogPrefix; - - TActorId LeaderPipeCache; - ui64 LeaseConfirmationCookie; - ui64 LastRecordOrder; - - // Retry on delivery problem - static constexpr ui32 MaxAttempts = 3; - static constexpr auto MaxDelay = TDuration::MilliSeconds(50); - ui32 Attempt = 0; - TDuration Delay = TDuration::MilliSeconds(10); - -}; // TBaseChangeSenderShard - -#define DEFINE_STATE_INTRO \ - public: \ - struct TStateTag {}; \ - private: \ - const TDerived* AsDerived() const { \ - return static_cast(this); \ - } \ - TDerived* AsDerived() { \ - return static_cast(this); \ - } \ - TStringBuf GetLogPrefix() const { \ - return AsDerived()->GetLogPrefix(); \ - } - -#define USE_STATE(STATE) \ - friend class T ## STATE ## State; \ - STATEFN(State ## STATE) { \ - return T ## STATE ## State::State(ev); \ - } \ - bool Is ## STATE ## State() const { \ - return CurrentStateFunc() == static_cast(&TThis::State ## STATE); \ - } - template class TResolveIndexState : virtual public NSchemeCache::TSchemeCacheHelpers @@ -435,283 +90,6 @@ class TResolveIndexState } }; -template -class TResolveUserTableState - : virtual public NSchemeCache::TSchemeCacheHelpers -{ - DEFINE_STATE_INTRO; - -public: - void ResolveUserTable() { - auto request = MakeHolder(); - request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->UserTableId, TNavigate::OpTable)); - - AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); - AsDerived()->Become(&TDerived::StateResolveUserTable); - } - - STATEFN(State) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - sFunc(TEvents::TEvWakeup, ResolveUserTable); - default: - return AsDerived()->StateBase(ev); - } - } - -private: - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - const auto& result = ev->Get()->Request; - - LOG_D("HandleUserTable TEvTxProxySchemeCache::TEvNavigateKeySetResult" - << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); - - if (!AsDerived()->CheckNotEmpty(result)) { - return; - } - - if (!AsDerived()->CheckEntriesCount(result, 1)) { - return; - } - - const auto& entry = result->ResultSet.at(0); - - if (!AsDerived()->CheckTableId(entry, AsDerived()->UserTableId)) { - return; - } - - if (!AsDerived()->CheckEntrySucceeded(entry)) { - return; - } - - if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) { - return; - } - - for (const auto& [tag, column] : entry.Columns) { - Y_DEBUG_ABORT_UNLESS(!AsDerived()->MainColumnToTag.contains(column.Name)); - AsDerived()->MainColumnToTag.emplace(column.Name, tag); - } - - AsDerived()->NextState(TStateTag{}); - } -}; - -template -class TResolveTargetTableState - : virtual public NSchemeCache::TSchemeCacheHelpers -{ - DEFINE_STATE_INTRO; - -public: - void ResolveTargetTable() { - auto request = MakeHolder(); - request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->TargetTablePathId, TNavigate::OpTable)); - - AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); - AsDerived()->Become(&TDerived::StateResolveTargetTable); - } - - STATEFN(State) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - sFunc(TEvents::TEvWakeup, OnRetry); - default: - return AsDerived()->StateBase(ev); - } - } - -private: - void OnRetry() { - AsDerived()->OnRetry(TStateTag{}); - } - - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { - const auto& result = ev->Get()->Request; - - LOG_D("HandleTargetTable TEvTxProxySchemeCache::TEvNavigateKeySetResult" - << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); - - if (!AsDerived()->CheckNotEmpty(result)) { - return; - } - - if (!AsDerived()->CheckEntriesCount(result, 1)) { - return; - } - - const auto& entry = result->ResultSet.at(0); - - if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) { - return; - } - - if (!AsDerived()->CheckEntrySucceeded(entry)) { - return; - } - - if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) { - return; - } - - AsDerived()->TagMap.clear(); - TVector keyColumnTypes; - - for (const auto& [tag, column] : entry.Columns) { - auto it = AsDerived()->MainColumnToTag.find(column.Name); - Y_ABORT_UNLESS(it != AsDerived()->MainColumnToTag.end()); - - Y_DEBUG_ABORT_UNLESS(!AsDerived()->TagMap.contains(it->second)); - AsDerived()->TagMap.emplace(it->second, tag); - - if (column.KeyOrder < 0) { - continue; - } - - if (keyColumnTypes.size() <= static_cast(column.KeyOrder)) { - keyColumnTypes.resize(column.KeyOrder + 1); - } - - keyColumnTypes[column.KeyOrder] = column.PType; - } - - AsDerived()->KeyDesc = MakeHolder( - entry.TableId, - AsDerived()->GetFullRange(keyColumnTypes.size()).ToTableRange(), - TKeyDesc::ERowOperation::Update, - keyColumnTypes, - TVector() - ); - - AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get())); - - AsDerived()->NextState(TStateTag{}); - } -}; - -template -class TResolveKeysState - : virtual public NSchemeCache::TSchemeCacheHelpers -{ - DEFINE_STATE_INTRO; - -public: - void ResolveKeys() { - auto request = MakeHolder(); - request->ResultSet.emplace_back(std::move(AsDerived()->KeyDesc)); - - AsDerived()->Send(MakeSchemeCacheID(), new TEvResolve(request.Release())); - AsDerived()->Become(&TDerived::StateResolveKeys); - } - - STATEFN(State) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); - sFunc(TEvents::TEvWakeup, OnRetry); - default: - return AsDerived()->StateBase(ev); - } - } - -private: - void OnRetry() { - AsDerived()->OnRetry(TStateTag{}); - } - - void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { - const auto& result = ev->Get()->Request; - - LOG_D("HandleKeys TEvTxProxySchemeCache::TEvResolveKeySetResult" - << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); - - if (!AsDerived()->CheckNotEmpty(result)) { - return; - } - - if (!AsDerived()->CheckEntriesCount(result, 1)) { - return; - } - - auto& entry = result->ResultSet.at(0); - - if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) { - return; - } - - if (!AsDerived()->CheckEntrySucceeded(entry)) { - return; - } - - if (!entry.KeyDescription->GetPartitions()) { - LOG_W("Empty partitions list" - << ": entry# " << entry.ToString(*AppData()->TypeRegistry)); - return AsDerived()->Retry(); - } - - const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion; - AsDerived()->TargetTableVersion = entry.GeneralVersion; - - AsDerived()->KeyDesc = std::move(entry.KeyDescription); - AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged); - - AsDerived()->NextState(TStateTag{}); - } - - static TVector MakePartitionIds(const TVector& partitions) { - TVector result(Reserve(partitions.size())); - - for (const auto& partition : partitions) { - result.push_back(partition.ShardId); // partition = shard - } - - return result; - } -}; - -template -struct TSchemeChecksMixin - : virtual private NSchemeCache::TSchemeCacheHelpers -{ - const TDerived* AsDerived() const { - return static_cast(this); - } - - TDerived* AsDerived() { - return static_cast(this); - } - - template - bool Check(CheckFunc checkFunc, FailFunc failFunc, const T& subject, Args&&... args) { - return checkFunc(AsDerived()->CurrentStateName(), subject, std::forward(args)..., std::bind(failFunc, AsDerived(), std::placeholders::_1)); - } - - template - bool CheckNotEmpty(const TAutoPtr& result) { - return Check(&TSchemeCacheHelpers::CheckNotEmpty, &TDerived::LogCritAndRetry, result); - } - - template - bool CheckEntriesCount(const TAutoPtr& result, ui32 expected) { - return Check(&TSchemeCacheHelpers::CheckEntriesCount, &TDerived::LogCritAndRetry, result, expected); - } - - template - bool CheckTableId(const T& entry, const TTableId& expected) { - return Check(&TSchemeCacheHelpers::CheckTableId, &TDerived::LogCritAndRetry, entry, expected); - } - - template - bool CheckEntrySucceeded(const T& entry) { - return Check(&TSchemeCacheHelpers::CheckEntrySucceeded, &TDerived::LogWarnAndRetry, entry); - } - - template - bool CheckEntryKind(const T& entry, TNavigate::EKind expected) { - return Check(&TSchemeCacheHelpers::CheckEntryKind, &TDerived::LogWarnAndRetry, entry, expected); - } - -}; - class TAsyncIndexChangeSenderMain : public TActorBootstrapped , public NChangeExchange::TChangeSender @@ -833,7 +211,7 @@ class TAsyncIndexChangeSenderMain } IActor* CreateSender(ui64 partitionId) const override { - return new TBaseChangeSenderShard(SelfId(), DataShard, partitionId, TargetTablePathId, TagMap); + return CreateTableChangeSenderShard(SelfId(), DataShard, partitionId, TargetTablePathId, TagMap); } void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { diff --git a/ydb/core/tx/datashard/change_sender_incr_restore.cpp b/ydb/core/tx/datashard/change_sender_incr_restore.cpp new file mode 100644 index 000000000000..c8ab30c38a4c --- /dev/null +++ b/ydb/core/tx/datashard/change_sender_incr_restore.cpp @@ -0,0 +1,238 @@ +#include "change_exchange.h" +#include "change_exchange_impl.h" +#include "change_record.h" +#include "change_sender_table_base.h" +#include "datashard_impl.h" +#include "incr_restore_scan.h" + +#include +#include + +#include +#include +#include + +#include + +namespace NKikimr::NDataShard { + +class TIncrRestoreChangeSenderMain + : public TActorBootstrapped + , public NChangeExchange::TChangeSender + , public NChangeExchange::IChangeSenderIdentity + , public NChangeExchange::IChangeSenderPathResolver + , public NChangeExchange::IChangeSenderFactory + , private TSchemeChecksMixin + , private TResolveUserTableState + , private TResolveTargetTableState + , private TResolveKeysState +{ + friend struct TSchemeChecksMixin; + + USE_STATE(ResolveUserTable); + USE_STATE(ResolveTargetTable); + USE_STATE(ResolveKeys); + + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[IncrRestoreChangeSenderMain]" + << "[" << GetChangeSenderIdentity() << "]" // maybe better add something else + << SelfId() /* contains brackets */ << " "; + } + + return LogPrefix.GetRef(); + } + + static TSerializedTableRange GetFullRange(ui32 keyColumnsCount) { + TVector fromValues(keyColumnsCount); + TVector toValues; + return TSerializedTableRange(fromValues, true, toValues, false); + } + + bool IsResolving() const override { + return IsResolveUserTableState() + || IsResolveTargetTableState() + || IsResolveKeysState(); + } + + TStringBuf CurrentStateName() const { + if (IsResolveUserTableState()) { + return "ResolveUserTable"; + } else if (IsResolveTargetTableState()) { + return "ResolveTargetTable"; + } else if (IsResolveKeysState()) { + return "ResolveKeys"; + } else { + return ""; + } + } + + void OnRetry(TResolveTargetTableState::TStateTag) { + ResolveTargetTable(); + } + + void OnRetry(TResolveKeysState::TStateTag) { + ResolveTargetTable(); + } + + void NextState(TResolveUserTableState::TStateTag) { + Y_ABORT_UNLESS(MainColumnToTag.contains("__ydb_incrBackupImpl_deleted")); + ResolveTargetTable(); + } + + void NextState(TResolveTargetTableState::TStateTag) { + ResolveKeys(); + } + + void NextState(TResolveKeysState::TStateTag) { + if (!FirstServe) { + FirstServe = true; + Send(ChangeServer, new TEvIncrementalRestoreScan::TEvServe()); + } + Serve(); + } + + void Retry() { + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + } + + void Serve() { + Become(&TThis::StateMain); + } + + void LogCritAndRetry(const TString& error) { + LOG_C(error); + Retry(); + } + + void LogWarnAndRetry(const TString& error) { + LOG_W(error); + Retry(); + } + + /// Main + + STATEFN(StateMain) { + return StateBase(ev); + } + + void Resolve() override { + ResolveTargetTable(); + } + + bool IsResolved() const override { + return KeyDesc && KeyDesc->GetPartitions(); + } + + IActor* CreateSender(ui64 partitionId) const override { + return CreateTableChangeSenderShard(SelfId(), DataShard, partitionId, TargetTablePathId, TagMap); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + EnqueueRecords(std::move(ev->Get()->Records)); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + ProcessRecords(std::move(ev->Get()->Records)); + } + + void Handle(NChangeExchange::TEvChangeExchange::TEvForgetRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + ForgetRecords(std::move(ev->Get()->Records)); + } + + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvReady::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + OnReady(ev->Get()->PartitionId); + + if (NoMoreData && IsAllSendersReadyOrUninit()) { + Send(ChangeServer, new TEvIncrementalRestoreScan::TEvFinished()); + } + } + + void Handle(NChangeExchange::TEvChangeExchangePrivate::TEvGone::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + OnGone(ev->Get()->PartitionId); + } + + void Handle(TEvChangeExchange::TEvRemoveSender::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + Y_ABORT_UNLESS(ev->Get()->PathId == GetChangeSenderIdentity()); + + RemoveRecords(); + PassAway(); + } + + void Handle(TEvIncrementalRestoreScan::TEvNoMoreData::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + NoMoreData = true; + + if (IsAllSendersReadyOrUninit()) { + Send(ChangeServer, new TEvIncrementalRestoreScan::TEvFinished()); + } + } + + void PassAway() override { + KillSenders(); + TActorBootstrapped::PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::CHANGE_SENDER_INCR_RESTORE_ACTOR_MAIN; + } + + explicit TIncrRestoreChangeSenderMain(const TActorId& changeServerActor, const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& targetPathId) + : TActorBootstrapped() + , TChangeSender(this, this, this, this, changeServerActor) + , DataShard(dataShard) + , UserTableId(userTableId) + , TargetTablePathId(targetPathId) + , TargetTableVersion(0) + { + } + + void Bootstrap() { + ResolveUserTable(); + } + + STFUNC(StateBase) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvIncrementalRestoreScan::TEvNoMoreData, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + hFunc(NChangeExchange::TEvChangeExchange::TEvForgetRecords, Handle); + hFunc(TEvChangeExchange::TEvRemoveSender, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvReady, Handle); + hFunc(NChangeExchange::TEvChangeExchangePrivate::TEvGone, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + TPathId GetChangeSenderIdentity() const override final { + return TargetTablePathId; + } + +private: + const TDataShardId DataShard; + const TTableId UserTableId; + mutable TMaybe LogPrefix; + + THashMap MainColumnToTag; + TMap TagMap; // from incrBackupTable to targetTable + + TPathId TargetTablePathId; + ui64 TargetTableVersion; + THolder KeyDesc; + bool NoMoreData = false; + bool FirstServe = false; +}; // TIncrRestoreChangeSenderMain + +IActor* CreateIncrRestoreChangeSender(const TActorId& changeServerActor, const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& restoreTargetPathId) { + return new TIncrRestoreChangeSenderMain(changeServerActor, dataShard, userTableId, restoreTargetPathId); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/change_sender_table_base.cpp b/ydb/core/tx/datashard/change_sender_table_base.cpp new file mode 100644 index 000000000000..464a8e84493b --- /dev/null +++ b/ydb/core/tx/datashard/change_sender_table_base.cpp @@ -0,0 +1,340 @@ +#include "change_sender_table_base.h" +#include "datashard_impl.h" + +#include +#include +#include +#include + +namespace NKikimr::NDataShard { + +class TTableChangeSenderShard: public TActorBootstrapped { + TStringBuf GetLogPrefix() const { + if (!LogPrefix) { + LogPrefix = TStringBuilder() + << "[TableChangeSenderShard]" + << "[" << DataShard.TabletId << ":" << DataShard.Generation << "]" + << "[" << ShardId << "]" + << SelfId() /* contains brackets */ << " "; + } + + return LogPrefix.GetRef(); + } + + /// GetProxyServices + + void GetProxyServices() { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest); + Become(&TThis::StateGetProxyServices); + } + + STATEFN(StateGetProxyServices) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvGetProxyServicesResponse, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvTxUserProxy::TEvGetProxyServicesResponse::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + LeaderPipeCache = ev->Get()->Services.LeaderPipeCache; + Handshake(); + } + + /// Handshake + + void Handshake() { + Send(DataShard.ActorId, new TDataShard::TEvPrivate::TEvConfirmReadonlyLease, 0, ++LeaseConfirmationCookie); + Become(&TThis::StateHandshake); + } + + STATEFN(StateHandshake) { + switch (ev->GetTypeRewrite()) { + hFunc(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation, Handle); + hFunc(TEvChangeExchange::TEvStatus, Handshake); + default: + return StateBase(ev); + } + } + + void Handle(TDataShard::TEvPrivate::TEvReadonlyLeaseConfirmation::TPtr& ev) { + if (ev->Cookie != LeaseConfirmationCookie) { + LOG_W("Readonly lease confirmation cookie mismatch" + << ": expected# " << LeaseConfirmationCookie + << ", got# " << ev->Cookie); + return; + } + + auto handshake = MakeHolder(); + handshake->Record.SetOrigin(DataShard.TabletId); + handshake->Record.SetGeneration(DataShard.Generation); + + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(handshake.Release(), ShardId, true)); + } + + void Handshake(TEvChangeExchange::TEvStatus::TPtr& ev) { + LOG_D("Handshake " << ev->Get()->ToString()); + + const auto& record = ev->Get()->Record; + switch (record.GetStatus()) { + case NKikimrChangeExchange::TEvStatus::STATUS_OK: + LastRecordOrder = record.GetLastRecordOrder(); + return Ready(); + default: + LOG_E("Handshake status" + << ": status# " << static_cast(record.GetStatus()) + << ", reason# " << static_cast(record.GetReason())); + return Leave(); + } + } + + void Ready() { + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvReady(ShardId)); + Become(&TThis::StateWaitingRecords); + } + + /// WaitingRecords + + STATEFN(StateWaitingRecords) { + switch (ev->GetTypeRewrite()) { + hFunc(NChangeExchange::TEvChangeExchange::TEvRecords, Handle); + default: + return StateBase(ev); + } + } + + class TSerializer: public NChangeExchange::TBaseVisitor { + NKikimrChangeExchange::TChangeRecord& Record; + + public: + explicit TSerializer(NKikimrChangeExchange::TChangeRecord& record) + : Record(record) + { + } + + void Visit(const TChangeRecord& record) override { + record.Serialize(Record); + } + }; + + void Handle(NChangeExchange::TEvChangeExchange::TEvRecords::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + auto records = MakeHolder(); + records->Record.SetOrigin(DataShard.TabletId); + records->Record.SetGeneration(DataShard.Generation); + + for (auto recordPtr : ev->Get()->Records) { + const auto& record = *recordPtr; + + if (record.GetOrder() <= LastRecordOrder) { + continue; + } + + auto& proto = *records->Record.AddRecords(); + TSerializer serializer(proto); + record.Accept(serializer); + Adjust(proto); + } + + if (!records->Record.RecordsSize()) { + return Ready(); + } + + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(records.Release(), ShardId, false)); + Become(&TThis::StateWaitingStatus); + } + + void Adjust(NKikimrChangeExchange::TChangeRecord& record) const { + record.SetPathOwnerId(TargetTablePathId.OwnerId); + record.SetLocalPathId(TargetTablePathId.LocalPathId); + + Y_ABORT_UNLESS(record.HasAsyncIndex()); + AdjustTags(*record.MutableAsyncIndex()); + } + + void AdjustTags(NKikimrChangeExchange::TDataChange& record) const { + AdjustTags(*record.MutableKey()->MutableTags()); + + switch (record.GetRowOperationCase()) { + case NKikimrChangeExchange::TDataChange::kUpsert: + AdjustTags(*record.MutableUpsert()->MutableTags()); + break; + case NKikimrChangeExchange::TDataChange::kReset: + AdjustTags(*record.MutableReset()->MutableTags()); + break; + default: + break; + } + } + + void AdjustTags(google::protobuf::RepeatedField& tags) const { + for (int i = 0; i < tags.size(); ++i) { + auto it = TagMap.find(tags[i]); + Y_ABORT_UNLESS(it != TagMap.end()); + tags[i] = it->second; + } + } + + /// WaitingStatus + + STATEFN(StateWaitingStatus) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvChangeExchange::TEvStatus, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvChangeExchange::TEvStatus::TPtr& ev) { + LOG_D("Handle " << ev->Get()->ToString()); + + const auto& record = ev->Get()->Record; + switch (record.GetStatus()) { + case NKikimrChangeExchange::TEvStatus::STATUS_OK: + LastRecordOrder = record.GetLastRecordOrder(); + return Ready(); + // TODO: REJECT? + default: + LOG_E("Apply status" + << ": status# " << static_cast(record.GetStatus()) + << ", reason# " << static_cast(record.GetReason())); + return Leave(); + } + } + + bool CanRetry() const { + if (CurrentStateFunc() != static_cast(&TThis::StateHandshake)) { + return false; + } + + return Attempt < MaxAttempts; + } + + void Retry() { + ++Attempt; + Delay = Min(2 * Delay, MaxDelay); + + LOG_N("Retry" + << ": attempt# " << Attempt + << ", delay# " << Delay); + + const auto random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); + Schedule(Delay + random, new TEvents::TEvWakeup()); + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + if (ShardId != ev->Get()->TabletId) { + return; + } + + if (CanRetry()) { + Unlink(); + Retry(); + } else { + Leave(); + } + } + + void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev) { + using namespace NChangeExchange; + + TStringStream html; + + HTML(html) { + Header(html, "Base partition change sender", DataShard.TabletId); + + SimplePanel(html, "Info", [this](IOutputStream& html) { + HTML(html) { + DL_CLASS("dl-horizontal") { + TermDescLink(html, "ShardId", ShardId, TabletPath(ShardId)); + TermDesc(html, "TargetTablePathId", TargetTablePathId); + TermDesc(html, "LeaderPipeCache", LeaderPipeCache); + TermDesc(html, "LastRecordOrder", LastRecordOrder); + } + } + }); + } + + Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes(html.Str())); + } + + void Leave() { + Send(Parent, new NChangeExchange::TEvChangeExchangePrivate::TEvGone(ShardId)); + PassAway(); + } + + void Unlink() { + if (LeaderPipeCache) { + Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(ShardId)); + } + } + + void PassAway() override { + Unlink(); + TActorBootstrapped::PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::CHANGE_SENDER_ASYNC_INDEX_ACTOR_PARTITION; + } + + TTableChangeSenderShard(const TActorId& parent, const TDataShardId& dataShard, ui64 shardId, + const TPathId& indexTablePathId, const TMap& tagMap) + : Parent(parent) + , DataShard(dataShard) + , ShardId(shardId) + , TargetTablePathId(indexTablePathId) + , TagMap(tagMap) + , LeaseConfirmationCookie(0) + , LastRecordOrder(0) + { + } + + void Bootstrap() { + GetProxyServices(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(NMon::TEvRemoteHttpInfo, Handle); + sFunc(TEvents::TEvWakeup, Handshake); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const TDataShardId DataShard; + const ui64 ShardId; + const TPathId TargetTablePathId; + const TMap TagMap; // from main to index + mutable TMaybe LogPrefix; + + TActorId LeaderPipeCache; + ui64 LeaseConfirmationCookie; + ui64 LastRecordOrder; + + // Retry on delivery problem + static constexpr ui32 MaxAttempts = 3; + static constexpr auto MaxDelay = TDuration::MilliSeconds(50); + ui32 Attempt = 0; + TDuration Delay = TDuration::MilliSeconds(10); + +}; // TTableChangeSenderShard + +IActor* CreateTableChangeSenderShard( + const TActorId& parent, + const TDataShardId& dataShard, + ui64 shardId, + const TPathId& indexTablePathId, + const TMap& tagMap) +{ + return new TTableChangeSenderShard(parent, dataShard, shardId, indexTablePathId, tagMap); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/change_sender_table_base.h b/ydb/core/tx/datashard/change_sender_table_base.h new file mode 100644 index 000000000000..542c8366d0b3 --- /dev/null +++ b/ydb/core/tx/datashard/change_sender_table_base.h @@ -0,0 +1,319 @@ +#pragma once + +#include "change_exchange_helpers.h" + +#include +#include + +namespace NKikimr::NDataShard { + +using namespace NTable; + +#define DEFINE_STATE_INTRO \ + public: \ + struct TStateTag {}; \ + private: \ + const TDerived* AsDerived() const { \ + return static_cast(this); \ + } \ + TDerived* AsDerived() { \ + return static_cast(this); \ + } \ + TStringBuf GetLogPrefix() const { \ + return AsDerived()->GetLogPrefix(); \ + } + +#define USE_STATE(STATE) \ + friend class T ## STATE ## State; \ + STATEFN(State ## STATE) { \ + return T ## STATE ## State::State(ev); \ + } \ + bool Is ## STATE ## State() const { \ + return CurrentStateFunc() == static_cast(&TThis::State ## STATE); \ + } + +template +class TResolveUserTableState + : virtual public NSchemeCache::TSchemeCacheHelpers +{ + DEFINE_STATE_INTRO; + +public: + void ResolveUserTable() { + auto request = MakeHolder(); + request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->UserTableId, TNavigate::OpTable)); + + AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); + AsDerived()->Become(&TDerived::StateResolveUserTable); + } + + STATEFN(State) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + sFunc(TEvents::TEvWakeup, ResolveUserTable); + default: + return AsDerived()->StateBase(ev); + } + } + +private: + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& result = ev->Get()->Request; + + LOG_D("HandleUserTable TEvTxProxySchemeCache::TEvNavigateKeySetResult" + << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); + + if (!AsDerived()->CheckNotEmpty(result)) { + return; + } + + if (!AsDerived()->CheckEntriesCount(result, 1)) { + return; + } + + const auto& entry = result->ResultSet.at(0); + + if (!AsDerived()->CheckTableId(entry, AsDerived()->UserTableId)) { + return; + } + + if (!AsDerived()->CheckEntrySucceeded(entry)) { + return; + } + + if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) { + return; + } + + for (const auto& [tag, column] : entry.Columns) { + Y_DEBUG_ABORT_UNLESS(!AsDerived()->MainColumnToTag.contains(column.Name)); + AsDerived()->MainColumnToTag.emplace(column.Name, tag); + } + + AsDerived()->NextState(TStateTag{}); + } +}; + +template +class TResolveTargetTableState + : virtual public NSchemeCache::TSchemeCacheHelpers +{ + DEFINE_STATE_INTRO; + +public: + void ResolveTargetTable() { + auto request = MakeHolder(); + request->ResultSet.emplace_back(MakeNavigateEntry(AsDerived()->TargetTablePathId, TNavigate::OpTable)); + + AsDerived()->Send(MakeSchemeCacheID(), new TEvNavigate(request.Release())); + AsDerived()->Become(&TDerived::StateResolveTargetTable); + } + + STATEFN(State) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + sFunc(TEvents::TEvWakeup, OnRetry); + default: + return AsDerived()->StateBase(ev); + } + } + +private: + void OnRetry() { + AsDerived()->OnRetry(TStateTag{}); + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + const auto& result = ev->Get()->Request; + + LOG_D("HandleTargetTable TEvTxProxySchemeCache::TEvNavigateKeySetResult" + << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); + + if (!AsDerived()->CheckNotEmpty(result)) { + return; + } + + if (!AsDerived()->CheckEntriesCount(result, 1)) { + return; + } + + const auto& entry = result->ResultSet.at(0); + + if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) { + return; + } + + if (!AsDerived()->CheckEntrySucceeded(entry)) { + return; + } + + if (!AsDerived()->CheckEntryKind(entry, TNavigate::KindTable)) { + return; + } + + AsDerived()->TagMap.clear(); + TVector keyColumnTypes; + + for (const auto& [tag, column] : entry.Columns) { + auto it = AsDerived()->MainColumnToTag.find(column.Name); + Y_ABORT_UNLESS(it != AsDerived()->MainColumnToTag.end()); + + Y_DEBUG_ABORT_UNLESS(!AsDerived()->TagMap.contains(it->second)); + AsDerived()->TagMap.emplace(it->second, tag); + + if (column.KeyOrder < 0) { + continue; + } + + if (keyColumnTypes.size() <= static_cast(column.KeyOrder)) { + keyColumnTypes.resize(column.KeyOrder + 1); + } + + keyColumnTypes[column.KeyOrder] = column.PType; + } + + AsDerived()->KeyDesc = MakeHolder( + entry.TableId, + AsDerived()->GetFullRange(keyColumnTypes.size()).ToTableRange(), + TKeyDesc::ERowOperation::Update, + keyColumnTypes, + TVector() + ); + + AsDerived()->SetPartitionResolver(CreateDefaultPartitionResolver(*AsDerived()->KeyDesc.Get())); + + AsDerived()->NextState(TStateTag{}); + } +}; + +template +class TResolveKeysState + : virtual public NSchemeCache::TSchemeCacheHelpers +{ + DEFINE_STATE_INTRO; + +public: + void ResolveKeys() { + auto request = MakeHolder(); + request->ResultSet.emplace_back(std::move(AsDerived()->KeyDesc)); + + AsDerived()->Send(MakeSchemeCacheID(), new TEvResolve(request.Release())); + AsDerived()->Become(&TDerived::StateResolveKeys); + } + + STATEFN(State) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); + sFunc(TEvents::TEvWakeup, OnRetry); + default: + return AsDerived()->StateBase(ev); + } + } + +private: + void OnRetry() { + AsDerived()->OnRetry(TStateTag{}); + } + + void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { + const auto& result = ev->Get()->Request; + + LOG_D("HandleKeys TEvTxProxySchemeCache::TEvResolveKeySetResult" + << ": result# " << (result ? result->ToString(*AppData()->TypeRegistry) : "nullptr")); + + if (!AsDerived()->CheckNotEmpty(result)) { + return; + } + + if (!AsDerived()->CheckEntriesCount(result, 1)) { + return; + } + + auto& entry = result->ResultSet.at(0); + + if (!AsDerived()->CheckTableId(entry, AsDerived()->TargetTablePathId)) { + return; + } + + if (!AsDerived()->CheckEntrySucceeded(entry)) { + return; + } + + if (!entry.KeyDescription->GetPartitions()) { + LOG_W("Empty partitions list" + << ": entry# " << entry.ToString(*AppData()->TypeRegistry)); + return AsDerived()->Retry(); + } + + const bool versionChanged = !AsDerived()->TargetTableVersion || AsDerived()->TargetTableVersion != entry.GeneralVersion; + AsDerived()->TargetTableVersion = entry.GeneralVersion; + + AsDerived()->KeyDesc = std::move(entry.KeyDescription); + AsDerived()->CreateSenders(MakePartitionIds(AsDerived()->KeyDesc->GetPartitions()), versionChanged); + + AsDerived()->NextState(TStateTag{}); + } + + static TVector MakePartitionIds(const TVector& partitions) { + TVector result(Reserve(partitions.size())); + + for (const auto& partition : partitions) { + result.push_back(partition.ShardId); // partition = shard + } + + return result; + } +}; + +template +struct TSchemeChecksMixin + : virtual private NSchemeCache::TSchemeCacheHelpers +{ + const TDerived* AsDerived() const { + return static_cast(this); + } + + TDerived* AsDerived() { + return static_cast(this); + } + + template + bool Check(CheckFunc checkFunc, FailFunc failFunc, const T& subject, Args&&... args) { + return checkFunc(AsDerived()->CurrentStateName(), subject, std::forward(args)..., std::bind(failFunc, AsDerived(), std::placeholders::_1)); + } + + template + bool CheckNotEmpty(const TAutoPtr& result) { + return Check(&TSchemeCacheHelpers::CheckNotEmpty, &TDerived::LogCritAndRetry, result); + } + + template + bool CheckEntriesCount(const TAutoPtr& result, ui32 expected) { + return Check(&TSchemeCacheHelpers::CheckEntriesCount, &TDerived::LogCritAndRetry, result, expected); + } + + template + bool CheckTableId(const T& entry, const TTableId& expected) { + return Check(&TSchemeCacheHelpers::CheckTableId, &TDerived::LogCritAndRetry, entry, expected); + } + + template + bool CheckEntrySucceeded(const T& entry) { + return Check(&TSchemeCacheHelpers::CheckEntrySucceeded, &TDerived::LogWarnAndRetry, entry); + } + + template + bool CheckEntryKind(const T& entry, TNavigate::EKind expected) { + return Check(&TSchemeCacheHelpers::CheckEntryKind, &TDerived::LogWarnAndRetry, entry, expected); + } + +}; + +IActor* CreateTableChangeSenderShard( + const TActorId& parent, + const TDataShardId& dataShard, + ui64 shardId, + const TPathId& indexTablePathId, + const TMap& tagMap); + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 43c2b9b153c0..078e0b57a0ea 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -322,7 +322,7 @@ class TDataShard friend class TTxStartMvccStateChange; friend class TTxExecuteMvccStateChange; - friend class TBaseChangeSenderShard; + friend class TTableChangeSenderShard; class TTxPersistSubDomainPathId; class TTxPersistSubDomainOutOfSpace; diff --git a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp index 405f422ed21e..cd1b5183d76c 100644 --- a/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_incremental_restore_scan.cpp @@ -1,11 +1,20 @@ #include "incr_restore_scan.h" +#include "change_exchange_impl.h" +#include +#include #include #include #include +#include namespace NKikimr::NDataShard { +using namespace NDataShard::NKqpHelpers; +using namespace NSchemeShard; +using namespace NSchemeBoard; +using namespace Tests; + class TDriverMock : public NTable::IDriver { @@ -101,6 +110,35 @@ class TRuntimeCbExecutor { TActorId Impl; }; +static void SetupLogging(TTestActorRuntime& runtime) { + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::CHANGE_EXCHANGE, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::CONTINUOUS_BACKUP, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::REPLICATION_SERVICE, NLog::PRI_DEBUG); +} + +TShardedTableOptions SimpleTable() { + return TShardedTableOptions(); +} + +TMaybe GetTablePathId(TTestActorRuntime& runtime, TActorId sender, TString path) { + auto request = MakeHolder(); + request->Record.MutableDescribePath()->SetPath(path); + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + + auto reply = runtime.GrabEdgeEventRethrow(sender); + if (reply->Get()->GetRecord().GetStatus() != NKikimrScheme::EStatus::StatusSuccess) { + return {}; + } + + return GetPathId(reply->Get()->GetRecord()); +} + Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { Y_UNIT_TEST(Empty) { TPortManager pm; @@ -113,8 +151,7 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { auto sender = runtime.AllocateEdgeActor(); auto sender2 = runtime.AllocateEdgeActor(); - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + SetupLogging(runtime); TUserTable::TPtr table = new TUserTable; NTable::TScheme::TTableSchema tableSchema; @@ -165,6 +202,142 @@ Y_UNIT_TEST_SUITE(IncrementalRestoreScan) { runtime.GrabEdgeEventRethrow(sender); } + + Y_UNIT_TEST(ChangeSenderEmpty) { + TPortManager pm; + Tests::TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + CreateShardedTable( + server, + edgeActor, + "/Root", + "IncrBackupTable", + SimpleTable() + .AllowSystemColumnNames(true) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"__ydb_incrBackupImpl_deleted", "Bool", false, false}})); + + TPathId targetPathId = *GetTablePathId(runtime, edgeActor, "/Root/Table"); + TPathId sourcePathId = *GetTablePathId(runtime, edgeActor, "/Root/IncrBackupTable"); + + auto* changeSender = CreateIncrRestoreChangeSender(edgeActor, TDataShardId{}, sourcePathId, targetPathId); + + auto changeSenderActor = runtime.Register(changeSender); + runtime.EnableScheduleForActor(changeSenderActor); + runtime.GrabEdgeEventRethrow(edgeActor); + + auto request = MakeHolder(); + runtime.Send(new IEventHandle(changeSenderActor, edgeActor, request.Release())); + + runtime.GrabEdgeEventRethrow(edgeActor); + } + + Y_UNIT_TEST(ChangeSenderSimple) { + TPortManager pm; + Tests::TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + auto [_, dstTable] = CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + auto [srcShards, srcTable] = CreateShardedTable( + server, + edgeActor, + "/Root", + "IncrBackupTable", + SimpleTable() + .AllowSystemColumnNames(true) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"__ydb_incrBackupImpl_deleted", "Bool", false, false}})); + + TPathId targetPathId = dstTable.PathId; + TPathId sourcePathId = srcTable.PathId; + + TDataShardId sourceDatashard; + + { + auto request = MakeHolder(); + runtime.SendToPipe(srcShards[0], edgeActor, request.Release(), 0, GetPipeConfigWithRetries()); + + auto ev = runtime.GrabEdgeEventRethrow(edgeActor); + sourceDatashard.ActorId = ev->Sender; + } + + auto* changeSender = CreateIncrRestoreChangeSender(edgeActor, sourceDatashard, sourcePathId, targetPathId); + auto changeSenderActor = runtime.Register(changeSender); + + { + runtime.EnableScheduleForActor(changeSenderActor); + runtime.GrabEdgeEventRethrow(edgeActor); + } + + { + + NKikimrChangeExchange::TDataChange body; + TVector keyCells = { TCell::Make(ui32(0)) }; + auto key = TSerializedCellVec::Serialize(keyCells); + body.MutableKey()->AddTags(0); + body.MutableKey()->SetData(key); + body.MutableErase(); + + auto recordPtr = TChangeRecordBuilder(TChangeRecord::EKind::IncrementalRestore) + .WithOrder(0) + .WithGroup(0) + .WithPathId(targetPathId) + .WithTableId(sourcePathId) + .WithBody(body.SerializeAsString()) + .WithSource(TChangeRecord::ESource::InitialScan) + .Build(); + + { + const auto& record = *recordPtr; + TVector records; + records.emplace_back(record.GetOrder(), record.GetPathId(), record.GetBody().size()); + + auto request = MakeHolder(records); + runtime.Send(new IEventHandle(changeSenderActor, edgeActor, request.Release())); + } + + runtime.GrabEdgeEventRethrow(edgeActor); + + { + TVector records; + records.emplace_back(recordPtr); + + auto request = MakeHolder(records); + runtime.Send(new IEventHandle(changeSenderActor, edgeActor, request.Release())); + } + + runtime.GrabEdgeEventRethrow(edgeActor); + } + + // there is a race here between noMoreData and all senders is ready or unint right now + + { + auto request = MakeHolder(); + runtime.Send(new IEventHandle(changeSenderActor, edgeActor, request.Release())); + + runtime.GrabEdgeEventRethrow(edgeActor); + } + } } } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/incr_restore_scan.cpp b/ydb/core/tx/datashard/incr_restore_scan.cpp index 0fbe2f156b8c..9041f859a996 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.cpp +++ b/ydb/core/tx/datashard/incr_restore_scan.cpp @@ -207,7 +207,6 @@ class TIncrementalRestoreScan EScan Progress() { auto rows = Buffer.Flush(); - TVector changeRecords; TVector records; for (auto& [k, v] : rows) { diff --git a/ydb/core/tx/datashard/incr_restore_scan.h b/ydb/core/tx/datashard/incr_restore_scan.h index a53c39bce987..9294e835e498 100644 --- a/ydb/core/tx/datashard/incr_restore_scan.h +++ b/ydb/core/tx/datashard/incr_restore_scan.h @@ -15,7 +15,8 @@ using namespace NActors; struct TEvIncrementalRestoreScan { enum EEv { - EvNoMoreData = EventSpaceBegin(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN), + EvServe = EventSpaceBegin(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN), + EvNoMoreData, EvFinished, EvEnd, @@ -23,6 +24,7 @@ struct TEvIncrementalRestoreScan { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_INCREMENTAL_RESTORE_SCAN)); + struct TEvServe: public TEventLocal {}; struct TEvNoMoreData: public TEventLocal {}; struct TEvFinished: public TEventLocal {}; }; diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 16e57524720b..4ffd37bade70 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -27,6 +27,8 @@ SRCS( change_sender.cpp change_sender_async_index.cpp change_sender_cdc_stream.cpp + change_sender_incr_restore.cpp + change_sender_table_base.cpp check_commit_writes_tx_unit.cpp check_data_tx_unit.cpp check_distributed_erase_tx_unit.cpp diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index ba3a64077ad0..f115a5651c4f 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1056,5 +1056,6 @@ message TActivity { SAMPLE_K_UPLOAD_ACTOR = 646; LOCAL_KMEANS_SCAN_ACTOR = 647; INCREMENTAL_RESTORE_SCAN_ACTOR = 648; + CHANGE_SENDER_INCR_RESTORE_ACTOR_MAIN = 649; }; };