diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 55299cdb93ba..4b168f10e5cf 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -490,4 +490,5 @@ enum ETxTypes { TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}]; TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}]; TXTYPE_WRITE = 82 [(TxTypeOpts) = {Name: "TxWrite"}]; + TXTYPE_REMOVE_SCHEMA_SNAPSHOTS = 83 [(TxTypeOpts) = {Name: "TxRemoveSchemaSnapshots"}]; } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ba07749ec285..7dd1d115a230 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate(record.GetKind()), NIceDb::TUpdate(record.GetBody()), NIceDb::TUpdate(record.GetSource())); + + auto res = ChangesQueue.emplace(record.GetOrder(), record); + Y_VERIFY_S(res.second, "Duplicate change record: " << record.GetOrder()); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + + if (CommittingChangeRecords.empty()) { + db.GetDatabase().OnCommit([this] { + CommittingChangeRecords.clear(); + }); + db.GetDatabase().OnRollback([this] { + for (const auto order : CommittingChangeRecords) { + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + 
CommittingChangeRecords.clear(); + }); + } + + CommittingChangeRecords.push_back(record.GetOrder()); } else { auto& state = LockChangeRecords[lockId]; Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(), @@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 committed.Step = rowVersion.Step; committed.TxId = rowVersion.TxId; collected.push_back(committed); + + auto res = ChangesQueue.emplace(committed.Order, committed); + Y_VERIFY_S(res.second, "Duplicate change record: " << committed.Order); + + if (res.first->second.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once"); @@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 LockChangeRecords.erase(it); }); db.GetDatabase().OnRollback([this, lockId]() { - CommittedLockChangeRecords.erase(lockId); + auto it = CommittedLockChangeRecords.find(lockId); + Y_VERIFY_S(it != CommittedLockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId); + + for (size_t i = 0; i < it->second.Count; ++i) { + const ui64 order = it->second.Order + i; + + auto cIt = ChangesQueue.find(order); + Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order); + + if (cIt->second.SchemaSnapshotAcquired) { + const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion); + if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); + } + } + + ChangesQueue.erase(cIt); + } + + CommittedLockChangeRecords.erase(it); }); } @@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { auto it = ChangesQueue.find(order); 
if (it == ChangesQueue.end()) { - Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order); return; } @@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { ChangesQueueBytes -= record.BodySize; if (record.SchemaSnapshotAcquired) { - Y_ABORT_UNLESS(record.TableId); - auto tableIt = TableInfos.find(record.TableId.LocalPathId); - - if (tableIt != TableInfos.end()) { - const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); - const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey); - - if (last) { - const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey); - Y_ABORT_UNLESS(snapshot); - - if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) { - SchemaSnapshotManager.RemoveShapshot(db, snapshotKey); - } - } - } else { - Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline); + const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion); + if (const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) { + ScheduleRemoveSchemaSnapshot(snapshotKey); } } @@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { CheckChangesQueueNoOverflow(); } -void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie) { +void TDataShard::EnqueueChangeRecords(TVector&& records, ui64 cookie, bool afterMove) { if (!records) { return; } @@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVectorTimeProvider->Now(); TVector forward(Reserve(records.size())); for (const auto& record : records) { - forward.emplace_back(record.Order, record.PathId, record.BodySize); + auto it = ChangesQueue.find(record.Order); + if (it == ChangesQueue.end()) { + Y_ABORT_UNLESS(afterMove); + continue; + } - auto res = ChangesQueue.emplace( - std::piecewise_construct, - std::forward_as_tuple(record.Order), - std::forward_as_tuple(record, now, cookie) - ); - if (res.second) { - 
ChangesList.PushBack(&res.first->second); + forward.emplace_back(record.Order, record.PathId, record.BodySize); - Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); - ChangesQueueBytes += record.BodySize; + it->second.EnqueuedAt = now; + it->second.ReservationCookie = cookie; + ChangesList.PushBack(&it->second); - if (record.SchemaVersion) { - res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( - TSchemaSnapshotKey(record.TableId, record.SchemaVersion)); - } - } + Y_ABORT_UNLESS(ChangesQueueBytes <= (Max() - record.BodySize)); + ChangesQueueBytes += record.BodySize; } - + if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) { + Y_ABORT_UNLESS(!afterMove); ChangeQueueReservedCapacity -= it->second; ChangeQueueReservedCapacity += records.size(); } @@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } + if (!rowset.Next()) { return false; } @@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVectorsecond.SchemaVersion) { + res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( + TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion)); + } } LockChangeRecords.erase(lockId); @@ -1421,6 +1479,63 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() { } }
+// Queues the schema snapshot `key` for asynchronous removal.
+//
+// Aborts if `key` is still referenced or is unknown to SchemaSnapshotManager.
+// Nothing is queued when the table is missing from TableInfos or when the
+// snapshot's schema version is not older than the table's current schema
+// version.
+void TDataShard::ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key) {
+    Y_ABORT_UNLESS(!SchemaSnapshotManager.HasReference(key));
+
+    const auto* snapshot = SchemaSnapshotManager.FindSnapshot(key);
+    Y_ABORT_UNLESS(snapshot);
+
+    auto it = TableInfos.find(key.PathId);
+    if (it == TableInfos.end()) {
+        Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
+        return;
+    }
+
+    if (snapshot->Schema->GetTableSchemaVersion() < it->second->GetTableSchemaVersion()) {
+        // Send the event only on the empty -> non-empty transition of the queue.
+        bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
+        PendingSchemaSnapshotsToGc.push_back(key);
+        if (wasEmpty) {
+            Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
+        }
+    }
+}
+
+// Scans all stored schema snapshots and queues for removal every one that is
+// unreferenced and older than the current schema version of its table.
+// TEvRemoveSchemaSnapshots is sent only if the queue was empty before the scan
+// and is non-empty after it.
+void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots() {
+    bool wasEmpty = PendingSchemaSnapshotsToGc.empty();
+
+    for (const auto& [key, snapshot] : SchemaSnapshotManager.GetSnapshots()) {
+        auto it = TableInfos.find(key.PathId);
+        if (it == TableInfos.end()) {
+            Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
+            // Skip only this snapshot; snapshots of other tables are still examined.
+            continue;
+        }
+        if (SchemaSnapshotManager.HasReference(key)) {
+            continue;
+        }
+        if (snapshot.Schema->GetTableSchemaVersion() >= it->second->GetTableSchemaVersion()) {
+            continue;
+        }
+
+        PendingSchemaSnapshotsToGc.push_back(key);
+    }
+
+    if (wasEmpty && !PendingSchemaSnapshotsToGc.empty()) {
+        Send(SelfId(), new TEvPrivate::TEvRemoveSchemaSnapshots);
+    }
+}
+
 void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) { db.Table().Key(op.TxId).Update( NIceDb::TUpdate(op.Success), @@ -1649,8 +1764,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId)); auto tableInfo = TableInfos[pathId.LocalPathId]; - const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion); + const auto key = TSchemaSnapshotKey(pathId, tableSchemaVersion); SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId)); + + const auto& snapshots = SchemaSnapshotManager.GetSnapshots(); + for (auto it = snapshots.lower_bound(TSchemaSnapshotKey(pathId, 1)); it != snapshots.end(); ++it) { + if (it->first == key) { + break; + } + if (!SchemaSnapshotManager.HasReference(it->first)) { + ScheduleRemoveSchemaSnapshot(it->first); + } + } } void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) { diff --git a/ydb/core/tx/datashard/datashard__init.cpp
b/ydb/core/tx/datashard/datashard__init.cpp index 89981068248d..a0561a9c998f 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -425,6 +425,12 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { return false; } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { + if (!Self->SchemaSnapshotManager.Load(db)) { + return false; + } + } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecords::TableId)) { if (!Self->LoadChangeRecords(db, ChangeRecords)) { return false; @@ -512,12 +518,6 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::SchemaSnapshots::TableId)) { - if (!Self->SchemaSnapshotManager.Load(db)) { - return false; - } - } - if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) { TDataShardLocksDb locksDb(*Self, txc); if (!Self->SysLocks.Load(locksDb)) { @@ -547,6 +547,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { Self->SubscribeNewLocks(); Self->ScheduleRemoveAbandonedLockChanges(); + Self->ScheduleRemoveAbandonedSchemaSnapshots(); return true; } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index eade34395329..9023b29d7b9f 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -340,15 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase { Self->RemoveChangeRecordsInFly = false; } - if (!Self->ChangesQueue) { // double check queue - if (ChangeExchangeSplit) { - Self->KillChangeSender(ctx); - Self->ChangeExchangeSplitter.DoSplit(ctx); - } + if (ChangeExchangeSplit) { + Self->KillChangeSender(ctx); + Self->ChangeExchangeSplitter.DoSplit(ctx); 
+ } - for (const auto dstTabletId : ActivationList) { - Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); - } + for (const auto dstTabletId : ActivationList) { + Self->ChangeSenderActivator.DoSend(dstTabletId, ctx); } Self->CheckStateChange(ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b770044dedb1..049ff62146fa 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -241,6 +241,7 @@ class TDataShard class TTxCdcStreamScanProgress; class TTxCdcStreamEmitHeartbeats; class TTxUpdateFollowerReadEdge; + class TTxRemoveSchemaSnapshots; template friend class TTxDirectBase; class TTxUploadRows; @@ -374,6 +375,7 @@ class TDataShard EvPlanPredictedTxs, EvStatisticsScanFinished, EvTableStatsError, + EvRemoveSchemaSnapshots, EvEnd }; @@ -595,6 +597,8 @@ class TDataShard struct TEvPlanPredictedTxs : public TEventLocal {}; struct TEvStatisticsScanFinished : public TEventLocal {}; + + struct TEvRemoveSchemaSnapshots : public TEventLocal {}; }; struct Schema : NIceDb::Schema { @@ -1383,6 +1387,8 @@ class TDataShard void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1906,7 +1912,8 @@ class TDataShard void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); - void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0); + // TODO(ilnaz): remove 'afterMove' after #6541 + void EnqueueChangeRecords(TVector&& records, ui64 cookie = 0, bool afterMove = false); ui32 GetFreeChangeQueueCapacity(ui64 cookie); ui64 ReserveChangeQueueCapacity(ui32 capacity); void 
UpdateChangeExchangeLag(TInstant now); @@ -1920,6 +1927,8 @@ class TDataShard bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector& records); void ScheduleRemoveLockChanges(ui64 lockId); void ScheduleRemoveAbandonedLockChanges(); + void ScheduleRemoveSchemaSnapshot(const TSchemaSnapshotKey& key); + void ScheduleRemoveAbandonedSchemaSnapshots(); static void PersistCdcStreamScanLastKey(NIceDb::TNiceDb& db, const TSerializedCellVec& value, const TPathId& tablePathId, const TPathId& streamPathId); @@ -2804,24 +2813,29 @@ class TDataShard ui64 LockOffset; ui64 ReservationCookie; - explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, TInstant created, TInstant enqueued, - ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0) + explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion, + TInstant created, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) , CreatedAt(created) - , EnqueuedAt(enqueued) + , EnqueuedAt(TInstant::Zero()) , LockId(lockId) , LockOffset(lockOffset) - , ReservationCookie(cookie) + , ReservationCookie(0) + { + } + + explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record) + : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, + record.CreatedAt(), record.LockId, record.LockOffset) { } - explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie) - : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now, - record.LockId, record.LockOffset, cookie) + explicit TEnqueuedRecord(const TChangeRecord& record) + : TEnqueuedRecord(record.GetBody().size(), record.GetTableId(), record.GetSchemaVersion(), + record.GetApproximateCreationDateTime(), record.GetLockId(), record.GetLockOffset()) { } }; @@ -2863,9 +2877,11 @@ class TDataShard size_t Count = 0; }; + TVector CommittingChangeRecords; 
THashMap LockChangeRecords; // ui64 is lock id THashMap CommittedLockChangeRecords; // ui64 is lock id TVector PendingLockChangeRecordsToRemove; + TVector PendingSchemaSnapshotsToGc; // in THashMap InChangeSenders; // ui64 is shard id @@ -2965,6 +2981,16 @@ class TDataShard CommittedLockChangeRecords = std::move(committedLockChangeRecords); } + auto TakeChangesQueue() { + auto result = std::move(ChangesQueue); + ChangesQueue.clear(); + return result; + } + + void SetChangesQueue(THashMap&& changesQueue) { + ChangesQueue = std::move(changesQueue); + } + protected: // Redundant init state required by flat executor implementation void StateInit(TAutoPtr &ev) { @@ -2986,6 +3012,7 @@ class TDataShard HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateInactive unhandled event type: " << ev->GetTypeRewrite() @@ -3114,6 +3141,7 @@ class TDataShard HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle); HFunc(NStat::TEvStatistics::TEvStatisticsRequest, Handle); HFunc(TEvPrivate::TEvStatisticsScanFinished, Handle); + HFuncTraced(TEvPrivate::TEvRemoveSchemaSnapshots, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, "TDataShard::StateWork unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp index 7b603ac4eca2..9c56e4ad7381 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.cpp @@ -20,6 +20,7 @@ TSchemaSnapshotManager::TSchemaSnapshotManager(const TDataShard* self) void TSchemaSnapshotManager::Reset() { Snapshots.clear(); + 
References.clear(); } bool TSchemaSnapshotManager::Load(NIceDb::TNiceDb& db) { @@ -79,14 +80,16 @@ const TSchemaSnapshot* TSchemaSnapshotManager::FindSnapshot(const TSchemaSnapsho return Snapshots.FindPtr(key); } -void TSchemaSnapshotManager::RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key) { +void TSchemaSnapshotManager::RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { return; } Snapshots.erase(it); - PersistRemoveSnapshot(db, key); + + NIceDb::TNiceDb nicedb(db); + PersistRemoveSnapshot(nicedb, key); } void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, @@ -119,6 +122,10 @@ void TSchemaSnapshotManager::RenameSnapshots(NTable::TDatabase& db, } } +const TSchemaSnapshotManager::TSnapshots& TSchemaSnapshotManager::GetSnapshots() const { + return Snapshots; +} + bool TSchemaSnapshotManager::AcquireReference(const TSchemaSnapshotKey& key) { auto it = Snapshots.find(key); if (it == Snapshots.end()) { @@ -152,6 +159,13 @@ bool TSchemaSnapshotManager::ReleaseReference(const TSchemaSnapshotKey& key) { return true; }
+// Tells whether `key` has an entry in References whose value converts to true;
+// a key with no entry is reported as unreferenced.
+bool TSchemaSnapshotManager::HasReference(const TSchemaSnapshotKey& key) const {
+    const auto found = References.find(key);
+    return found != References.end() && found->second;
+}
+
 void TSchemaSnapshotManager::PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot) { using Schema = TDataShard::Schema; db.Table() diff --git a/ydb/core/tx/datashard/datashard_schema_snapshots.h b/ydb/core/tx/datashard/datashard_schema_snapshots.h index db0d3b655b34..0bc80a628e2e 100644 --- a/ydb/core/tx/datashard/datashard_schema_snapshots.h +++ b/ydb/core/tx/datashard/datashard_schema_snapshots.h @@ -23,6 +23,8 @@ struct TSchemaSnapshot { }; class TSchemaSnapshotManager { + using TSnapshots = TMap>; + public: explicit TSchemaSnapshotManager(const TDataShard* self); @@ -31,11 +33,13 @@
class TSchemaSnapshotManager { bool AddSnapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); const TSchemaSnapshot* FindSnapshot(const TSchemaSnapshotKey& key) const; - void RemoveShapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key); + void RemoveShapshot(NTable::TDatabase& db, const TSchemaSnapshotKey& key); void RenameSnapshots(NTable::TDatabase& db, const TPathId& prevTableId, const TPathId& newTableId); + const TSnapshots& GetSnapshots() const; bool AcquireReference(const TSchemaSnapshotKey& key); bool ReleaseReference(const TSchemaSnapshotKey& key); + bool HasReference(const TSchemaSnapshotKey& key) const; private: void PersistAddSnapshot(NIceDb::TNiceDb& db, const TSchemaSnapshotKey& key, const TSchemaSnapshot& snapshot); @@ -43,7 +47,7 @@ class TSchemaSnapshotManager { private: const TDataShard* Self; - TMap> Snapshots; + TSnapshots Snapshots; THashMap References; }; // TSchemaSnapshotManager diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index 47a806bc579d..1e88d7495c63 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -430,7 +430,7 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa void Complete(const TActorContext &ctx) override { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId); Self->SplitSrcSnapshotSender.DoSend(ctx); - if (ChangeExchangeSplit && !Self->ChangesQueue) { // double check queue + if (ChangeExchangeSplit) { Self->KillChangeSender(ctx); Self->ChangeExchangeSplitter.DoSplit(ctx); } @@ -494,7 +494,7 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra } } - if (ActivateTabletId && !Self->ChangesQueue) { // double check queue + if (ActivateTabletId) { Self->ChangeSenderActivator.DoSend(ActivateTabletId, ctx); } } diff --git 
a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index de473a467412..31e6d1b7494a 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3500,6 +3500,106 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + void MustNotLoseSchemaSnapshot(bool enableVolatileTx) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + .SetEnableDataShardVolatileTransactions(enableVolatileTx) + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + auto tabletIds = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1); + + std::vector> blockedRemoveRecords; + auto blockRemoveRecords = runtime.AddObserver([&](auto& ev) { + Cerr << "... blocked remove record" << Endl; + blockedRemoveRecords.emplace_back(ev.Release()); + }); + + Cerr << "... execute first query" << Endl; + ExecSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10); + )"); + + WaitFor(runtime, [&]{ return blockedRemoveRecords.size() == 1; }, "blocked remove records"); + blockRemoveRecords.Remove(); + + std::vector> blockedPlans; + auto blockPlans = runtime.AddObserver([&](auto& ev) { + blockedPlans.emplace_back(ev.Release()); + }); + + Cerr << "... 
execute scheme query" << Endl; + const auto alterTxId = AsyncAlterAddExtraColumn(server, "/Root", "Table"); + + WaitFor(runtime, [&]{ return blockedPlans.size() > 0; }, "blocked plans"); + blockPlans.Remove(); + + std::vector> blockedPutResponses; + auto blockPutResponses = runtime.AddObserver([&](auto& ev) { + auto* msg = ev->Get(); + if (msg->Id.TabletID() == tabletIds[0]) { + Cerr << "... blocked put response:" << msg->Id << Endl; + blockedPutResponses.emplace_back(ev.Release()); + } + }); + + Cerr << "... execute second query" << Endl; + SendSQL(server, edgeActor, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20); + )"); + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > 0; }, "blocked put responses"); + auto wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked plans" << Endl; + for (auto& ev : std::exchange(blockedPlans, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + wasBlockedPutResponses = blockedPutResponses.size(); + + Cerr << "... release blocked remove records" << Endl; + for (auto& ev : std::exchange(blockedRemoveRecords, {})) { + runtime.Send(ev.release(), 0, true); + } + + WaitFor(runtime, [&]{ return blockedPutResponses.size() > wasBlockedPutResponses; }, "blocked put responses"); + blockPutResponses.Remove(); + + Cerr << "... release blocked put responses" << Endl; + for (auto& ev : std::exchange(blockedPutResponses, {})) { + runtime.Send(ev.release(), 0, true); + } + + Cerr << "... 
finalize" << Endl; + WaitTxNotification(server, edgeActor, alterTxId); + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + R"({"update":{"value":20},"key":[2]})", + }); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshot) { + MustNotLoseSchemaSnapshot(false); + } + + Y_UNIT_TEST(MustNotLoseSchemaSnapshotWithVolatileTx) { + MustNotLoseSchemaSnapshot(true); + } + } // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp index 6b3a30be457a..73fa338d35e7 100644 --- a/ydb/core/tx/datashard/move_index_unit.cpp +++ b/ydb/core/tx/datashard/move_index_unit.cpp @@ -60,20 +60,27 @@ class TMoveIndexUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class TMoveIndexUnit : public 
TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp index 846f517ee10a..3e34394e15d6 100644 --- a/ydb/core/tx/datashard/move_table_unit.cpp +++ b/ydb/core/tx/datashard/move_table_unit.cpp @@ -60,20 +60,27 @@ class TMoveTableUnit : public TExecutionUnit { NIceDb::TNiceDb db(txc.DB); ChangeRecords.clear(); - if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { - return EExecutionStatus::Restart; - } + auto changesQueue = DataShard.TakeChangesQueue(); auto lockChangeRecords = DataShard.TakeLockChangeRecords(); auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + if (!DataShard.LoadChangeRecords(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; } if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetChangesQueue(std::move(changesQueue)); DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); return EExecutionStatus::Restart; @@ -99,7 +106,7 @@ class TMoveTableUnit : public TExecutionUnit { void Complete(TOperation::TPtr, const TActorContext& ctx) override { DataShard.CreateChangeSender(ctx); 
DataShard.MaybeActivateChangeSender(ctx); - DataShard.EnqueueChangeRecords(std::move(ChangeRecords)); + DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true); } }; diff --git a/ydb/core/tx/datashard/remove_schema_snapshots.cpp b/ydb/core/tx/datashard/remove_schema_snapshots.cpp new file mode 100644 index 000000000000..fe63f30be61d --- /dev/null +++ b/ydb/core/tx/datashard/remove_schema_snapshots.cpp @@ -0,0 +1,54 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class TDataShard::TTxRemoveSchemaSnapshots: public NTabletFlatExecutor::TTransactionBase { +public: + TTxRemoveSchemaSnapshots(TDataShard* self) + : TBase(self) + { } + + TTxType GetTxType() const override { return TXTYPE_REMOVE_SCHEMA_SNAPSHOTS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + while (!Self->PendingSchemaSnapshotsToGc.empty()) { + const auto key = Self->PendingSchemaSnapshotsToGc.back(); + const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(key); + + if (!snapshot) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (Self->GetSchemaSnapshotManager().HasReference(key)) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + auto table = Self->FindUserTable(TPathId(key.OwnerId, key.PathId)); + if (!table) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + if (snapshot->Schema->GetTableSchemaVersion() >= table->GetTableSchemaVersion()) { + Self->PendingSchemaSnapshotsToGc.pop_back(); + continue; + } + + Self->GetSchemaSnapshotManager().RemoveShapshot(txc.DB, key); + Self->PendingSchemaSnapshotsToGc.pop_back(); + } + + return true; + } + + void Complete(const TActorContext&) override { + } +}; + +void TDataShard::Handle(TEvPrivate::TEvRemoveSchemaSnapshots::TPtr&, const TActorContext& ctx) { + Execute(new TTxRemoveSchemaSnapshots(this), ctx); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 
7bcfa4c2af15..b16535504346 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -189,6 +189,7 @@ SRCS( receive_snapshot_unit.cpp remove_lock_change_records.cpp remove_locks.cpp + remove_schema_snapshots.cpp range_ops.cpp read_iterator.h restore_unit.cpp