Skip to content

Commit

Permalink
24-1: Fix a race between mediator state and pipeline restoring active…
Browse files Browse the repository at this point in the history
… transactions (#3378)
  • Loading branch information
snaury authored Apr 8, 2024
1 parent b6a279d commit d14df37
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 22 deletions.
54 changes: 40 additions & 14 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2318,23 +2318,43 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
EmitHeartbeats();
}

class TDataShard::TTxMediatorStateRestored : public TTransactionBase<TDataShard> {
public:
TTxMediatorStateRestored(TDataShard* self, ui64 readStep, ui64 observedStep)
: TTransactionBase(self)
, ReadStep(readStep)
, ObservedStep(observedStep)
{}

bool Execute(TTransactionContext& txc, const TActorContext&) override {
Y_ABORT_UNLESS(Self->MediatorStateRestoreTxPending);
Self->MediatorStateRestoreTxPending = false;

Self->FinishMediatorStateRestore(txc, ReadStep, ObservedStep);
return true;
}

void Complete(const TActorContext&) override {
// nothing
}

private:
const ui64 ReadStep;
const ui64 ObservedStep;
};

void TDataShard::CheckMediatorStateRestored() {
if (!MediatorStateWaiting ||
!RegistrationSended ||
!MediatorTimeCastEntry ||
CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>())
{
// We are not waiting or not ready to make a decision
if (MediatorStateWaiting &&
MediatorTimeCastEntry &&
CoordinatorPrevReadStepMax == Max<ui64>() &&
!MediatorStateBackupInitiated)
{
// It is possible we don't have coordinators with new protocol support
// Use a backup plan of acquiring a read snapshot for restoring the read step
Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup);
MediatorStateBackupInitiated = true;
}
return;
}

if (MediatorStateRestoreTxPending) {
// We already made a decision and are waiting for transaction to execute
return;
}

Expand Down Expand Up @@ -2372,6 +2392,13 @@ void TDataShard::CheckMediatorStateRestored() {
return;
}

MediatorStateRestoreTxPending = true;
Execute(new TTxMediatorStateRestored(this, readStep, observedStep));
}

void TDataShard::FinishMediatorStateRestore(TTransactionContext& txc, ui64 readStep, ui64 observedStep) {
Y_ABORT_UNLESS(MediatorStateWaiting);

// Using the inferred last read step we restore the pessimistic unprotected
// read edge. Note we only need to do so if there have actually been any
// unprotected reads in this datashard history. We also need to make sure
Expand All @@ -2386,6 +2413,8 @@ void TDataShard::CheckMediatorStateRestored() {
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
<< " promoting UnprotectedReadEdge to " << edge);
Pipeline.MarkPlannedLogicallyCompleteUpTo(edge, txc);
Pipeline.MarkPlannedLogicallyIncompleteUpTo(edge, txc);
SnapshotManager.PromoteUnprotectedReadEdge(edge);
}

Expand Down Expand Up @@ -3289,10 +3318,7 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const
}

void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) {
if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) {
// We are still waiting for new protol coordinator state
// TODO: send an old snapshot request to coordinators
}
Y_ABORT("This code path was always no-op and no longer used");
}

bool TDataShard::WaitPlanStep(ui64 step) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class TDataShard
class TTxHandleSafeKqpScan;
class TTxHandleSafeBuildIndexScan;

class TTxMediatorStateRestored;

ITransaction *CreateTxMonitoring(TDataShard *self,
NMon::TEvRemoteHttpInfo::TPtr ev);
ITransaction *CreateTxGetInfo(TDataShard *self,
Expand Down Expand Up @@ -1947,6 +1949,7 @@ class TDataShard
void SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorContext& ctx);

void CheckMediatorStateRestored();
void FinishMediatorStateRestore(TTransactionContext&, ui64, ui64);

void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const;

Expand Down Expand Up @@ -2613,7 +2616,7 @@ class TDataShard

TVector<THolder<IEventHandle>> MediatorStateWaitingMsgs;
bool MediatorStateWaiting = false;
bool MediatorStateBackupInitiated = false;
bool MediatorStateRestoreTxPending = false;

bool IcbRegistered = false;

