diff --git a/ydb/core/change_exchange/change_sender_common_ops.h b/ydb/core/change_exchange/change_sender_common_ops.h index 8c9f45a1c698..823b208f5323 100644 --- a/ydb/core/change_exchange/change_sender_common_ops.h +++ b/ydb/core/change_exchange/change_sender_common_ops.h @@ -336,7 +336,7 @@ class TBaseChangeSender { Y_ABORT_UNLESS(it != Broadcasting.end()); auto& broadcast = it->second; - if (broadcast.Partitions.contains(partitionId)) { + if (broadcast.CompletedPartitions.contains(partitionId)) { return false; } diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index f81b3a7da3b2..c9214300384a 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -81,7 +81,8 @@ void TPartitionSourceManager::TModificationBatch::Cancel() { } bool TPartitionSourceManager::TModificationBatch::HasModifications() const { - return !SourceIdWriter.GetSourceIdsToWrite().empty(); + return !SourceIdWriter.GetSourceIdsToWrite().empty() + || !SourceIdWriter.GetSourceIdsToDelete().empty(); } void TPartitionSourceManager::TModificationBatch::FillRequest(TEvKeyValue::TEvRequest* request) { diff --git a/ydb/core/persqueue/sourceid.h b/ydb/core/persqueue/sourceid.h index 992e1271c847..e20a97db6acd 100644 --- a/ydb/core/persqueue/sourceid.h +++ b/ydb/core/persqueue/sourceid.h @@ -85,6 +85,10 @@ class TSourceIdWriter { return Registrations; } + const THashSet& GetSourceIdsToDelete() const { + return Deregistrations; + } + template void RegisterSourceId(const TString& sourceId, Args&&... args) { Registrations[sourceId] = TSourceIdInfo(std::forward(args)...); diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 55299cdb93ba..4b168f10e5cf 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -490,4 +490,5 @@ enum ETxTypes { TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}]; TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}]; TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}]; + TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}]; } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ba07749ec285..7dd1d115a230 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate(record.GetKind()), NIceDb::TUpdate(record.GetBody()), NIceDb::TUpdate(record.GetSource())); + + auto res = ChangesQueue.emplace(record.GetOrder(), record); + Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder()); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + + if (CommittingChangeRecords.empty()) { + db.GetDatabase().OnCommit([this] { + CommittingChangeRecords.clear(); + }); + db.GetDatabase().OnRollback([this] { + for (const auto order : CommittingChangeRecords) { + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + CommittingChangeRecords.clear(); + }); + } + + CommittingChangeRecords.push_back(record.GetOrder()); } else { auto& state = LockChangeRecords[lockId]; Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(), @@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 committed.Step = rowVersion.Step; committed.TxId = rowVersion.TxId; collected.push_back(committed); + + auto res = ChangesQueue.emplace(committed.Order, committed); + Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once"); @@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 LockChangeRecords.erase(it); }); db.GetDatabase().OnRollback([this, lockId]() { - CommittedLockChangeRecords.erase(lockId); + auto it = CommittedLockChangeRecords.find(lockId); + Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId); + + for (size_t i = 0; i < it->second.Count; ++i) { + const ui64 order = it->second.Order + i; + + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + CommittedLockChangeRecords.erase(it); }); } @@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { auto it = ChangesQueue.find(order); if (it == ChangesQueue.end()) { - Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order); return; } @@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { ChangesQueueBytes -= record.BodySize; if (record.SchemaSnapshotAcquired) { - Y_ABORT_UNLESS(record.TableId); - auto tableIt = TableInfos.find(record.TableId.LocalPathId); - - if (tableIt != TableInfos.end()) { - const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); - const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey); - - if (last) { - const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey); - Y_ABORT_UNLESS(snapshot); - - if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) { - SchemaSnapshotManager.RemoveShapshot(db, snapshotKey); - } - } - } else { - Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); + if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); } } @@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { CheckChangesQueueNoOverflow(); } -void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie) { +void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie, bool afterMove) { if (!records) { return; } @@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVectorTimeProvider->Now(); TVector forward(Reserve(records.size())); for (const auto& record : records) { - forward.emplace_back(record.Order, record.PathId, record.BodySize); + auto it = ChangesQueue.find(record.Order); + if (it == ChangesQueue.end()) { + Y_ABORT_UNLESS(afterMove); + continue; + } - auto res = ChangesQueue.emplace( - std::piecewise_construct, - std::forward_as_tuple(record.Order), - std::forward_as_tuple(record, now, cookie) - ); - if (res.second) { - ChangesList.PushBack(&res.first->second); + forward.emplace_back(record.Order, record.PathId, record.BodySize); - Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); - ChangesQueueBytes += record.BodySize; + it->second.EnqueuedAt = now; + it->second.ReservationCookie = cookie; + ChangesList.PushBack(&it->second); - if (record.SchemaVersion) { - res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( - TSchemaSnapshotKey(record.TableId, record.SchemaVersion)); - } - } + Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); + ChangesQueueBytes += record.BodySize; } - + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + Y_ABORT_UNLESS(!afterMove); ChangeQueueReservedCapacity -= it->second; ChangeQueueReservedCapacity += records.size(); } @@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + if (!rowset.Next()) { return false; } @@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } LockChangeRecords.erase(lockId); @@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() { } } +void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) { + Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key)); + + const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key); + Y_ABORT_UNLESS(snapshot); + + auto it = TableInfos.find(key.PathId); + if (it == TableInfos.end()) { + Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + return; + } + + if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) { + bool wasEmpty = PendingSchemaSnapshotsToGc.empty(); + PendingSchemaSnapshotsToGc.push_back(key); + if (wasEmpty) { + Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots); + } + } +} + +void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() { + bool wasEmpty = PendingSchemaSnapshotsToGc.empty(); + + for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) { + auto it = TableInfos.find(key.PathId); + if (it == TableInfos.end()) { + Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + break; + } + if (SchemaSnapshotManager.HasReference(key)) { + continue; + } + if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) { + continue; + } + + PendingSchemaSnapshotsToGc.push_back(key); + } + + if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) { + Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots); + } +} + void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) { db.Table().Key(op.TxId).Update( NIceDb::TUpdate(op.Success), @@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId)); auto tableInfo = TableInfos[pathId.LocalPathId]; - const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion); + const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion); SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId)); + + const auto& snapshots = SchemaSnapshotManager.GetSnapshots(); + for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) { + if (it->first == key) { + break; + } + if (!SchemaSnapshotManager.HasReference(it->first)) { + ScheduleRemoveSchemaSnapshot(it->first); + } + } } void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) { diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 89981068248d..a0561a9c998f 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { return false; } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { + if (!Self->SchemaSnapshotManager.Load(db)) { + return false; + } + } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) { if (!Self->LoadChangeRecords(db, ChangeRecords)) { return false; @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { - if (!Self->SchemaSnapshotManager.Load(db)) { - return false; - } - } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) { TDataShardLocksDb locksDb(*Self, txc); if (!Self->SysLocks.Load(locksDb)) { @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { Self->SubscribeNewLocks(); Self->ScheduleRemoveAbandonedLockChanges(); + Self->ScheduleRemoveAbandonedSchemaSnapshots(); return true; } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 181f3fdf8d23..9023b29d7b9f 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { ChangeExchangeSplit = true; } else { for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) { - if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) { + if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) { ActivationList.insert(dstTabletId); } } @@ -346,9 +346,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { } for (const auto dstTabletId : ActivationList) { - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } Self->CheckStateChange(ctx); @@ -383,7 +381,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done()); for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) { - if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) { + if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) { ActivationList.insert(dstTabletId); } } @@ -396,9 +394,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase << ", at tablet# " << Self->TabletID()); for (const auto dstTabletId : ActivationList) { - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b770044dedb1..049ff62146fa 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -241,6 +241,7 @@ class TDataShard class TTxCdcStreamScanProgress; class TTxCdcStreamEmitHeartbeats; class TTxUpdateFollowerReadEdge; + class TTxRemoveSchemaSnapshots; template friend class TTxDirectBase; class TTxUploadRows; @@ -374,6 +375,7 @@ class TDataShard EvPlanPredictedTxs, EvStatisticsScanFinished, EvTableStatsError, + EvRemoveSchemaSnapshots, EvEnd }; @@ -595,6 +597,8 @@ class TDataShard struct TEvPlanPredictedTxs : public TEventLocal {}; struct TEvStatisticsScanFinished : public TEventLocal {}; + + struct TEvRemoveSchemaSnapshots : public TEventLocal {}; }; struct Schema : NIceDb::Schema { @@ -1383,6 +1387,8 @@ class TDataShard void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1906,7 +1912,8 @@ class TDataShard void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); - void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0); + // TODO(ilnaz): remove 'afterMove' after #6541 + void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0, bool afterMove = false); ui32 GetFreeChangeQueueCapacity(ui64 cookie); ui64 ReserveChangeQueueCapacity(ui32 capacity); void UpdateChangeExchangeLag(TInstant now); @@ -1920,6 +1927,8 @@ class TDataShard bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector& records); void ScheduleRemoveLockChanges(ui64 lockId); void ScheduleRemoveAbandonedLockChanges(); + void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key); + void ScheduleRemoveAbandonedSchemaSnapshots(); static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, const TPathId& tablePathId, const TPathId& streamPathId); @@ -2804,24 +2813,29 @@ class TDataShard ui64 LockOffset; ui64 ReservationCookie; - explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, TInstant created, TInstant enqueued, - ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0) + explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion, + TInstant created, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) , CreatedAt(created) - , EnqueuedAt(enqueued) + , EnqueuedAt(TInstant::Zero()) , LockId(lockId) , LockOffset(lockOffset) - , ReservationCookie(cookie) + , ReservationCookie(0) + { + } + + explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record) + : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, + record.CreatedAt(), record.LockId, record.LockOffset) { } - explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie) - : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now, - record.LockId, record.LockOffset, cookie) + explicit TEnqueuedRecord(const TChangeRecord& record) + : TEnqueuedRecord(record.GetBody().size(), record.GetTableId(), record.GetSchemaVersion(), + record.GetApproximateCreationDateTime(), record.GetLockId(), record.GetLockOffset()) { } }; @@ -2863,9 +2877,11 @@ class TDataShard size_t Count = 0; }; + TVector CommittingChangeRecords; THashMap LockChangeRecords; // ui64 is lock id THashMap CommittedLockChangeRecords; // ui64 is lock id TVector PendingLockChangeRecordsToRemove; + TVector PendingSchemaSnapshotsToGc; // in THashMap InChangeSenders; // ui64 is shard id @@ -2965,6 +2981,16 @@ class TDataShard CommittedLockChangeRecords = std::move(committedLockChangeRecords); } + auto TakeChangesQueue() { + auto result = std::move(ChangesQueue); + ChangesQueue.clear(); + return result; + } + + void SetChangesQueue(THashMap&& changesQueue) { + ChangesQueue = std::move(changesQueue); + } + protected: // Redundant init state required by flat executor implementation void StateInit(TAutoPtr &ev) { @@ -2986,6 +3012,7 @@ class TDataShard HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite() @@ -3114,6 +3141,7 @@ class TDataShard HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle); HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle); HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp index 7b603ac4eca2..9c56e4ad7381 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp @@ -20,6 +20,7 @@ TSchemaSnapshotManager::TSchemaSnapshotManager(const TDataShard* self) void TSchemaSnapshotManager::Reset() { Snapshots.clear(); + References.clear(); } bool TSchemaSnapshotManager::Load(NIceDb::TNiceDb& db) { @@ -79,14 +80,16 @@ const TSchemaSnapshot* TSchemaSnapshotManager::FindSnapshot(const TSchemaSnapsho return Snapshots.FindPtr(key); } -void TSchemaSnapshotManager::RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key) { +void TSchemaSnapshotManager::RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { return; } Snapshots.erase(it); - PersistRemoveSnapshot(db, key); + + NIceDb::TNiceDb nicedb(db); + PersistRemoveSnapshot(nicedb, key); } void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, @@ -119,6 +122,10 @@ void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, } } +const TSchemaSnapshotManager::TSnapshots& TSchemaSnapshotManager::GetSnapshots() const { + return Snapshots; +} + bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { @@ -152,6 +159,15 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) { return true; } +bool TSchemaSnapshotManager::HasReference(const TSchemaSnapshotKey& key) const { + auto refIt = References.find(key); + if (refIt != References.end()) { + return refIt->second; + } else { + return false; + } +} + void TSchemaSnapshotManager::PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot) { using Schema = TDataShard::Schema; db.Table() diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.h b/ydb/core/tx/datashard/datashard_schema_snapshots.h index db0d3b655b34..0bc80a628e2e 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.h +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.h @@ -23,6 +23,8 @@ struct TSchemaSnapshot { }; class TSchemaSnapshotManager { + using TSnapshots = TMap>; + public: explicit TSchemaSnapshotManager(const TDataShard* self); @@ -31,11 +33,13 @@ class TSchemaSnapshotManager { bool AddSnapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const; - void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key); + void RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key); void RenameSnapshots(NTable::TDatabase& db, const TPathId& prevTableId, const TPathId& newTableId); + const TSnapshots& GetSnapshots() const; bool AcquireReference(const TSchemaSnapshotKey& key); bool ReleaseReference(const TSchemaSnapshotKey& key); + bool HasReference(const TSchemaSnapshotKey& key) const; private: void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); @@ -43,7 +47,7 @@ class TSchemaSnapshotManager { private: const TDataShard* Self; - TMap> Snapshots; + TSnapshots Snapshots; THashMap References; }; // TSchemaSnapshotManager diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index b5e303fb5ba1..1e88d7495c63 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -244,6 +244,8 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa private: TIntrusivePtr SnapContext; bool ChangeExchangeSplit; + THashSet ActivationList; + THashSet SplitList; public: TTxSplitSnapshotComplete(TDataShard* ds, TIntrusivePtr snapContext) @@ -378,13 +380,11 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa proto->SetTimeoutMs(kv.second.Timeout.MilliSeconds()); } - if (Self->ChangesQueue || tableInfo.HasCdcStreams()) { + if (tableInfo.HasAsyncIndexes() || tableInfo.HasCdcStreams()) { snapshot->SetWaitForActivation(true); - Self->ChangeSenderActivator.AddDst(dstTablet); - db.Table().Key(dstTablet).Update(); - + ActivationList.insert(dstTablet); if (tableInfo.HasCdcStreams()) { - Self->ChangeExchangeSplitter.AddDst(dstTablet); + SplitList.insert(dstTablet); } } @@ -403,14 +403,23 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa } } - ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done(); - if (needToReadPages) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " BorrowSnapshot is restarting for split OpId " << opId); return false; } else { txc.Env.DropSnapshot(SnapContext); + for (ui64 dstTabletId : ActivationList) { + Self->ChangeSenderActivator.AddDst(dstTabletId); + db.Table().Key(dstTabletId).Update(); + } + + for (ui64 dstTabletId : SplitList) { + Self->ChangeExchangeSplitter.AddDst(dstTabletId); + } + + ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done(); + Self->State = TShardState::SplitSrcSendingSnapshot; Self->PersistSys(db, Schema::Sys_State, Self->State); @@ -438,14 +447,14 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra private: TEvDataShard::TEvSplitTransferSnapshotAck::TPtr Ev; bool AllDstAcksReceived; - bool Activate; + ui64 ActivateTabletId; public: TTxSplitTransferSnapshotAck(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshotAck::TPtr& ev) : NTabletFlatExecutor::TTransactionBase(ds) , Ev(ev) , AllDstAcksReceived(false) - , Activate(false) + , ActivateTabletId(0) {} TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT_ACK; } @@ -469,8 +478,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra // Remove the row for acked snapshot db.Table().Key(dstTabletId).Delete(); - if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done()) { - Activate = !Self->ChangeSenderActivator.Acked(dstTabletId); + if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done() && !Self->ChangeSenderActivator.Acked(dstTabletId)) { + ActivateTabletId = dstTabletId; } return true; @@ -485,11 +494,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra } } - if (Activate) { - const ui64 dstTabletId = Ev->Get()->Record.GetTabletId(); - if (!Self->ChangeSenderActivator.Acked(dstTabletId)) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + if (ActivateTabletId) { + Self->ChangeSenderActivator.DoSend(ActivateTabletId, ctx); } } }; diff --git a/ydb/core/tx/datashard/datashard_user_table.cpp b/ydb/core/tx/datashard/datashard_user_table.cpp index 4e3f58065826..3e95d901b9b8 100644 --- a/ydb/core/tx/datashard/datashard_user_table.cpp +++ b/ydb/core/tx/datashard/datashard_user_table.cpp @@ -392,6 +392,8 @@ void TUserTable::AlterSchema() { schema.SetPartitionRangeEnd(Range.To.GetBuffer()); schema.SetPartitionRangeEndIsInclusive(Range.ToInclusive); + ReplicationConfig.Serialize(*schema.MutableReplicationConfig()); + schema.SetName(Name); schema.SetPath(Path); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 0f60e80569dc..af7c9ab2a6d8 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -339,6 +339,11 @@ struct TUserTable : public TThrRefBase { bool HasStrongConsistency() const { return Consistency == NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_STRONG; } + + void Serialize(NKikimrSchemeOp::TTableReplicationConfig& proto) const { + proto.SetMode(Mode); + proto.SetConsistency(Consistency); + } }; struct TStats { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index de473a467412..518740f0d9bf 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3500,6 +3500,194 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + void MustNotLoseSchemaSnapshot(bool enableVolatileTx) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableDataShardVolatileTransactions(enableVolatileTx) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + std::vector> blockedRemoveRecords; + auto blockRemoveRecords = runtime.AddObserver([&](auto& ev) { + Cerr << "... blocked remove record" << Endl; + blockedRemoveRecords.emplace_back(ev.Release()); + }); + + Cerr << "... execute first query" << Endl; + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10); + )"); + + WaitFor(runtime, [&]{ return blockedRemoveRecords.size() == 1; }, "blocked remove records"); + blockRemoveRecords.Remove(); + + std::vector> blockedPlans; + auto blockPlans = runtime.AddObserver([&](auto& ev) { + blockedPlans.emplace_back(ev.Release()); + }); + + Cerr << "... execute scheme query" << Endl; + const auto alterTxId = AsyncAlterAddExtraColumn(server, "/Root", "Table"); + + WaitFor(runtime, [&]{ return blockedPlans.size() > 0; }, "blocked plans"); + blockPlans.Remove(); + + std::vector> blockedPutResponses; + auto blockPutResponses = runtime.AddObserver([&](auto& ev) { + auto* msg = ev->Get(); + if (msg->Id.TabletID() == tabletIds[0]) { + Cerr << "... blocked put response:" << msg->Id << Endl; + blockedPutResponses.emplace_back(ev.Release()); + } + }); + + Cerr << "... execute second query" << Endl; + SendSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20); + )"); + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > 0; }, "blocked put responses"); + auto wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked plans" << Endl; + for (auto& ev : std::exchange(blockedPlans, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked remove records" << Endl; + for (auto& ev : std::exchange(blockedRemoveRecords, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + blockPutResponses.Remove(); + + Cerr << "... release blocked put responses" << Endl; + for (auto& ev : std::exchange(blockedPutResponses, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... finalize" << Endl; + WaitTxNotification(server, edgeActor, alterTxId); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshot) { + MustNotLoseSchemaSnapshot(false); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshotWithVolatileTx) { + MustNotLoseSchemaSnapshot(true); + } + + Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + SetSplitMergePartCountLimit(&runtime, -1); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithResolvedTimestamps(TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))); + + Cerr << "... prepare" << Endl; + { + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + }); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + WaitTxNotification(server, edgeActor, AsyncSplitTable(server, edgeActor, "/Root/Table", tabletIds.at(0), 2)); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + + auto initialTabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(initialTabletIds.size(), 2); + + std::vector> blockedSplitRequests; + auto blockSplitRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdSplitMessageGroup()) { + blockedSplitRequests.emplace_back(ev.Release()); + } + }); + + Cerr << "... merge table" << Endl; + const auto mergeTxId = AsyncMergeTable(server, edgeActor, "/Root/Table", initialTabletIds); + WaitFor(runtime, [&]{ return blockedSplitRequests.size() == initialTabletIds.size(); }, "blocked split requests"); + blockSplitRequests.Remove(); + + std::vector> blockedRegisterRequests; + auto blockRegisterRequests = runtime.AddObserver([&](auto& ev) { + if (ev->Get()->Record.GetPartitionRequest().HasCmdRegisterMessageGroup()) { + blockedRegisterRequests.emplace_back(ev.Release()); + } + }); + + ui32 splitResponses = 0; + auto countSplitResponses = runtime.AddObserver([&](auto&) { + ++splitResponses; + }); + + Cerr << "... release split requests" << Endl; + for (auto& ev : std::exchange(blockedSplitRequests, {})) { + runtime.Send(ev.release(), 0, true); + WaitFor(runtime, [prev = splitResponses, &splitResponses]{ return splitResponses > prev; }, "split response"); + } + + Cerr << "... reboot pq tablet" << Endl; + RebootTablet(runtime, ResolvePqTablet(runtime, edgeActor, "/Root/Table/Stream", 0), edgeActor); + countSplitResponses.Remove(); + + Cerr << "... release register requests" << Endl; + blockRegisterRequests.Remove(); + for (auto& ev : std::exchange(blockedRegisterRequests, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... wait for merge tx notification" << Endl; + WaitTxNotification(server, edgeActor, mergeTxId); + + Cerr << "... wait for final heartbeat" << Endl; + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"resolved":"***"})", + R"({"resolved":"***"})", + }); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_replication.cpp b/ydb/core/tx/datashard/datashard_ut_replication.cpp index b0395077e915..30267d6537b4 100644 --- a/ydb/core/tx/datashard/datashard_ut_replication.cpp +++ b/ydb/core/tx/datashard/datashard_ut_replication.cpp @@ -244,6 +244,9 @@ Y_UNIT_TEST_SUITE(DataShardReplication) { ExecSQL(server, sender, "SELECT * FROM `/Root/table-1`"); ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);", true, Ydb::StatusIds::GENERIC_ERROR); + + WaitTxNotification(server, sender, AsyncAlterDropReplicationConfig(server, "/Root", "table-1")); + ExecSQL(server, sender, "INSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);"); } Y_UNIT_TEST(ApplyChangesToReplicatedTable) { diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp index 6b3a30be457a..73fa338d35e7 100644 --- a/ydb/core/tx/datashard/move_index_unit.cpp +++ b/ydb/core/tx/datashard/move_index_unit.cpp @@ -60,20 +60,27 @@ class TMoveIndexUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class TMoveIndexUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp index 846f517ee10a..3e34394e15d6 100644 --- a/ydb/core/tx/datashard/move_table_unit.cpp +++ b/ydb/core/tx/datashard/move_table_unit.cpp @@ -60,20 +60,27 @@ class TMoveTableUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class TMoveTableUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/remove_schema_snapshots.cpp b/ydb/core/tx/datashard/remove_schema_snapshots.cpp new file mode 100644 index 000000000000..fe63f30be61d --- /dev/null +++ b/ydb/core/tx/datashard/remove_schema_snapshots.cpp @@ -0,0 +1,54 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class TDataShard::TTxRemoveSchemaSnapshots: public NTabletFlatExecutor::TTransactionBase { +public: + TTxRemoveSchemaSnapshots(TDataShard* self) + : TBase(self) + { } + + TTxType GetTxType() const override { return TXTYPE_REMOVE_SCHEMA_SNAPSHOTS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + while (!Self->PendingSchemaSnapshotsToGc.empty()) { + const auto key = Self->PendingSchemaSnapshotsToGc.back(); + const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(key); + + if (!snapshot) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (Self->GetSchemaSnapshotManager().HasReference(key)) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + auto table = Self->FindUserTable(TPathId(key.OwnerId, key.PathId)); + if (!table) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (snapshot->Schema->GetTableSchemaVersion() >= table->GetTableSchemaVersion()) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + Self->GetSchemaSnapshotManager().RemoveShapshot(txc.DB, key); + Self->PendingSchemaSnapshotsToGc.pop_back(); + } + + return true; + } + + void Complete(const TActorContext&) override { + } +}; + +void TDataShard::Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr&, const TActorContext& ctx) { + Execute(new TTxRemoveSchemaSnapshots(this), ctx); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index 1421e3afa670..821acb6f89c5 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1730,6 +1730,22 @@ ui64 AsyncAlterDropStream( return RunSchemeTx(*server->GetRuntime(), std::move(request)); } +ui64 AsyncAlterDropReplicationConfig( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName) +{ + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetInternal(true); + + auto& desc = *tx.MutableAlterTable(); + desc.SetName(tableName); + desc.MutableReplicationConfig()->SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_NONE); + + return RunSchemeTx(*server->GetRuntime(), std::move(request)); +} + ui64 AsyncCreateContinuousBackup( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h index 9257505c43a9..fad26710ac6a 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.h +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.h @@ -670,6 +670,11 @@ ui64 AsyncAlterDropStream( const TString& tableName, const TString& streamName); +ui64 AsyncAlterDropReplicationConfig( + Tests::TServer::TPtr server, + const TString& workingDir, + const TString& tableName); + ui64 AsyncCreateContinuousBackup( Tests::TServer::TPtr server, const TString& workingDir, diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 7bcfa4c2af15..b16535504346 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -189,6 +189,7 @@ SRCS( receive_snapshot_unit.cpp remove_lock_change_records.cpp remove_locks.cpp + remove_schema_snapshots.cpp range_ops.cpp read_iterator.h restore_unit.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 05098b99d693..39e487f418de 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6538,6 +6538,12 @@ TString TSchemeShard::FillAlterTableTxBody(TPathId pathId, TShardIdx shardIdx, T *patch); } + if (alterData->TableDescriptionFull.Defined() && alterData->TableDescriptionFull->HasReplicationConfig()) { + proto->MutableReplicationConfig()->CopyFrom(alterData->TableDescriptionFull->GetReplicationConfig()); + } else if (tableInfo->HasReplicationConfig()) { + proto->MutableReplicationConfig()->CopyFrom(tableInfo->ReplicationConfig()); + } + TString txBody; Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); return txBody; diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp index 42ecc9f6397d..7444ec4dac15 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots/ut_cdc_stream_reboots.cpp @@ -556,68 +556,77 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + bool CheckRegistrations(TTestActorRuntime& runtime, NKikimrPQ::TMessageGroupInfo::EState expectedState, + const google::protobuf::RepeatedPtrField& tablePartitions, + const google::protobuf::RepeatedPtrField& topicPartitions) + { + for (const auto& topicPartition : topicPartitions) { + auto request = MakeHolder(); + { + auto& record = *request->Record.MutablePartitionRequest(); + record.SetPartition(topicPartition.GetPartitionId()); + auto& cmd = *record.MutableCmdGetMaxSeqNo(); + for (const auto& tablePartition : tablePartitions) { + cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); + } + } + + const auto& sender = runtime.AllocateEdgeActor(); + ForwardToTablet(runtime, topicPartition.GetTabletId(), sender, request.Release()); + + auto response = runtime.GrabEdgeEvent(sender); + { + const auto& record = response->Get()->Record.GetPartitionResponse(); + const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); + + UNIT_ASSERT_VALUES_EQUAL(result.size(), tablePartitions.size()); + for (const auto& item: result) { + if (item.GetState() != expectedState) { + return false; + } + } + } + } + + return true; + } + struct TItem { TString Path; - ui32 nPartitions; + ui32 ExpectedPartitionCount; }; - void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic) { + void CheckRegistrations(TTestActorRuntime& runtime, const TItem& table, const TItem& topic, + const google::protobuf::RepeatedPtrField* initialTablePartitions = nullptr) + { auto tableDesc = DescribePath(runtime, table.Path, true, true); const auto& tablePartitions = tableDesc.GetPathDescription().GetTablePartitions(); - UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(tablePartitions.size(), table.ExpectedPartitionCount); auto topicDesc = DescribePrivatePath(runtime, topic.Path); const auto& topicPartitions = topicDesc.GetPathDescription().GetPersQueueGroup().GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.nPartitions); + UNIT_ASSERT_VALUES_EQUAL(topicPartitions.size(), topic.ExpectedPartitionCount); while (true) { runtime.SimulateSleep(TDuration::Seconds(1)); - bool done = true; - - for (ui32 i = 0; i < topic.nPartitions; ++i) { - auto request = MakeHolder(); - { - auto& record = *request->Record.MutablePartitionRequest(); - record.SetPartition(topicPartitions[i].GetPartitionId()); - auto& cmd = *record.MutableCmdGetMaxSeqNo(); - for (const auto& tablePartition : tablePartitions) { - cmd.AddSourceId(NPQ::NSourceIdEncoding::EncodeSimple(ToString(tablePartition.GetDatashardId()))); - } - } - - const auto& sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, topicPartitions[i].GetTabletId(), sender, request.Release()); - - auto response = runtime.GrabEdgeEvent(sender); - { - const auto& record = response->Get()->Record.GetPartitionResponse(); - const auto& result = record.GetCmdGetMaxSeqNoResult().GetSourceIdInfo(); - - UNIT_ASSERT_VALUES_EQUAL(result.size(), table.nPartitions); - for (const auto& item: result) { - done &= item.GetState() == NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED; - if (!done) { - break; - } - } - } - - if (!done) { - break; - } - } - - if (done) { + if (CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED, tablePartitions, topicPartitions)) { break; } } + + if (initialTablePartitions) { + UNIT_ASSERT(CheckRegistrations(runtime, NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN, *initialTablePartitions, topicPartitions)); + } } - Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + template + void SplitTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -625,15 +634,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { KeyColumnNames: ["key"] )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -651,16 +654,43 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); UploadRow(runtime, "/MyRoot/Table", 1, {1}, {2}, {TCell::Make(Max())}, {TCell::Make(Max())}); - CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}); + CheckRegistrations(runtime, {"/MyRoot/Table", 2}, {"/MyRoot/Table/Stream/streamImpl", 1}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } - Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + Y_UNIT_TEST_WITH_REBOOTS(SplitTable) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(SplitTableResolvedTimestamps) { + SplitTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + + template + void MergeTable(const TString& cdcStreamDesc) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + NKikimrScheme::TEvDescribeSchemeResult initialTableDesc; { TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Uint32" } @@ -674,15 +704,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { } )"); t.TestEnv->TestWaitNotification(runtime, t.TxId); + initialTableDesc = DescribePath(runtime, "/MyRoot/Table", true, true); - TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", R"( - TableName: "Table" - StreamDescription { - Name: "Stream" - Mode: ECdcStreamModeKeysOnly - Format: ECdcStreamFormatProto - } - )"); + TestCreateCdcStream(runtime, ++t.TxId, "/MyRoot", cdcStreamDesc); t.TestEnv->TestWaitNotification(runtime, t.TxId); } @@ -696,11 +720,35 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { TInactiveZone inactive(activeZone); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)}); UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(Max())}, {TCell::Make(Max())}); - CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}); + CheckRegistrations(runtime, {"/MyRoot/Table", 1}, {"/MyRoot/Table/Stream/streamImpl", 2}, + &initialTableDesc.GetPathDescription().GetTablePartitions()); } }); } + Y_UNIT_TEST_WITH_REBOOTS(MergeTable) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + } + )"); + } + + Y_UNIT_TEST_WITH_REBOOTS(MergeTableResolvedTimestamps) { + MergeTable(R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + ResolvedTimestampsIntervalMs: 1000 + } + )"); + } + Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) { T t; t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {