diff --git a/ydb/core/tx/datashard/create_table_unit.cpp b/ydb/core/tx/datashard/create_table_unit.cpp index 3f1a08d4dfc9..61d1cfc97c92 100644 --- a/ydb/core/tx/datashard/create_table_unit.cpp +++ b/ydb/core/tx/datashard/create_table_unit.cpp @@ -86,6 +86,7 @@ EExecutionStatus TCreateTableUnit::Execute(TOperation::TPtr op, txc.DB.NoMoreReadsForTx(); DataShard.SetPersistState(TShardState::Ready, txc); DataShard.CheckMvccStateChangeCanStart(ctx); // Recheck + DataShard.SendRegistrationRequestTimeCast(ctx); } return EExecutionStatus::DelayCompleteNoMoreRestarts; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 3dd2e46db213..336ec09904ae 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -399,8 +399,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) { if (RegistrationSended) return; - if (!ProcessingParams) + if (!ProcessingParams) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() + << " not sending time cast registration request in state " + << DatashardStateName(State) + << ": missing processing params"); return; + } + + if (State == TShardState::WaitScheme || + State == TShardState::SplitDstReceivingSnapshot) + { + // We don't have all the necessary info yet + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() + << " not sending time cast registration request in state " + << DatashardStateName(State)); + return; + } LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast " << DatashardStateName(State) << " tabletId " << TabletID() @@ -2028,6 +2043,13 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const } } + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "GetMvccTxVersion at " << TabletID() + << " CompleteEdge# " << SnapshotManager.GetCompleteEdge() + << " IncompleteEdge# " << SnapshotManager.GetIncompleteEdge() + << " UnprotectedReadEdge# " << SnapshotManager.GetUnprotectedReadEdge() + << " ImmediateWriteEdge# " << SnapshotManager.GetImmediateWriteEdge() + << " ImmediateWriteEdgeReplied# " << SnapshotManager.GetImmediateWriteEdgeReplied()); + TRowVersion edge; TRowVersion readEdge = Max( SnapshotManager.GetCompleteEdge(), @@ -2142,6 +2164,8 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge // We need to wait for completion until the flag is committed res.WaitCompletion = true; } + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PromoteImmediatePostExecuteEdges at " << TabletID() + << " promoting UnprotectedReadEdge to " << version); SnapshotManager.PromoteUnprotectedReadEdge(version); // We want to promote the complete edge when protected reads are @@ -2304,6 +2328,19 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) { const ui64 step = it->first.Step; + if (SrcSplitDescription) { + if (State == TShardState::SplitSrcSendingSnapshot || + State == TShardState::SplitSrcWaitForPartitioningChanged || + State == TShardState::PreOffline || + State == TShardState::Offline) + { + // We cannot send replies, since dst shard is now in charge + // of keeping track of acknowledged writes. So we expect + // split src logic to reboot this shard later. 
+ break; + } + } + if (step <= mediatorStep) { SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first); Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie); @@ -2371,13 +2408,16 @@ void TDataShard::CheckMediatorStateRestored() { // HEAD reads must include that in their results. const ui64 waitStep = CoordinatorPrevReadStepMax; const ui64 readStep = CoordinatorPrevReadStepMax; - - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep); + const ui64 observedStep = GetMaxObservedStep(); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() << ":" + << " waitStep# " << waitStep + << " readStep# " << readStep + << " observedStep# " << observedStep); // WARNING: we must perform this check BEFORE we update unprotected read edge // We may enter this code path multiple times, and we expect that the above // read step may be refined while we wait based on pessimistic backup step. - if (GetMaxObservedStep() < waitStep) { + if (observedStep < waitStep) { // We need to wait until we observe mediator step that is at least // as large as the step we found. if (MediatorTimeCastWaitingSteps.insert(waitStep).second) { @@ -2398,7 +2438,10 @@ void TDataShard::CheckMediatorStateRestored() { SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step ? SnapshotManager.GetImmediateWriteEdge().Prev() : TRowVersion::Min(); - SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge)); + const TRowVersion edge = Max(lastReadEdge, preImmediateWriteEdge); + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored at " << TabletID() + << " promoting UnprotectedReadEdge to " << edge); + SnapshotManager.PromoteUnprotectedReadEdge(edge); } // Promote the replied immediate write edge up to the currently observed step @@ -2407,7 +2450,7 @@ void TDataShard::CheckMediatorStateRestored() { // data that is definitely not replied yet. 
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) { const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step; - const TRowVersion edge(GetMaxObservedStep(), Max<ui64>()); + const TRowVersion edge(observedStep, Max<ui64>()); SnapshotManager.PromoteImmediateWriteEdgeReplied( Min(edge, SnapshotManager.GetImmediateWriteEdge())); // Try to ensure writes become visible sooner rather than later @@ -2544,6 +2587,10 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr, rejectDescriptions.push_back(TStringBuilder() << "is in process of split opId " << DstSplitOpId << " state " << DatashardStateName(State)); + } else if (State == TShardState::WaitScheme) { + reject = true; + rejectReasons |= ERejectReasons::WrongState; + rejectDescriptions.push_back("is not created yet"); } else if (State == TShardState::PreOffline || State == TShardState::Offline) { reject = true; rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR; @@ -2706,6 +2753,11 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc auto* msg = ev->Get(); LWTRACK(ProposeTransactionRequest, msg->Orbit); + if (CheckDataTxRejectAndReply(ev, ctx)) { + IncCounter(COUNTER_PREPARE_REQUEST); + return; + } + // Check if we need to delay an immediate transaction if (MediatorStateWaiting && (ev->Get()->GetFlags() & TTxFlags::Immediate) && @@ -2738,10 +2790,6 @@ void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TAc IncCounter(COUNTER_PREPARE_REQUEST); - if (CheckDataTxRejectAndReply(ev, ctx)) { - return; - } - switch (ev->Get()->GetTxKind()) { case NKikimrTxDataShard::TX_KIND_DATA: case NKikimrTxDataShard::TX_KIND_SCAN: diff --git a/ydb/core/tx/datashard/datashard_split_dst.cpp b/ydb/core/tx/datashard/datashard_split_dst.cpp index 12d152c188e5..d78ca11ccf2c 100644 --- a/ydb/core/tx/datashard/datashard_split_dst.cpp +++ b/ydb/core/tx/datashard/datashard_split_dst.cpp @@ -175,6 +175,7 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot for split/merge TxId " << opId << " from tabeltId " << srcTabletId); + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Received snapshot: " << record.DebugString()); if (!Self->DstSplitSchemaInitialized) { LegacyInitSchema(txc); @@ -291,8 +292,9 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa Self->PromoteFollowerReadEdge(txc); } - Self->State = TShardState::Ready; - Self->PersistSys(db, Schema::Sys_State, Self->State); + // Note: we persist Ready, but keep current state in memory until Complete + Self->SetPersistState(TShardState::Ready, txc); + Self->State = TShardState::SplitDstReceivingSnapshot; } return true; @@ -306,9 +308,36 @@ class TDataShard::TTxSplitTransferSnapshot : public NTabletFlatExecutor::TTransa ctx.Send(ackTo, new TEvDataShard::TEvSplitTransferSnapshotAck(opId, Self->TabletID())); - if (LastSnapshotReceived) { - // We have received all the data, reload everything from the received system tables - Self->Execute(Self->CreateTxInit(), ctx); + // Note: we skip init in the unlikely event of state resetting between Execute and Complete + if (LastSnapshotReceived && Self->State == TShardState::SplitDstReceivingSnapshot) { + // We have received all the data, finish shard initialization + // Note: previously we used TxInit, however received system tables + // have been empty for years now, and since pipes are still 
open we + may receive requests between TxInit loading the Ready state and + its Complete method initializing everything properly. Instead + necessary steps are repeated here. + Self->State = TShardState::Ready; + + // We are already in StateWork, but we need to repeat many steps now that we are Ready + Self->SwitchToWork(ctx); + + // We can send the registration request now that we are ready + Self->SendRegistrationRequestTimeCast(ctx); + + // Initialize snapshot expiration queue with current context time + Self->GetSnapshotManager().InitExpireQueue(ctx.Now()); + if (Self->GetSnapshotManager().HasExpiringSnapshots()) { + Self->PlanCleanup(ctx); + } + + // Initialize change senders + Self->KillChangeSender(ctx); + Self->CreateChangeSender(ctx); + Self->MaybeActivateChangeSender(ctx); + Self->EmitHeartbeats(); + + // Switch mvcc state if needed + Self->CheckMvccStateChangeCanStart(ctx); } } }; diff --git a/ydb/core/tx/datashard/datashard_split_src.cpp b/ydb/core/tx/datashard/datashard_split_src.cpp index c3a4bf8df9ef..6777b666f4b1 100644 --- a/ydb/core/tx/datashard/datashard_split_src.cpp +++ b/ydb/core/tx/datashard/datashard_split_src.cpp @@ -529,6 +529,15 @@ class TDataShard::TTxSplitPartitioningChanged : public NTabletFlatExecutor::TTra } } + if (!Self->MediatorDelayedReplies.empty()) { + // We have some pending mediator replies, which must no longer be sent. + // Unfortunately we may linger around for a long time, and clients + // would keep awaiting replies for all that time. We have to make + // sure those clients receive an appropriate disconnection error + // instead. + ctx.Send(Self->SelfId(), new TEvents::TEvPoison); + } + // TODO: properly check if there are no loans Self->CheckStateChange(ctx); } } diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 44e4bf7d555d..486c88502e0e 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3988,6 +3988,569 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { uint32_value: 4 } items { uint32_value: 40 } }"); } + /** + * This observer forces newly created tablets to start on particular nodes + */ + class TCreateTabletNodePinning { + public: + TCreateTabletNodePinning(TTestActorRuntime& runtime) + : Runtime(runtime) + , Node1(Runtime.GetNodeId(0)) + , Observer(Runtime.AddObserver<TEvHive::TEvCreateTablet>([this](auto& ev) { this->Process(ev); })) + {} + + void SetNodeIndexes(std::initializer_list<ui32> nodeIndexes) { + AllowedNodes.clear(); + for (ui32 nodeIndex : nodeIndexes) { + AllowedNodes.push_back(Node1 + nodeIndex); + } + } + + void Remove() { + Observer.Remove(); + } + + private: + void Process(TEvHive::TEvCreateTablet::TPtr& ev) { + auto* msg = ev->Get(); + msg->Record.ClearAllowedNodeIDs(); + for (ui32 nodeId : AllowedNodes) { + msg->Record.AddAllowedNodeIDs(nodeId); + } + } + + private: + TTestActorRuntime& Runtime; + const ui32 Node1; + std::vector<ui32> AllowedNodes; + TTestActorRuntime::TEventObserverHolder Observer; + }; + + /** + * This observer allows blocking timecast updates at different nodes + */ + class TMediatorTimeCastBlocking { + public: + TMediatorTimeCastBlocking(TTestActorRuntime& runtime) + : Runtime(runtime) + , Node1(Runtime.GetNodeId(0)) + , Observer(Runtime.AddObserver<TEvMediatorTimecast::TEvUpdate>([this](auto& ev) { this->Process(ev); })) + {} + + void SetMaxAllowedStep(ui32 nodeIndex, ui64 step) { + while (Nodes.size() <= nodeIndex) { + Nodes.emplace_back(); + } + Nodes[nodeIndex].MaxAllowedStep = step; + } + + void Unblock(ui32 nodeIndex) { + 
while (Nodes.size() <= nodeIndex) { + Nodes.emplace_back(); + } + auto& state = Nodes[nodeIndex]; + state.MaxAllowedStep = Max<ui64>(); + for (auto& kv : state.Buckets) { + if (kv.second.LastBlocked) { + Runtime.Send(kv.second.LastBlocked.release(), nodeIndex, true); + } + } + } + + ui64 WaitNextStep() { + ui64 step = LastStep; + WaitFor(Runtime, [&]{ return LastStep > step; }, "next timecast step", 10); + return LastStep; + } + + void Remove() { + Observer.Remove(); + } + + private: + void Process(TEvMediatorTimecast::TEvUpdate::TPtr& ev) { + auto* msg = ev->Get(); + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + ui32 nodeIndex = nodeId - Node1; + auto bucket = msg->Record.GetBucket(); + auto step = msg->Record.GetTimeBarrier(); + Cerr << "... observed step " << step << " at node " << nodeId << " bucket " << bucket << Endl; + LastStep = Max(LastStep, step); + while (Nodes.size() <= nodeIndex) { + Nodes.emplace_back(); + } + auto& state = Nodes[nodeIndex]; + auto& bucketState = state.Buckets[bucket]; + ui64 prevObserved = bucketState.LastObservedStep; + bucketState.LastObservedStep = step; + bucketState.LastBlocked.reset(); + if (step > state.MaxAllowedStep) { + Cerr << "... blocked step " << step << " at node " << nodeId << " bucket " << bucket << Endl; + // Generate a blocked event that we will resend on unblock + auto* newMsg = new TEvMediatorTimecast::TEvUpdate; + newMsg->Record = msg->Record; + bucketState.LastBlocked.reset(new IEventHandle( + ev->Recipient, + ev->Sender, + newMsg, + ev->Flags, + ev->Cookie)); + bucketState.LastBlocked->Rewrite(ev->GetTypeRewrite(), ev->GetRecipientRewrite()); + // Modify the original event to send the max allowed step when necessary + if (prevObserved < state.MaxAllowedStep) { + Cerr << "... synthetic step " << state.MaxAllowedStep << " at node " << nodeId << " bucket " << bucket << Endl; + msg->Record.SetTimeBarrier(state.MaxAllowedStep); + } else { + ev.Reset(); + } + return; + } + } + + private: + struct TBucketState { + ui64 LastObservedStep = 0; + std::unique_ptr<IEventHandle> LastBlocked; + }; + + struct TNodeState { + ui64 MaxAllowedStep = Max<ui64>(); + std::map<ui32, TBucketState> Buckets; + }; + + public: + ui64 LastStep = 0; + + private: + TTestActorRuntime& Runtime; + const ui32 Node1; + std::vector<TNodeState> Nodes; + TTestActorRuntime::TEventObserverHolder Observer; + }; + + /** + * Observer for blocking split at src tablets + */ + class TSplitSrcBlocking { + public: + TSplitSrcBlocking(TTestActorRuntime& runtime) + : Runtime(runtime) + , Node1(Runtime.GetNodeId(0)) + , Observer(Runtime.AddObserver<TEvDataShard::TEvSplit>([this](auto& ev) { this->Process(ev); })) + {} + + void Unblock() { + Observer.Remove(); + for (auto& ev : Blocked) { + ui32 nodeIndex = ev->GetRecipientRewrite().NodeId() - Node1; + Runtime.Send(ev.release(), nodeIndex, true); + } + Blocked.clear(); + } + + private: + void Process(TEvDataShard::TEvSplit::TPtr& ev) { + Cerr << "... 
blocking TEvSplit" << Endl; + Blocked.emplace_back(ev.Release()); + } + + private: + TTestActorRuntime& Runtime; + const ui32 Node1; + std::vector<std::unique_ptr<IEventHandle>> Blocked; + TTestActorRuntime::TEventObserverHolder Observer; + }; + + Y_UNIT_TEST(RepeatableReadAfterSplitRace) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetUseRealThreads(false) + .SetDomainPlanResolution(100); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TCreateTabletNodePinning createTabletNodePinning(runtime); + + // Create table-1 at node 1 + createTabletNodePinning.SetNodeIndexes({ 0 }); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + + // Create table-2 at node 2 + createTabletNodePinning.SetNodeIndexes({ 1 }); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + // Insert initial values + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);")); + + TSplitSrcBlocking splitSrcBlocking(runtime); + TMediatorTimeCastBlocking mediatorTimeCastBlocking(runtime); + + // We need to make a snapshot read to force unprotected reads + // This will also ensure both nodes have an updated mediator time cast + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }"); + + // One more upsert to table-2, this will bump mediator time at node 2 past the snapshot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 30);")); + + // We won't allow node 2 to go past that write version + mediatorTimeCastBlocking.SetMaxAllowedStep(1, mediatorTimeCastBlocking.LastStep); + + // Start split of table-1 at key 10 + auto splitSender = runtime.AllocateEdgeActor(); + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + Cerr << "... starting split of table-1" << Endl; + ui64 splitTxId = AsyncSplitTable(server, splitSender, "/Root/table-1", shards1.at(0), 10); + + // We want mediator time to advance, so the next snapshot is greater than the last write version + mediatorTimeCastBlocking.WaitNextStep(); + + // Start a snapshot read from table-1 + // This will run at node 1 shard where mediator time is recent + Cerr << "... reading from table-1" << Endl; + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }"); + + // Now unblock TEvSplit and wait until it finishes + splitSrcBlocking.Unblock(); + + // Wait until split finishes, so we can continue working with the new shards + Cerr << "... 
waiting for split to finish" << Endl; + WaitTxNotification(server, splitSender, splitTxId); + + // Start an upsert into newly split table-1 + TString upsertSessionId = CreateSessionRPC(runtime); + auto upsertFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 40); + )"), upsertSessionId, "", /* commit */ true)); + + // We cannot wait for it to finish (bug may be fixed) + // So we sleep for several seconds instead + Cerr << "... sleeping for 2 seconds" << Endl; + runtime.SimulateSleep(TDuration::Seconds(2)); + + // Unblock mediator timecast at node 2 + mediatorTimeCastBlocking.Unblock(1); + + // Wait until upsert finishes + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsertFuture))), + ""); + + // Repeat read in a previous tx, this read must be repeatable + // In other words we must not observe the new 4/40 row + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }"); + } + + Y_UNIT_TEST(DelayedWriteReadableAfterSplit) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetUseRealThreads(false) + .SetDomainPlanResolution(100); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TCreateTabletNodePinning createTabletNodePinning(runtime); + + // Create table-1 at node 1 + createTabletNodePinning.SetNodeIndexes({ 0 }); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + + // Create table-2 at node 2 + createTabletNodePinning.SetNodeIndexes({ 1 }); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + // Insert initial values + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);")); + + TSplitSrcBlocking splitSrcBlocking(runtime); + TMediatorTimeCastBlocking mediatorTimeCastBlocking(runtime); + + // We need to make a snapshot read to force unprotected reads + // This will also ensure both nodes have an updated mediator time cast + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }"); + + // One more upsert to table-2, this will bump mediator time past the snapshot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 30);")); + + // Wait for the next step, we will expect to read at that step + ui64 step = mediatorTimeCastBlocking.WaitNextStep(); + Cerr << "... 
expecting next read at step " << step << Endl; + + // We won't allow node 2 to go past that snapshot + mediatorTimeCastBlocking.SetMaxAllowedStep(1, step); + + // Start a snapshot read from table-1 + // This will run at node 1 shard where mediator time is recent + Cerr << "... reading from table-1" << Endl; + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }"); + + // Start split of table-1 at key 10 + auto splitSender = runtime.AllocateEdgeActor(); + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + Cerr << "... starting split of table-1" << Endl; + ui64 splitTxId = AsyncSplitTable(server, splitSender, "/Root/table-1", shards1.at(0), 10); + + // Perform an immediate write, which will happen after the above snapshot + // We also wait for the result to make sure mediator time advances at node 1 + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 40); + )")), + ""); + + // Unblock split at src + splitSrcBlocking.Unblock(); + + // Wait until split finishes, so we can continue working with the new shards + Cerr << "... waiting for split to finish" << Endl; + WaitTxNotification(server, splitSender, splitTxId); + + // Start an immediate read from the new left shard of table-1 + TString readSessionId = CreateSessionRPC(runtime); + Cerr << "... starting immediate read from table-1" << Endl; + auto readFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key <= 5 + ORDER BY key; + )"), readSessionId, "", /* commit */ true)); + + // We cannot wait for it to finish (bug may be fixed) + // So we sleep for several seconds instead + Cerr << "... sleeping for 2 seconds" << Endl; + runtime.SimulateSleep(TDuration::Seconds(2)); + + // Unblock mediator timecast at node 2 + mediatorTimeCastBlocking.Unblock(1); + + // Wait until read finishes, we must observe previously inserted row + Cerr << "... 
waiting for table-1 read to finish" << Endl; + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(readFuture))), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 40 } }"); + + // Repeat read in a previous tx, this read must be repeatable + // In other words we must not observe the new 4/40 row + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }"); + } + + Y_UNIT_TEST(DelayedWriteReplyAfterSplit) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(2) + .SetUseRealThreads(false) + .SetDomainPlanResolution(100); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + TCreateTabletNodePinning createTabletNodePinning(runtime); + + // Create table-1 at node 1 + createTabletNodePinning.SetNodeIndexes({ 0 }); + CreateShardedTable(server, sender, "/Root", "table-1", 1); + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + + // Create table-2 at node 2 + createTabletNodePinning.SetNodeIndexes({ 1 }); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + // Insert initial values + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);")); + + TSplitSrcBlocking splitSrcBlocking(runtime); + TMediatorTimeCastBlocking mediatorTimeCastBlocking(runtime); + + // We need to make a snapshot read to force unprotected reads + // This will also ensure both nodes have an updated mediator time cast + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }"); + + // One more upsert to table-2, this will bump mediator time past the snapshot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (3, 30);")); + + // Wait for the next step, we will expect to read at that step + ui64 step = mediatorTimeCastBlocking.WaitNextStep(); + Cerr << "... expecting next read at step " << step << Endl; + + // We won't allow either node 1 or node 2 to go past that snapshot + mediatorTimeCastBlocking.SetMaxAllowedStep(0, step); + mediatorTimeCastBlocking.SetMaxAllowedStep(1, step); + + // Start a snapshot read from table-1 + // This will run at node 1 shard + Cerr << "... 
reading from table-1" << Endl; + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }"); + + // Start split of table-1 at key 10 + auto splitSender = runtime.AllocateEdgeActor(); + SetSplitMergePartCountLimit(server->GetRuntime(), -1); + Cerr << "... starting split of table-1" << Endl; + ui64 splitTxId = AsyncSplitTable(server, splitSender, "/Root/table-1", shards1.at(0), 10); + + // Start upserting a row into table-1, since mediator time is blocked + // this will not be able to reply until mediator time advances. + TString upsertSessionId = CreateSessionRPC(runtime); + Cerr << "... starting upsert into table-1" << Endl; + auto upsertFuture = SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (4, 40); + )"), upsertSessionId, "", /* commit */ true)); + + // Sleep a little, so the above upsert is fully executed at future step and reply is enqueued + runtime.SimulateSleep(TDuration::MicroSeconds(100)); + + // We expect mediator time to be at the last step + UNIT_ASSERT_VALUES_EQUAL(mediatorTimeCastBlocking.LastStep, step); + + // Now unblock the split and wait until it finishes + splitSrcBlocking.Unblock(); + Cerr << "... waiting for split to finish" << Endl; + WaitTxNotification(server, splitSender, splitTxId); + + // We expect mediator time to still be at the last step + UNIT_ASSERT_VALUES_EQUAL(mediatorTimeCastBlocking.LastStep, step); + + // The new shard should be ready to take requests, validate by an immediate read + // The new row must not be visible, since mediator time is still in the past, + // and the write could not have sent a successful reply yet. + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key <= 5 + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }"); + + // Now that we confirmed the new shard is readable, allow mediator time to advance at node 1 + mediatorTimeCastBlocking.Unblock(0); + + // Sleep for a couple of seconds and check whether upsert has replied + runtime.SimulateSleep(TDuration::Seconds(2)); + + // When upsert replies, only errors are allowed, otherwise new reads + // at node 2 would keep reading stale values. + if (upsertFuture.HasValue() || upsertFuture.HasException()) { + Cerr << "... upsert finished before unblocking node 2" << Endl; + auto upsertResult = FormatResult(upsertFuture.ExtractValueSync()); + if (upsertResult == "") { + // It was successful, double check that node 2 can see the result + // For obvious reasons it cannot (due to blocked mediator time) + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key <= 5 + ORDER BY key; + )")), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 40 } }, "); + } else { + // Otherwise we must get an UNDETERMINED error + // The upsert hasn't happened yet, but will happen later + UNIT_ASSERT_VALUES_EQUAL( + upsertResult, + "ERROR: UNDETERMINED"); + } + } else { + // Try to be future proof, in case we implement waiting with dst shards + Cerr << "... upsert did not finish before unblocking node 2" << Endl; + } + } + } } // namespace NKikimr