Expand Down
30 changes: 23 additions & 7 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,24 +401,40 @@ void TPipeline::AddActiveOp(TOperation::TPtr op)
if (Self->IsMvccEnabled()) {
TStepOrder stepOrder = op->GetStepOrder();
TRowVersion version(stepOrder.Step, stepOrder.TxId);
TRowVersion completeEdge = Max(
Self->SnapshotManager.GetCompleteEdge(),
Self->SnapshotManager.GetUnprotectedReadEdge());
if (version <= completeEdge) {
op->SetFlag(TTxFlags::BlockingImmediateOps);
} else if (version <= Self->SnapshotManager.GetIncompleteEdge()) {
op->SetFlag(TTxFlags::BlockingImmediateWrites);
if (version <= Self->SnapshotManager.GetCompleteEdge() ||
version < Self->SnapshotManager.GetImmediateWriteEdge() ||
version < Self->SnapshotManager.GetUnprotectedReadEdge())
{
// This transaction would have been marked as logically complete
if (!op->HasFlag(TTxFlags::BlockingImmediateOps)) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Adding BlockingImmediateOps for op " << *op << " at " << Self->TabletID());
op->SetFlag(TTxFlags::BlockingImmediateOps);
}
} else if (version <= Self->SnapshotManager.GetIncompleteEdge() ||
version <= Self->SnapshotManager.GetUnprotectedReadEdge())
{
// This transaction would have been marked as logically incomplete
if (!op->HasFlag(TTxFlags::BlockingImmediateWrites)) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Adding BlockingImmediateWrites for op " << *op << " at " << Self->TabletID());
op->SetFlag(TTxFlags::BlockingImmediateWrites);
}
}
}
auto pr = ActivePlannedOps.emplace(op->GetStepOrder(), op);
Y_ABORT_UNLESS(pr.second, "AddActiveOp must never add duplicate transactions");
Y_ABORT_UNLESS(pr.first == std::prev(ActivePlannedOps.end()), "AddActiveOp must always add transactions in order");
bool isComplete = op->HasFlag(TTxFlags::BlockingImmediateOps);
if (ActivePlannedOpsLogicallyCompleteEnd == ActivePlannedOps.end() && !isComplete) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Operation " << *op << " is the new logically complete end at " << Self->TabletID());
ActivePlannedOpsLogicallyCompleteEnd = pr.first;
}
bool isIncomplete = isComplete || op->HasFlag(TTxFlags::BlockingImmediateWrites);
if (ActivePlannedOpsLogicallyIncompleteEnd == ActivePlannedOps.end() && !isIncomplete) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Operation " << *op << " is the new logically incomplete end at " << Self->TabletID());
ActivePlannedOpsLogicallyIncompleteEnd = pr.first;
}
}
Expand Down
221 changes: 221 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4675,6 +4675,227 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint64_value: 15 } }");
}

Y_UNIT_TEST(PipelineAndMediatorRestoreRace) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetEnableDataShardVolatileTransactions(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key));
CREATE TABLE `/Root/table2` (key int, value int, PRIMARY KEY (key));
)"),
"SUCCESS");

const auto shards1 = GetTableShards(server, sender, "/Root/table1");
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);

// Upsert initial data
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table1` (key, value) VALUES (1, 10);
UPSERT INTO `/Root/table2` (key, value) VALUES (2, 20);
)"),
"<empty>");

// Make sure shards have unprotected reads enabled
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table1`
UNION ALL
SELECT key, value FROM `/Root/table2`
ORDER BY key;
)"),
"{ items { int32_value: 1 } items { int32_value: 10 } }, "
"{ items { int32_value: 2 } items { int32_value: 20 } }");

std::vector<TEvTxProcessing::TEvReadSet::TPtr> readsets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>(
[&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
auto* msg = ev->Get();
Cerr << "... blocking readset for " << msg->Record.GetTabletDest() << Endl;
readsets.push_back(std::move(ev));
});

size_t planSteps = 0;
auto observePlanSteps = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>(
[&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
auto* msg = ev->Get();
Cerr << "... observed plan step " << msg->Record.GetStep() << " for " << msg->Record.GetTabletID() << Endl;
++planSteps;
});

// Create a "staircase" of transactions at different steps

// Upsert1 will have outgoing readsets from both shards
Cerr << "... sending upsert1" << Endl;
auto upsert1 = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table1` WHERE key = 1;
SELECT key, value FROM `/Root/table2` WHERE key = 2;
UPSERT INTO `/Root/table1` (key, value) VALUES (3, 30), (5, 50);
UPSERT INTO `/Root/table2` (key, value) VALUES (4, 40);
)");
WaitFor(runtime, [&]{ return planSteps >= 2; }, "upsert1 plan step");
UNIT_ASSERT_VALUES_EQUAL(planSteps, 2u);
WaitFor(runtime, [&]{ return readsets.size() >= 2; }, "upsert1 readsets");
UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 2u);

// Upsert2 will be blocked by dependencies (key 5) at table1, but not table2
Cerr << "... sending upsert2" << Endl;
auto upsert2 = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table2` WHERE key = 2;
UPSERT INTO `/Root/table1` (key, value) VALUES (5, 55), (7, 70);
)");
WaitFor(runtime, [&]{ return planSteps >= 4; }, "upsert2 plan step");
UNIT_ASSERT_VALUES_EQUAL(planSteps, 4u);
WaitFor(runtime, [&]{ return readsets.size() >= 3; }, "upsert2 readset from table2");
UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 3u);

