diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index ce98a47b6599..12bc7e28e7cb 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2318,6 +2318,31 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo EmitHeartbeats(); } +class TDataShard::TTxMediatorStateRestored : public TTransactionBase { +public: + TTxMediatorStateRestored(TDataShard* self, ui64 readStep, ui64 observedStep) + : TTransactionBase(self) + , ReadStep(readStep) + , ObservedStep(observedStep) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + Y_ABORT_UNLESS(Self->MediatorStateRestoreTxPending); + Self->MediatorStateRestoreTxPending = false; + + Self->FinishMediatorStateRestore(txc, ReadStep, ObservedStep); + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + +private: + const ui64 ReadStep; + const ui64 ObservedStep; +}; + void TDataShard::CheckMediatorStateRestored() { if (!MediatorStateWaiting || !RegistrationSended || @@ -2325,16 +2350,11 @@ void TDataShard::CheckMediatorStateRestored() { CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max()) { // We are not waiting or not ready to make a decision - if (MediatorStateWaiting && - MediatorTimeCastEntry && - CoordinatorPrevReadStepMax == Max() && - !MediatorStateBackupInitiated) - { - // It is possible we don't have coordinators with new protocol support - // Use a backup plan of acquiring a read snapshot for restoring the read step - Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup); - MediatorStateBackupInitiated = true; - } + return; + } + + if (MediatorStateRestoreTxPending) { + // We already made a decision and are waiting for transaction to execute return; } @@ -2372,6 +2392,13 @@ void TDataShard::CheckMediatorStateRestored() { return; } + MediatorStateRestoreTxPending = true; + Execute(new 
TTxMediatorStateRestored(this, readStep, observedStep)); +} + +void TDataShard::FinishMediatorStateRestore(TTransactionContext& txc, ui64 readStep, ui64 observedStep) { + Y_ABORT_UNLESS(MediatorStateWaiting); + // Using the inferred last read step we restore the pessimistic unprotected // read edge. Note we only need to do so if there have actually been any // unprotected reads in this datashard history. We also need to make sure @@ -2386,6 +2413,8 @@ void TDataShard::CheckMediatorStateRestored() { const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge); LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << " promoting UnprotectedReadEdge to " << edge); + Pipeline.MarkPlannedLogicallyCompleteUpTo(edge, txc); + Pipeline.MarkPlannedLogicallyIncompleteUpTo(edge, txc); SnapshotManager.PromoteUnprotectedReadEdge(edge); } @@ -3289,10 +3318,7 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const } void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) { - if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max()) { - // We are still waiting for new protol coordinator state - // TODO: send an old snapshot request to coordinators - } + Y_ABORT("This code path was always no-op and no longer used"); } bool TDataShard::WaitPlanStep(ui64 step) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 7720260dce60..b277cfe1ef43 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -247,6 +247,8 @@ class TDataShard class TTxHandleSafeKqpScan; class TTxHandleSafeBuildIndexScan; + class TTxMediatorStateRestored; + ITransaction *CreateTxMonitoring(TDataShard *self, NMon::TEvRemoteHttpInfo::TPtr ev); ITransaction *CreateTxGetInfo(TDataShard *self, @@ -1947,6 +1949,7 @@ class TDataShard void SendAfterMediatorStepActivate(ui64 mediatorStep, const 
TActorContext& ctx); void CheckMediatorStateRestored(); + void FinishMediatorStateRestore(TTransactionContext&, ui64, ui64); void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const; @@ -2613,7 +2616,7 @@ class TDataShard TVector> MediatorStateWaitingMsgs; bool MediatorStateWaiting = false; - bool MediatorStateBackupInitiated = false; + bool MediatorStateRestoreTxPending = false; bool IcbRegistered = false; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index f993241ab68f..15cfdc4c88f7 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -401,13 +401,25 @@ void TPipeline::AddActiveOp(TOperation::TPtr op) if (Self->IsMvccEnabled()) { TStepOrder stepOrder = op->GetStepOrder(); TRowVersion version(stepOrder.Step, stepOrder.TxId); - TRowVersion completeEdge = Max( - Self->SnapshotManager.GetCompleteEdge(), - Self->SnapshotManager.GetUnprotectedReadEdge()); - if (version <= completeEdge) { - op->SetFlag(TTxFlags::BlockingImmediateOps); - } else if (version <= Self->SnapshotManager.GetIncompleteEdge()) { - op->SetFlag(TTxFlags::BlockingImmediateWrites); + if (version <= Self->SnapshotManager.GetCompleteEdge() || + version < Self->SnapshotManager.GetImmediateWriteEdge() || + version < Self->SnapshotManager.GetUnprotectedReadEdge()) + { + // This transaction would have been marked as logically complete + if (!op->HasFlag(TTxFlags::BlockingImmediateOps)) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Adding BlockingImmediateOps for op " << *op << " at " << Self->TabletID()); + op->SetFlag(TTxFlags::BlockingImmediateOps); + } + } else if (version <= Self->SnapshotManager.GetIncompleteEdge() || + version <= Self->SnapshotManager.GetUnprotectedReadEdge()) + { + // This transaction would have been marked as logically incomplete + if 
(!op->HasFlag(TTxFlags::BlockingImmediateWrites)) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Adding BlockingImmediateWrites for op " << *op << " at " << Self->TabletID()); + op->SetFlag(TTxFlags::BlockingImmediateWrites); + } } } auto pr = ActivePlannedOps.emplace(op->GetStepOrder(), op); @@ -415,10 +427,14 @@ void TPipeline::AddActiveOp(TOperation::TPtr op) Y_ABORT_UNLESS(pr.first == std::prev(ActivePlannedOps.end()), "AddActiveOp must always add transactions in order"); bool isComplete = op->HasFlag(TTxFlags::BlockingImmediateOps); if (ActivePlannedOpsLogicallyCompleteEnd == ActivePlannedOps.end() && !isComplete) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Operation " << *op << " is the new logically complete end at " << Self->TabletID()); ActivePlannedOpsLogicallyCompleteEnd = pr.first; } bool isIncomplete = isComplete || op->HasFlag(TTxFlags::BlockingImmediateWrites); if (ActivePlannedOpsLogicallyIncompleteEnd == ActivePlannedOps.end() && !isIncomplete) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Operation " << *op << " is the new logically incomplete end at " << Self->TabletID()); ActivePlannedOpsLogicallyIncompleteEnd = pr.first; } } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 51da5d32d2ff..ea3c67b171e7 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -4675,6 +4675,227 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { uint64_value: 15 } }"); } + Y_UNIT_TEST(PipelineAndMediatorRestoreRace) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(100) + .SetEnableDataShardVolatileTransactions(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = 
runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + UNIT_ASSERT_VALUES_EQUAL( + KqpSchemeExec(runtime, R"( + CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key)); + CREATE TABLE `/Root/table2` (key int, value int, PRIMARY KEY (key)); + )"), + "SUCCESS"); + + const auto shards1 = GetTableShards(server, sender, "/Root/table1"); + UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u); + + // Upsert initial data + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + UPSERT INTO `/Root/table1` (key, value) VALUES (1, 10); + UPSERT INTO `/Root/table2` (key, value) VALUES (2, 20); + )"), + ""); + + // Make sure shards have unprotected reads enabled + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table1` + UNION ALL + SELECT key, value FROM `/Root/table2` + ORDER BY key; + )"), + "{ items { int32_value: 1 } items { int32_value: 10 } }, " + "{ items { int32_value: 2 } items { int32_value: 20 } }"); + + std::vector readsets; + auto blockReadSets = runtime.AddObserver( + [&](TEvTxProcessing::TEvReadSet::TPtr& ev) { + auto* msg = ev->Get(); + Cerr << "... blocking readset for " << msg->Record.GetTabletDest() << Endl; + readsets.push_back(std::move(ev)); + }); + + size_t planSteps = 0; + auto observePlanSteps = runtime.AddObserver( + [&](TEvTxProcessing::TEvPlanStep::TPtr& ev) { + auto* msg = ev->Get(); + Cerr << "... observed plan step " << msg->Record.GetStep() << " for " << msg->Record.GetTabletID() << Endl; + ++planSteps; + }); + + // Create a "staircase" of transactions at different steps + + // Upsert1 will have outgoing readsets from both shards + Cerr << "... 
sending upsert1" << Endl; + auto upsert1 = KqpSimpleSend(runtime, R"( + SELECT key, value FROM `/Root/table1` WHERE key = 1; + SELECT key, value FROM `/Root/table2` WHERE key = 2; + UPSERT INTO `/Root/table1` (key, value) VALUES (3, 30), (5, 50); + UPSERT INTO `/Root/table2` (key, value) VALUES (4, 40); + )"); + WaitFor(runtime, [&]{ return planSteps >= 2; }, "upsert1 plan step"); + UNIT_ASSERT_VALUES_EQUAL(planSteps, 2u); + WaitFor(runtime, [&]{ return readsets.size() >= 2; }, "upsert1 readsets"); + UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 2u); + + // Upsert2 will be blocked by dependencies (key 5) at table1, but not table2 + Cerr << "... sending upsert2" << Endl; + auto upsert2 = KqpSimpleSend(runtime, R"( + SELECT key, value FROM `/Root/table2` WHERE key = 2; + UPSERT INTO `/Root/table1` (key, value) VALUES (5, 55), (7, 70); + )"); + WaitFor(runtime, [&]{ return planSteps >= 4; }, "upsert2 plan step"); + UNIT_ASSERT_VALUES_EQUAL(planSteps, 4u); + WaitFor(runtime, [&]{ return readsets.size() >= 3; }, "upsert2 readset from table2"); + UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 3u); + + // Upsert3 will be blocked by dependencies (key 7) at table1, but not table2 + Cerr << "... 
sending upsert3" << Endl; + auto upsert3 = KqpSimpleSend(runtime, R"( + SELECT key, value FROM `/Root/table2` WHERE key = 2; + UPSERT INTO `/Root/table1` (key, value) VALUES (7, 77), (9, 90); + )"); + WaitFor(runtime, [&]{ return planSteps >= 6; }, "upsert3 plan step"); + UNIT_ASSERT_VALUES_EQUAL(planSteps, 6u); + WaitFor(runtime, [&]{ return readsets.size() >= 4; }, "upsert3 readset from table2"); + UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 4u); + + // Sleep a little to make sure everything is persisted at table1 and mediator time advanced + runtime.SimulateSleep(TDuration::MilliSeconds(200)); + + // Now restart table1 shard while blocking mediator timecast registration + std::vector registrations; + auto blockRegistrations = runtime.AddObserver( + [&](TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev) { + Cerr << "... blocking timecast registration result for " << ev->GetRecipientRewrite() << Endl; + registrations.push_back(std::move(ev)); + }); + + // ... waiting for the new tablet actor booting + TActorId shardActor; + auto waitBoot = runtime.AddObserver( + [&](TEvTablet::TEvBoot::TPtr& ev) { + auto* msg = ev->Get(); + if (msg->TabletID == shards1.at(0)) { + shardActor = ev->GetRecipientRewrite(); + Cerr << "... booting " << msg->TabletID << " with actor " << shardActor << Endl; + } + }); + + // ... and blocking progress transactions + size_t allowProgress = 0; + std::vector> blockedProgress; + auto blockProgress = runtime.AddObserver([&](TAutoPtr& ev) { + if (shardActor && + ev->GetRecipientRewrite() == shardActor && + ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */) + { + if (allowProgress > 0) { + Cerr << "... allowing EvProgressTransaction for " << ev->GetRecipientRewrite() << Endl; + --allowProgress; + } else { + Cerr << "... blocking EvProgressTransaction for " << ev->GetRecipientRewrite() << Endl; + blockedProgress.push_back(std::move(ev)); + } + } + }); + + Cerr << "... 
rebooting " << shards1.at(0) << Endl; + GracefulRestartTablet(runtime, shards1.at(0), sender); + + WaitFor(runtime, [&]{ return registrations.size() >= 1; }, "timecast registration"); + UNIT_ASSERT_VALUES_EQUAL(registrations.size(), 1u); + + WaitFor(runtime, [&]{ return readsets.size() >= 8; }, "readsets resent"); + UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 8u); + + // We need to unblock two transactions + // The first is already marked incomplete + // The second will be added to the pipeline, but blocked by dependencies + for (int i = 0; i < 2; ++i) { + WaitFor(runtime, [&]{ return blockedProgress.size() >= 1; }, "blocked progress"); + UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1); + + Cerr << "... unblocking a single progress tx" << Endl; + allowProgress += blockedProgress.size(); + for (auto& ev : blockedProgress) { + runtime.Send(ev.Release(), 0, true); + } + blockedProgress.clear(); + } + + WaitFor(runtime, [&]{ return blockedProgress.size() >= 1; }, "blocked progress"); + UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1); + + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + Cerr << "... unblocking timecast registration" << Endl; + blockRegistrations.Remove(); + for (auto& ev : registrations) { + runtime.Send(ev.Release(), 0, true); + } + + runtime.SimulateSleep(TDuration::MilliSeconds(200)); + + // Unblock the final transaction + // It's going to be before the restored worst-case unprotected read edge + // However because it's not the complete/incomplete tail it will not update lists properly + Cerr << "... 
unblocking final progress tx" << Endl; + blockProgress.Remove(); + for (auto& ev : blockedProgress) { + runtime.Send(ev.Release(), 0, true); + } + blockedProgress.clear(); + + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // Perform snapshot read that will try to mark all pending transactions as logically complete/incomplete + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table1` WHERE key = 1 + UNION ALL + SELECT key, value FROM `/Root/table2` WHERE key = 2 + ORDER BY key; + )"), + "{ items { int32_value: 1 } items { int32_value: 10 } }, " + "{ items { int32_value: 2 } items { int32_value: 20 } }"); + + Cerr << "... unblocking readsets" << Endl; + blockReadSets.Remove(); + for (auto& ev : readsets) { + runtime.Send(ev.Release(), 0, true); + } + readsets.clear(); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsert1))), + "{ items { int32_value: 1 } items { int32_value: 10 } }\n" + "{ items { int32_value: 2 } items { int32_value: 20 } }"); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsert2))), + "{ items { int32_value: 2 } items { int32_value: 20 } }"); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsert3))), + "{ items { int32_value: 2 } items { int32_value: 20 } }"); + } + } } // namespace NKikimr