Skip to content

Commit

Permalink
Fix stale read of some acknowledged writes after a table split (#2286)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 29, 2024
1 parent 6db3715 commit ecdcbb4
Show file tree
Hide file tree
Showing 5 changed files with 665 additions and 15 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/create_table_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op,
txc.DB.NoMoreReadsForTx();
DataShard.SetPersistState(TShardState::Ready, txc);
DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck
DataShard.SendRegistrationRequestTimeCast(ctx);
}

return EExecutionStatus::DelayCompleteNoMoreRestarts;
Expand Down
68 changes: 58 additions & 10 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
if (RegistrationSended)
return;

if (!ProcessingParams)
if (!ProcessingParams) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State)
<< ": missing processing params");
return;
}

if (State == TShardState::WaitScheme ||
State == TShardState::SplitDstReceivingSnapshot)
{
// We don't have all the necessary info yet
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID()
<< " not sending time cast registration request in state "
<< DatashardStateName(State));
return;
}

LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
<< DatashardStateName(State) << " tabletId " << TabletID()
Expand Down Expand Up @@ -2028,6 +2043,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
}
}

LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID()
<< " CompleteEdge# " << SnapshotManager.GetCompleteEdge()
<< " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge()
<< " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge()
<< " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge()
<< " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied());

TRowVersion edge;
TRowVersion readEdge = Max(
SnapshotManager.GetCompleteEdge(),
Expand Down Expand Up @@ -2142,6 +2164,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
// We need to wait for completion until the flag is committed
res.WaitCompletion = true;
}
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID()
<< " promoting UnprotectedReadEdge to " << version);
SnapshotManager.PromoteUnprotectedReadEdge(version);

// We want to promote the complete edge when protected reads are
Expand Down Expand Up @@ -2304,6 +2328,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
const ui64 step = it->first.Step;

if (SrcSplitDescription) {
if (State == TShardState::SplitSrcSendingSnapshot ||
State == TShardState::SplitSrcWaitForPartitioningChanged ||
State == TShardState::PreOffline ||
State == TShardState::Offline)
{
// We cannot send replies, since dst shard is now in charge
// of keeping track of acknowledged writes. So we expect
// split src logic to reboot this shard later.
break;
}
}

if (step <= mediatorStep) {
SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
Expand Down Expand Up @@ -2371,13 +2408,16 @@ void TDataShard::CheckMediatorStateRestored() {
// HEAD reads must include that in their results.
const ui64 waitStep = CoordinatorPrevReadStepMax;
const ui64 readStep = CoordinatorPrevReadStepMax;

LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);
const ui64 observedStep = GetMaxObservedStep();
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":"
<< " waitStep# " << waitStep
<< " readStep# " << readStep
<< " observedStep# " << observedStep);

// WARNING: we must perform this check BEFORE we update unprotected read edge
// We may enter this code path multiple times, and we expect that the above
// read step may be refined while we wait based on pessimistic backup step.
if (GetMaxObservedStep() < waitStep) {
if (observedStep < waitStep) {
// We need to wait until we observe mediator step that is at least
// as large as the step we found.
if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
Expand All @@ -2398,7 +2438,10 @@ void TDataShard::CheckMediatorStateRestored() {
SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
? SnapshotManager.GetImmediateWriteEdge().Prev()
: TRowVersion::Min();
SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge);
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID()
<< " promoting UnprotectedReadEdge to " << edge);
SnapshotManager.PromoteUnprotectedReadEdge(edge);
}

// Promote the replied immediate write edge up to the currently observed step
Expand All @@ -2407,7 +2450,7 @@ void TDataShard::CheckMediatorStateRestored() {
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
const TRowVersion edge(observedStep, Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
// Try to ensure writes become visible sooner rather than later
Expand Down Expand Up @@ -2544,6 +2587,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectDescriptions.push_back(TStringBuilder()
<< "is in process of split opId " << DstSplitOpId
<< " state " << DatashardStateName(State));
} else if (State == TShardState::WaitScheme) {
reject = true;
rejectReasons |= ERejectReasons::WrongState;
rejectDescriptions.push_back("is not created yet");
} else if (State == TShardState::PreOffline || State == TShardState::Offline) {
reject = true;
rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
Expand Down Expand Up @@ -2706,6 +2753,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc
auto* msg = ev->Get();
LWTRACK(ProposeTransactionRequest, msg->Orbit);

if (CheckDataTxRejectAndReply(ev, ctx)) {
IncCounter(COUNTER_PREPARE_REQUEST);
return;
}

// Check if we need to delay an immediate transaction
if (MediatorStateWaiting &&
(ev->Get()->GetFlags() & TTxFlags::Immediate) &&
Expand Down Expand Up @@ -2738,10 +2790,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc

IncCounter(COUNTER_PREPARE_REQUEST);

if (CheckDataTxRejectAndReply(ev, ctx)) {
return;
}

switch (ev->Get()->GetTxKind()) {
case NKikimrTxDataShard::TX_KIND_DATA:
case NKikimrTxDataShard::TX_KIND_SCAN:
Expand Down
39 changes: 34 additions & 5 deletions ydb/core/tx/datashard/datashard_split_dst.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId
<< " from tabeltId " << srcTabletId);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString());

if (!Self->DstSplitSchemaInitialized) {
LegacyInitSchema(txc);
Expand Down Expand Up @@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa
Self->PromoteFollowerReadEdge(txc);
}

Self->State = TShardState::Ready;
Self->PersistSys(db, Schema::Sys_State, Self->State);
// Note: we persist Ready, but keep current state in memory until Complete
Self->SetPersistState(TShardState::Ready, txc);
Self->State = TShardState::SplitDstReceivingSnapshot;
}

return true;
Expand All @@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa

ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID()));

if (LastSnapshotReceived) {
// We have received all the data, reload everything from the received system tables
Self->Execute(Self->CreateTxInit(), ctx);
// Note: we skip init in the unlikely event of the state being reset between Execute and Complete
if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) {
// We have received all the data, finish shard initialization
// Note: previously we used TxInit, however received system tables
// have been empty for years now, and since pipes are still open we
// may receive requests between TxInit loading the Ready state and
// its Complete method initializing everything properly. Instead,
// the necessary steps are repeated here.
Self->State = TShardState::Ready;

// We are already in StateWork, but we need to repeat many steps now that we are Ready
Self->SwitchToWork(ctx);

// We can send the registration request now that we are ready
Self->SendRegistrationRequestTimeCast(ctx);

// Initialize snapshot expiration queue with current context time
Self->GetSnapshotManager().InitExpireQueue(ctx.Now());
if (Self->GetSnapshotManager().HasExpiringSnapshots()) {
Self->PlanCleanup(ctx);
}

// Initialize change senders
Self->KillChangeSender(ctx);
Self->CreateChangeSender(ctx);
Self->MaybeActivateChangeSender(ctx);
Self->EmitHeartbeats();

// Switch mvcc state if needed
Self->CheckMvccStateChangeCanStart(ctx);
}
}
};
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/datashard/datashard_split_src.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra
}
}

if (!Self->MediatorDelayedReplies.empty()) {
// We have some pending mediator replies, which must not be sent.
// Unfortunately we may linger around for a long time, and clients
// would keep awaiting replies for all that time. We have to make
// sure those clients receive an appropriate disconnection error
// instead.
ctx.Send(Self->SelfId(), new TEvents::TEvPoison);
}

// TODO: properly check if there are no loans
Self->CheckStateChange(ctx);
}
Expand Down
Loading

0 comments on commit ecdcbb4

Please sign in to comment.