// Upsert3 will be blocked by dependencies (key 7) at table1, but not table2
Cerr << "... sending upsert3" << Endl;
auto upsert3 = KqpSimpleSend(runtime, R"(
SELECT key, value FROM `/Root/table2` WHERE key = 2;
UPSERT INTO `/Root/table1` (key, value) VALUES (7, 77), (9, 90);
)");
WaitFor(runtime, [&]{ return planSteps >= 6; }, "upsert3 plan step");
UNIT_ASSERT_VALUES_EQUAL(planSteps, 6u);
WaitFor(runtime, [&]{ return readsets.size() >= 4; }, "upsert3 readset from table2");
UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 4u);

// Sleep a little to make sure everything is persisted at table1 and mediator time advanced
runtime.SimulateSleep(TDuration::MilliSeconds(200));

// Now restart table1 shard while blocking mediator timecast registration
std::vector<TEvMediatorTimecast::TEvRegisterTabletResult::TPtr> registrations;
auto blockRegistrations = runtime.AddObserver<TEvMediatorTimecast::TEvRegisterTabletResult>(
[&](TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev) {
Cerr << "... blocking timecast registration result for " << ev->GetRecipientRewrite() << Endl;
registrations.push_back(std::move(ev));
});

// ... waiting for the new tablet actor booting
TActorId shardActor;
auto waitBoot = runtime.AddObserver<TEvTablet::TEvBoot>(
[&](TEvTablet::TEvBoot::TPtr& ev) {
auto* msg = ev->Get();
if (msg->TabletID == shards1.at(0)) {
shardActor = ev->GetRecipientRewrite();
Cerr << "... booting " << msg->TabletID << " with actor " << shardActor << Endl;
}
});

// ... and blocking progress transactions
size_t allowProgress = 0;
std::vector<TAutoPtr<IEventHandle>> blockedProgress;
auto blockProgress = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) {
if (shardActor &&
ev->GetRecipientRewrite() == shardActor &&
ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */)
{
if (allowProgress > 0) {
Cerr << "... allowing EvProgressTransaction for " << ev->GetRecipientRewrite() << Endl;
--allowProgress;
} else {
Cerr << "... blocking EvProgressTransaction for " << ev->GetRecipientRewrite() << Endl;
blockedProgress.push_back(std::move(ev));
}
}
});

Cerr << "... rebooting " << shards1.at(0) << Endl;
GracefulRestartTablet(runtime, shards1.at(0), sender);

WaitFor(runtime, [&]{ return registrations.size() >= 1; }, "timecast registration");
UNIT_ASSERT_VALUES_EQUAL(registrations.size(), 1u);

WaitFor(runtime, [&]{ return readsets.size() >= 8; }, "readsets resent");
UNIT_ASSERT_VALUES_EQUAL(readsets.size(), 8u);

// We need to unblock two transactions
// The first is already marked incomplete
// The second will be added to the pipeline, but blocked by dependencies
for (int i = 0; i < 2; ++i) {
WaitFor(runtime, [&]{ return blockedProgress.size() >= 1; }, "blocked progress");
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1);

Cerr << "... unblocking a single progress tx" << Endl;
allowProgress += blockedProgress.size();
for (auto& ev : blockedProgress) {
runtime.Send(ev.Release(), 0, true);
}
blockedProgress.clear();
}

WaitFor(runtime, [&]{ return blockedProgress.size() >= 1; }, "blocked progress");
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1);

runtime.SimulateSleep(TDuration::MilliSeconds(1));

Cerr << "... unblocking timecast registration" << Endl;
blockRegistrations.Remove();
for (auto& ev : registrations) {
runtime.Send(ev.Release(), 0, true);
}

runtime.SimulateSleep(TDuration::MilliSeconds(200));

// Unblock the final transaction
// It's going to be before the restored worst-case unprotected read edge
// However because it's no the complete/incomplete tail it will not update lists properly
Cerr << "... unblocking final progress tx" << Endl;
blockProgress.Remove();
for (auto& ev : blockedProgress) {
runtime.Send(ev.Release(), 0, true);
}
blockedProgress.clear();

runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Perform snapshot read that will try to mark all pending transactions as logically complete/incomplete
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table1` WHERE key = 1
UNION ALL
SELECT key, value FROM `/Root/table2` WHERE key = 2
ORDER BY key;
)"),
"{ items { int32_value: 1 } items { int32_value: 10 } }, "
"{ items { int32_value: 2 } items { int32_value: 20 } }");

Cerr << "... unblocking readsets" << Endl;
blockReadSets.Remove();
for (auto& ev : readsets) {
runtime.Send(ev.Release(), 0, true);
}
readsets.clear();

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(upsert1))),
"{ items { int32_value: 1 } items { int32_value: 10 } }\n"
"{ items { int32_value: 2 } items { int32_value: 20 } }");

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(upsert2))),
"{ items { int32_value: 2 } items { int32_value: 20 } }");

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(upsert3))),
"{ items { int32_value: 2 } items { int32_value: 20 } }");
}

}

} // namespace NKikimr

0 comments on commit d14df37

Please sign in to comment.