diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 7da50e1cff67..5fdec1361b14 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2174,6 +2174,12 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
             << " promoting UnprotectedReadEdge to " << version);
         SnapshotManager.PromoteUnprotectedReadEdge(version);

+        // Make sure pending distributed transactions are marked incomplete:
+        // since we just protected versions up to and including version from
+        // writes, we need to make sure new conflicting immediate writes are
+        // blocked and don't perform writes with out-of-order versions.
+        res.HadWrites |= Pipeline.MarkPlannedLogicallyIncompleteUpTo(version, txc);
+
         // We want to promote the complete edge when protected reads are
         // used or when we're already writing something anyway.
         if (res.HadWrites) {
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 1fd915755040..e59adc95af56 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1074,8 +1074,7 @@ const NHPTimer::STime TReader::MaxCyclesPerIteration =
 class TDataShard::TReadOperation : public TOperation, public IReadOperation {
     TDataShard* Self;
-    TActorId Sender;
-    std::shared_ptr<TEvDataShard::TEvRead> Request;
+    TReadIteratorId ReadId;

     NMiniKQL::IEngineFlat::TValidationInfo ValidationInfo;
@@ -1091,11 +1090,10 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
     static constexpr ui32 Flags = NTxDataShard::TTxFlags::ReadOnly | NTxDataShard::TTxFlags::Immediate;

 public:
-    TReadOperation(TDataShard* ds, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev)
+    TReadOperation(TDataShard* ds, TInstant receivedAt, ui64 tieBreakerIndex, const TReadIteratorId& readId)
         : TOperation(TBasicOpInfo(EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex))
         , Self(ds)
-        , Sender(ev->Sender)
-        , Request(ev->Release().Release())
+        , ReadId(readId)
     {}

     void BuildExecutionPlan(bool loaded) override
@@ -1117,14 +1115,13 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
     }

     EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) override {
-        TReadIteratorId readId(Sender, Request->Record.GetReadId());
-        auto it = Self->ReadIterators.find(readId);
-        if (it == Self->ReadIterators.end()) {
+        auto readIt = Self->ReadIterators.find(ReadId);
+        if (readIt == Self->ReadIterators.end()) {
             // iterator has been aborted
             return EExecutionStatus::DelayComplete;
         }
-        Y_ABORT_UNLESS(it->second);
-        auto& state = *it->second;
+        Y_ABORT_UNLESS(readIt->second);
+        auto& state = *readIt->second;

         if (Result->Record.HasStatus()) {
             // error happened on check phase
@@ -1133,9 +1130,11 @@

         Y_ABORT_UNLESS(state.State == TReadIteratorState::EState::Executing);

+        auto* request = state.Request;
+
         ++ExecuteCount;
         LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Execute read# " << ExecuteCount
-            << ", request: " << Request->Record);
+            << ", request: " << request->Record);

         switch (Self->State) {
             case TShardState::Ready:
@@ -1248,8 +1247,8 @@
             }
         }

-        state.LockId = state.Request->Record.GetLockTxId();
-        state.LockNodeId = state.Request->Record.GetLockNodeId();
+        state.LockId = request->Record.GetLockTxId();
+        state.LockNodeId = request->Record.GetLockNodeId();

         TDataShardLocksDb locksDb(*Self, txc);
         TSetupSysLocks guardLocks(state.LockId, state.LockNodeId, *Self, &locksDb);
@@ -1309,22 +1308,37 @@
             state.IsHeadRead = false;

             if (!Self->IsFollower()) {
+                TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge();
+                if (state.ReadVersion >= unreadableEdge) {
+                    // This version is unreadable in repeatable read mode at the moment, so we have to wait.
+                    // We actually have to completely destroy the current state and start from scratch.
+                    LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId);
+                    Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(state.Ev), ctx);
+                    Self->DeleteReadIterator(readIt);
+
+                    // Make sure we roll back everything (on the slim chance there are any changes)
+                    if (txc.DB.HasChanges()) {
+                        txc.DB.RollbackChanges();
+                    }
+
+                    // This unit will remove the current operation from the pipeline when we return
+                    Abort(EExecutionUnitKind::CompletedOperations);
+
+                    return EExecutionStatus::Executed;
+                }
+
                 // Switch to repeatable read at the same version
                 SetMvccSnapshot(state.ReadVersion, /* isRepeatable */ true);

-                TStepOrder order(state.ReadVersion.Step, state.ReadVersion.TxId);
-                const auto& plannedOps = Self->Pipeline.GetActivePlannedOps();
-                auto it = plannedOps.lower_bound(order);
-                if (it != plannedOps.end() && it->first == order) {
-                    if (!it->second->IsReadOnly()) {
-                        // we need to wait this op
-                        AddDependency(it->second);
+                // We may have had repeatable read conflicts, promote them
+                PromoteRepeatableReadConflicts();

-                        // Make sure current incomplete result will not be sent
-                        Result = MakeEvReadResult(ctx.SelfID.NodeId());
+                // Having runtime conflicts now means we have to wait and restart
+                if (HasRuntimeConflicts()) {
+                    // Make sure the current incomplete result will not be sent
+                    Result = MakeEvReadResult(ctx.SelfID.NodeId());

-                        return EExecutionStatus::Continue;
-                    }
+                    return EExecutionStatus::Continue;
                 }
             } else {
                 auto [followerEdge, followerRepeatable] = Self->GetSnapshotManager().GetFollowerReadEdge();
@@ -1378,8 +1392,7 @@
     }

     void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override {
-        TReadIteratorId readId(Sender, Request->Record.GetReadId());
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         if (it == Self->ReadIterators.end()) {
             // iterator has been aborted
             return;
@@ -1390,7 +1403,8 @@

         Result = MakeEvReadResult(ctx.SelfID.NodeId());

-        const auto& record = Request->Record;
+        auto* request = state.Request;
+        const auto& record = request->Record;

         if (record.HasMaxRows())
             state.Quota.Rows = record.GetMaxRows();
@@ -1409,7 +1423,7 @@
         state.Reverse = record.GetReverse();
         if (state.Reverse) {
-            state.FirstUnprocessedQuery = Request->Keys.size() + Request->Ranges.size() - 1;
+            state.FirstUnprocessedQuery = request->Keys.size() + request->Ranges.size() - 1;
         }

         // Note: some checks already performed in TTxReadViaPipeline::Execute
@@ -1459,10 +1473,10 @@
         }

         // Make ranges in the new 'any' form compatible with the old '+inf' form
-        for (size_t i = 0; i < Request->Ranges.size(); ++i) {
-            auto& range = Request->Ranges[i];
+        for (size_t i = 0; i < request->Ranges.size(); ++i) {
+            auto& range = request->Ranges[i];
             auto& keyFrom = range.From;
-            auto& keyTo = Request->Ranges[i].To;
+            auto& keyTo = request->Ranges[i].To;

             if (range.FromInclusive && keyFrom.GetCells().size() != TableInfo.KeyColumnCount) {
                 keyFrom = ExtendWithNulls(keyFrom, TableInfo.KeyColumnCount);
@@ -1474,16 +1488,16 @@
         }

         // Make prefixes in the new 'any' form compatible with the old '+inf' form
-        for (size_t i = 0; i < Request->Keys.size(); ++i) {
-            const auto& key = Request->Keys[i];
+        for (size_t i = 0; i < request->Keys.size(); ++i) {
+            const auto& key = request->Keys[i];
             if (key.GetCells().size() == TableInfo.KeyColumnCount)
                 continue;

-            if (state.Keys.size() != Request->Keys.size()) {
-                state.Keys.resize(Request->Keys.size());
+            if (state.Keys.size() != request->Keys.size()) {
+                state.Keys.resize(request->Keys.size());
             }

-            // we can safely use cells referencing original Request->Keys[x],
+            // we can safely use cells referencing original request->Keys[x],
             // because request will live until the end
             state.Keys[i] = ExtendWithNulls(key, TableInfo.KeyColumnCount);
         }
@@ -1503,7 +1517,6 @@
             state.Columns.push_back(col);
         }

-        state.Request = Request;
         state.State = TReadIteratorState::EState::Executing;

         Y_ASSERT(Result);
@@ -1521,29 +1534,29 @@
             return;
         ResultSent = true;

-        TReadIteratorId readId(Sender, Request->Record.GetReadId());
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         if (it == Self->ReadIterators.end()) {
             // the one who removed the iterator should have replied to user
-            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " has been invalidated before TReadOperation::SendResult()");
             return;
         }
         Y_ABORT_UNLESS(it->second);
         auto& state = *it->second;
+        auto* request = state.Request;

         if (!Result) {
-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TReadOperation::Execute() finished without Result, aborting");

             Result = MakeEvReadResult(ctx.SelfID.NodeId());
             SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, TStringBuilder()
                 << "Iterator aborted"
                 << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId()
                 << " state# " << DatashardStateName(Self->State) << ")");
-            Result->Record.SetReadId(readId.ReadId);
-            Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+            Result->Record.SetReadId(ReadId.ReadId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);

-            Request->ReadSpan.EndError("Iterator aborted");
+            request->ReadSpan.EndError("Iterator aborted");
             Self->DeleteReadIterator(it);
             return;
         }
@@ -1557,13 +1570,13 @@
         // error happened and status set
         auto& record = Result->Record;
         if (record.HasStatus()) {
-            record.SetReadId(readId.ReadId);
+            record.SetReadId(ReadId.ReadId);
             record.SetSeqNo(state.SeqNo + 1);

-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TReadOperation::Execute() finished with error, aborting: " << record.DebugString());
-            Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);

-            Request->ReadSpan.EndError("Finished with error");
+            request->ReadSpan.EndError("Finished with error");
             Self->DeleteReadIterator(it);
             return;
         }
@@ -1571,7 +1584,7 @@
         Y_ASSERT(Reader);
         Y_ASSERT(BlockBuilder);

-        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
             << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead()
             << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead())
             << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead())
@@ -1587,29 +1600,29 @@

         if (!gSkipReadIteratorResultFailPoint.Check(Self->TabletID())) {
             LWTRACK(ReadSendResult, state.Orbit);
-            Self->SendImmediateReadResult(Sender, Result.release(), 0, state.SessionId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);
         }
     }

     void Complete(const TActorContext& ctx) override {
-        TReadIteratorId readId(Sender, Request->Record.GetReadId());
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         if (it == Self->ReadIterators.end()) {
             // the one who removed the iterator should have replied to user
-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " has been invalidated before TReadOperation::Complete()");
             return;
         }
         auto& state = *it->second;
+        auto* request = state.Request;

-        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Complete read# " << state.ReadId
+        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Complete read# " << ReadId
             << " after executionsCount# " << ExecuteCount);

         SendResult(ctx);

-        it = Self->ReadIterators.find(readId);
+        it = Self->ReadIterators.find(ReadId);
         if (it == Self->ReadIterators.end()) {
-            // We sent an error and delete iterator
+            // We sent an error and deleted the iterator
             return;
         }
@@ -1621,17 +1634,17 @@
             if (!state.IsExhausted()) {
                 ctx.Send(
                     Self->SelfId(),
-                    new TEvDataShard::TEvReadContinue(Sender, Request->Record.GetReadId()));
+                    new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
             } else {
                 Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
                 LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
-                    << " read iterator# " << readId << " exhausted");
+                    << " read iterator# " << ReadId << " exhausted");
             }
         } else {
-            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " finished in read");

-            Request->ReadSpan.EndOk();
+            request->ReadSpan.EndOk();
             Self->DeleteReadIterator(it);
         }
     }
@@ -1874,7 +1887,6 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
 };

 class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
-    TEvDataShard::TEvRead::TPtr Ev;
     TReadIteratorId ReadId;

     // When we need to reply with an error
@@ -1885,10 +1897,9 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
     bool WaitComplete = false;

 public:
-    TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev, NWilson::TTraceId &&traceId)
+    TTxReadViaPipeline(TDataShard* ds, const TReadIteratorId& readId, NWilson::TTraceId &&traceId)
         : TBase(ds, std::move(traceId))
-        , Ev(std::move(ev))
-        , ReadId(Ev->Sender, Ev->Get()->Record.GetReadId())
+        , ReadId(readId)
     {}

     TTxType GetTxType() const override { return TXTYPE_READ; }
@@ -1920,22 +1931,23 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
                 // iterator already aborted
                 return true;
             }
+            auto& state = *readIt->second;
             ReplyError(
                 Ydb::StatusIds::INTERNAL_ERROR,
                 TStringBuilder() << "Failed to sync follower: " << errMessage
                     << " (shard# " << Self->TabletID() << " node# " << ctx.SelfID.NodeId()
                     << " state# " << DatashardStateName(Self->State) << ")",
                 ctx.SelfID.NodeId(),
-                Ev->Get()->ReadSpan);
+                state.Request->ReadSpan);
             return true;
         }
     }

-    if (Ev) {
+    if (!Op) {
         // We must perform some initialization in transaction (e.g. after a follower sync), but before the operation is built
         Y_ABORT_UNLESS(readIt != Self->ReadIterators.end());
         Y_ABORT_UNLESS(readIt->second);
         auto& state = *readIt->second;
-        auto* request = Ev->Get();
+        auto* request = state.Request;
         const auto& record = request->Record;

         NWilson::TSpan &readSpan = request->ReadSpan;
@@ -2043,7 +2055,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
             TRowVersion unreadableEdge = Self->Pipeline.GetUnreadableEdge();
             if (state.ReadVersion >= unreadableEdge) {
                 LWTRACK(ReadWaitSnapshot, request->Orbit, state.ReadVersion.Step, state.ReadVersion.TxId);
-                Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(Ev), ctx);
+                Self->Pipeline.AddWaitingReadIterator(state.ReadVersion, std::move(state.Ev), ctx);
                 Self->DeleteReadIterator(readIt);
                 return true;
             }
@@ -2119,7 +2131,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
         }

         const ui64 tieBreaker = Self->NextTieBreakerIndex++;
-        Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
+        Op = new TReadOperation(Self, ctx.Now(), tieBreaker, ReadId);
         Op->BuildExecutionPlan(false);
         Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
@@ -2133,7 +2145,6 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
             Op->SetUsingSnapshotFlag();
         }

-        Ev = nullptr;
         Op->IncrementInProgress();
     }
@@ -2238,7 +2249,7 @@
 };

 class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
-    TEvDataShard::TEvReadContinue::TPtr Ev;
+    TReadIteratorId ReadId;

     std::unique_ptr<TEvDataShard::TEvReadResult> Result;
     std::unique_ptr<IBlockBuilder> BlockBuilder;
@@ -2247,9 +2258,9 @@
     bool DelayedResult = false;

 public:
-    TTxReadContinue(TDataShard* ds, TEvDataShard::TEvReadContinue::TPtr ev, NWilson::TTraceId &&traceId)
+    TTxReadContinue(TDataShard* ds, const TReadIteratorId& readId, NWilson::TTraceId &&traceId)
         : TBase(ds, std::move(traceId))
-        , Ev(ev)
+        , ReadId(readId)
     {}

     // note that intentionally the same as TEvRead
@@ -2260,20 +2271,18 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
         // 1. Since TTxReadContinue was scheduled, the shard was ready.
         // 2. If the shard changes its state, it must cancel iterators and we
         //    will not find our readId in ReadIterators.
-        TReadIteratorId readId(Ev->Get()->Reader, Ev->Get()->ReadId);
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         if (it == Self->ReadIterators.end()) {
             // read has been aborted
-            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# "
-                << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId << " didn't found state");
+            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
+                << " didn't find state");
             return true;
         }

         Y_ASSERT(it->second);
         auto& state = *it->second;

-        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for reader# "
-            << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
+        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
             << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);

         Result = MakeEvReadResult(ctx.SelfID.NodeId());
@@ -2368,7 +2377,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
         Y_ASSERT(Result);

         LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
-            << " ReadContinue: reader# " << Ev->Get()->Reader << ", readId# " << Ev->Get()->ReadId
+            << " ReadContinue: iterator# " << ReadId
             << ", FirstUnprocessedQuery# " << state.FirstUnprocessedQuery);

         TDataShardLocksDb locksDb(*Self, txc);
@@ -2388,7 +2397,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
             Self->WaitVolatileDependenciesThenSend(
                 Reader->GetVolatileReadDependencies(),
                 Self->SelfId(),
-                std::make_unique<TEvDataShard::TEvReadContinue>(Ev->Get()->Reader, Ev->Get()->ReadId));
+                std::make_unique<TEvDataShard::TEvReadContinue>(ReadId.Sender, ReadId.ReadId));
             return true;
         }
@@ -2411,9 +2420,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
     }

     void ApplyLocks(const TActorContext& ctx) {
-        const auto* request = Ev->Get();
-        TReadIteratorId readId(request->Reader, request->ReadId);
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         Y_ABORT_UNLESS(it != Self->ReadIterators.end());
         Y_ABORT_UNLESS(it->second);
         auto& state = *it->second;
@@ -2449,7 +2456,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
             addLock->SetSchemeShard(state.PathId.OwnerId);
             addLock->SetPathId(state.PathId.LocalPathId);

-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TTxReadContinue::Execute() found broken lock# " << state.Lock->GetLockId());

             // A broken write lock means we are reading inconsistent results and must abort
@@ -2468,21 +2475,19 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
     }

     void SendResult(const TActorContext& ctx) {
-        auto* request = Ev->Get();
-        TReadIteratorId readId(request->Reader, request->ReadId);
-        auto it = Self->ReadIterators.find(readId);
+        auto it = Self->ReadIterators.find(ReadId);
         Y_ABORT_UNLESS(it != Self->ReadIterators.end());
         Y_ABORT_UNLESS(it->second);
         auto& state = *it->second;

         if (!Result) {
-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TTxReadContinue::Execute() finished without Result, aborting");

             Result = MakeEvReadResult(ctx.SelfID.NodeId());
             SetStatusError(Result->Record, Ydb::StatusIds::ABORTED, "Iterator aborted");
-            Result->Record.SetReadId(readId.ReadId);
-            Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+            Result->Record.SetReadId(ReadId.ReadId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);

             state.Request->ReadSpan.EndError("Iterator aborted");
             Self->DeleteReadIterator(it);
@@ -2493,10 +2498,10 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
         auto& record = Result->Record;
         if (record.HasStatus()) {
             record.SetSeqNo(state.SeqNo + 1);
-            record.SetReadId(readId.ReadId);
-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            record.SetReadId(ReadId.ReadId);
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TTxReadContinue::Execute() finished with error, aborting: " << record.DebugString());
-            Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);

             state.Request->ReadSpan.EndError("Finished with error");
             Self->DeleteReadIterator(it);
@@ -2506,7 +2511,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
         Y_ASSERT(Reader);
         Y_ASSERT(BlockBuilder);

-        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << readId
+        LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " readContinue iterator# " << ReadId
             << " sends rowCount# " << Reader->GetRowsRead() << ", bytes# " << Reader->GetBytesRead()
             << ", quota rows left# " << (state.Quota.Rows - Reader->GetRowsRead())
             << ", quota bytes left# " << (state.Quota.Bytes - Reader->GetBytesRead())
@@ -2518,7 +2523,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
         bool useful = Reader->FillResult(*Result, state);
         if (useful) {
             LWTRACK(ReadSendResult, state.Orbit);
-            Self->SendImmediateReadResult(request->Reader, Result.release(), 0, state.SessionId);
+            Self->SendImmediateReadResult(ReadId.Sender, Result.release(), 0, state.SessionId);
         }

         if (Reader->HasUnreadQueries()) {
@@ -2528,14 +2533,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
             if (!state.IsExhausted()) {
                 ctx.Send(
                     Self->SelfId(),
-                    new TEvDataShard::TEvReadContinue(request->Reader, request->ReadId));
+                    new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
             } else {
                 Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
                 LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
-                    << " read iterator# " << readId << " exhausted");
+                    << " read iterator# " << ReadId << " exhausted");
             }
         } else {
-            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << readId
+            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " finished in ReadContinue");

             state.Request->ReadSpan.EndOk();
@@ -2698,16 +2703,21 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
         sessionId = ev->InterconnectSession;
     }

-    ReadIterators.emplace(
+    auto pr = ReadIterators.emplace(
         readId,
         new TReadIteratorState(
             readId, TPathId(record.GetTableId().GetOwnerId(), record.GetTableId().GetTableId()),
             sessionId, readVersion, isHeadRead,
             AppData()->MonotonicTimeProvider->Now(), std::move(request->Orbit)));
+    Y_ABORT_UNLESS(pr.second);
+
+    auto& state = *pr.first->second;
+    state.Ev = std::move(ev);
+    state.Request = request;

     SetCounter(COUNTER_READ_ITERATORS_COUNT, ReadIterators.size());

-    Executor()->Execute(new TTxReadViaPipeline(this, ev, request->ReadSpan.GetTraceId()), ctx);
+    Executor()->Execute(new TTxReadViaPipeline(this, readId, request->ReadSpan.GetTraceId()), ctx);
 }

 void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorContext& ctx) {
@@ -2716,8 +2726,8 @@ void TDataShard::Handle(TEvDataShard::TEvReadContinue::TPtr& ev, const TActorCon
     if (Y_UNLIKELY(it == ReadIterators.end())) {
         return;
     }
-
-    Executor()->Execute(new TTxReadContinue(this, ev, it->second->Request->ReadSpan.GetTraceId()), ctx);
+
+    Executor()->Execute(new TTxReadContinue(this, readId, it->second->Request->ReadSpan.GetTraceId()), ctx);
 }

 void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& ctx) {
@@ -2835,9 +2845,7 @@ void TDataShard::Handle(TEvDataShard::TEvReadCancel::TPtr& ev, const TActorConte
     LWTRACK(ReadCancel, state->Orbit);

-    if (state->Request) {
-        state->Request->ReadSpan.EndError("Cancelled");
-    }
+    state->Request->ReadSpan.EndError("Cancelled");

     DeleteReadIterator(it);
 }
@@ -2860,6 +2868,7 @@ void TDataShard::CancelReadIterators(Ydb::StatusIds::StatusCode code, const TStr
         result->Record.SetSeqNo(state->SeqNo + 1);

         SendViaSession(state->SessionId, readIteratorId.Sender, SelfId(), result.release());
+        state->Request->ReadSpan.EndError("Cancelled");
     }

     ReadIterators.clear();
@@ -2913,6 +2922,7 @@ void TDataShard::ReadIteratorsOnNodeDisconnected(const TActorId& sessionId, cons
             ++exhaustedCount;
         }

+        state->Request->ReadSpan.EndError("Disconnected");
         ReadIterators.erase(it);
     }

diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index a5874bdf6a3b..cbc79414ca93 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -46,8 +46,8 @@ namespace {
         return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() < b.TxId);
     }

-    bool IsLessEqual(const TOperation& a, const TRowVersion& b) {
-        return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() <= b.TxId);
+    bool IsEqual(const TOperation& a, const TRowVersion& b) {
+        return a.GetStep() == b.Step && a.GetTxId() == b.TxId;
     }
 }

@@ -799,8 +799,10 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
         Y_ABORT_UNLESS(!conflict.IsImmediate());
         if (snapshot.IsMax()) {
             conflict.AddImmediateConflict(op);
-        } else if (snapshotRepeatable ? IsLessEqual(conflict, snapshot) : IsLess(conflict, snapshot)) {
+        } else if (IsLess(conflict, snapshot)) {
             op->AddDependency(&conflict);
+        } else if (IsEqual(conflict, snapshot)) {
+            op->AddRepeatableReadConflict(&conflict);
         }
     };

diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 77dfdd0b5c7e..efd05dfd96f9 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -39,6 +39,7 @@ TPipeline::~TPipeline()
         pr.second->ClearSpecialDependencies();
         pr.second->ClearPlannedConflicts();
         pr.second->ClearImmediateConflicts();
+        pr.second->ClearRepeatableReadConflicts();
     }
 }

@@ -487,6 +488,7 @@ void TPipeline::UnblockNormalDependencies(const TOperation::TPtr &op)
     op->ClearDependencies();
     op->ClearPlannedConflicts();
     op->ClearImmediateConflicts();
+    op->ClearRepeatableReadConflicts();
     DepTracker.RemoveOperation(op);
 }

diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index de0e2ef7a24d..1ca7c1396ebe 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -208,9 +208,13 @@ namespace NKqpHelpers {
         return FormatResult(result);
     }

-    inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
+    inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
         Y_ABORT_UNLESS(!txId.empty(), "commit on empty transaction");
-        auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
+        return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
+    }
+
+    inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
+        auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
         if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
             return TStringBuilder() << "ERROR: " << response.operation().status();
         }
diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
index 95e02823a185..30da853306db 100644
--- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
@@ -4247,4 +4247,401 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
     }
 }

+Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
+
+    Y_UNIT_TEST(LocalSnapshotReadWithPlanQueueRace) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false);
+        TServer::TPtr server = new TServer(serverSettings);
+
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        auto shardActor = ResolveTablet(runtime, shards.at(0));
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (3, 30), (5, 50), (7, 70), (9, 90);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20), (4, 40), (6, 60), (8, 80);");
+
+        std::vector<TEvDataShard::TEvRead::TPtr> reads;
+        auto captureReads = runtime.AddObserver<TEvDataShard::TEvRead>([&](TEvDataShard::TEvRead::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured TEvRead for " << shardActor << Endl;
+                reads.push_back(std::move(ev));
+            }
+        });
+
+        std::vector<TEvTxProcessing::TEvPlanStep::TPtr> plans;
+        auto capturePlans = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured TEvPlanStep for " << shardActor << Endl;
+                plans.push_back(std::move(ev));
+            }
+        });
+
+        auto readFuture = KqpSimpleSend(runtime, R"(
+            SELECT * FROM `/Root/table-1` ORDER BY key;
+        )");
+
+        auto upsertFuture = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table-1` SELECT * FROM `/Root/table-2`;
+        )");
+
+        WaitFor(runtime, [&]{ return reads.size() > 0 && plans.size() > 0; }, "read and plan");
+
+        captureReads.Remove();
+        capturePlans.Remove();
+
+        TRowVersion lastTx;
+        for (auto& ev : plans) {
+            auto* msg = ev->Get();
+            for (auto& tx : msg->Record.GetTransactions()) {
+                // Remember the last transaction in the plan
+                lastTx = TRowVersion(msg->Record.GetStep(), tx.GetTxId());
+            }
+            runtime.Send(ev.Release(), 0, true);
+        }
+        plans.clear();
+
+        for (auto& ev : reads) {
+            auto* msg = ev->Get();
+            // We expect it to be an immediate read
+            UNIT_ASSERT_C(!msg->Record.HasSnapshot(), msg->Record.DebugString());
+            // Limit each chunk to just 2 rows
+            // This will force it to sleep and read in repeatable snapshot mode
+            msg->Record.SetMaxRowsInResult(2);
+            // The message must land immediately after the plan in the mailbox
+            runtime.Send(ev.Release(), 0, true);
+        }
+        reads.clear();
+
+        std::vector<TEvDataShard::TEvReadContinue::TPtr> readContinues;
+        auto captureReadContinues = runtime.AddObserver<TEvDataShard::TEvReadContinue>([&](TEvDataShard::TEvReadContinue::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured TEvReadContinue for " << shardActor << Endl;
+                readContinues.push_back(std::move(ev));
+            }
+        });
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(upsertFuture))),
+            "<empty>");
+
+        captureReadContinues.Remove();
+        for (auto& ev : readContinues) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+        readContinues.clear();
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(readFuture))),
+            // Technically a result without 2, 4, 6 and 8 is possible
+            // In practice we will never block writes because of unfinished reads
+            "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+            "{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
+            "{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
+            "{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
+            "{ items { uint32_value: 7 } items { uint32_value: 70 } }, "
+            "{ items { uint32_value: 8 } items { uint32_value: 80 } }, "
+            "{ items { uint32_value: 9 } items { uint32_value: 90 } }");
+    }
+
+    Y_UNIT_TEST(LocalSnapshotReadHasRequiredDependencies) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            // We need to block transactions with readsets
+            .SetEnableDataShardVolatileTransactions(false);
+        TServer::TPtr server = new TServer(serverSettings);
+
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        auto shardActor = ResolveTablet(runtime, shards.at(0));
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (3, 30), (5, 50), (7, 70);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20), (4, 40), (6, 60);");
+
+        std::vector<TEvTxProcessing::TEvReadSet::TPtr> readsets;
+        auto captureReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured readset for " << ev->GetRecipientRewrite() << Endl;
+                readsets.push_back(std::move(ev));
+            }
+        });
+
+        // Block while writing to some keys
+        auto upsertFuture = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table-1` SELECT * FROM `/Root/table-2`;
+        )");
+
+        WaitFor(runtime, [&]{ return readsets.size() > 0; }, "readset");
+
+        captureReadSets.Remove();
+
+        auto modifyReads = runtime.AddObserver<TEvDataShard::TEvRead>([&](TEvDataShard::TEvRead::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... modifying TEvRead for " << shardActor << Endl;
+                auto* msg = ev->Get();
+                // We expect it to be an immediate read
+                UNIT_ASSERT_C(!msg->Record.HasSnapshot(), msg->Record.DebugString());
+                // Limit each chunk to just 2 rows
+                // This will force it to sleep and read in repeatable snapshot mode
+                msg->Record.SetMaxRowsInResult(2);
+            }
+        });
+
+        // Read all rows, including currently undecided keys
+        auto readFuture = KqpSimpleSend(runtime, R"(
+            SELECT * FROM `/Root/table-1`
+            WHERE key <= 5
+            ORDER BY key;
+        )");
+
+        // Give the read a chance to finish incorrectly
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        for (auto& ev : readsets) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+        readsets.clear();
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(upsertFuture))),
+            "<empty>");
+
+        // We must have observed all rows at the given repeatable snapshot
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(readFuture))),
+            "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+            "{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
+            "{ items { uint32_value: 5 } items { uint32_value: 50 } }");
+    }
+
+    Y_UNIT_TEST(LocalSnapshotReadNoUnnecessaryDependencies) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            // We need to block transactions with readsets
+            .SetEnableDataShardVolatileTransactions(false);
+        TServer::TPtr server = new TServer(serverSettings);
+
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        auto shardActor = ResolveTablet(runtime, shards.at(0));
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (3, 30), (5, 50), (7, 70);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20), (4, 40), (6, 60);");
+
+        std::vector<TEvTxProcessing::TEvReadSet::TPtr> readsets;
+        auto captureReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured readset for " << ev->GetRecipientRewrite() << Endl;
+                readsets.push_back(std::move(ev));
+            }
+        });
+
+        // Block while writing to key 2
+        auto upsertFuture = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table-1` SELECT * FROM `/Root/table-2` WHERE key = 2;
+        )");
+
+        WaitFor(runtime, [&]{ return readsets.size() > 0; }, "readset");
+
+        captureReadSets.Remove();
+
+        auto modifyReads = runtime.AddObserver<TEvDataShard::TEvRead>([&](TEvDataShard::TEvRead::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... modifying TEvRead for " << shardActor << Endl;
+                auto* msg = ev->Get();
+                // We expect it to be an immediate read
+                UNIT_ASSERT_C(!msg->Record.HasSnapshot(), msg->Record.DebugString());
+                // Limit each chunk to just 2 rows
+                // This will force it to sleep and read in repeatable snapshot mode
+                msg->Record.SetMaxRowsInResult(2);
+            }
+        });
+
+        // Read all rows, not including currently undecided keys
+        auto readFuture = KqpSimpleSend(runtime, R"(
+            SELECT * FROM `/Root/table-1`
+            WHERE key >= 3
+            ORDER BY key;
+        )");
+
+        // Read must complete without waiting for the above upsert to finish
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(readFuture))),
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+            "{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
+            "{ items { uint32_value: 7 } items { uint32_value: 70 } }");
+
+        for (auto& ev : readsets) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+        readsets.clear();
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(AwaitResponse(runtime, std::move(upsertFuture))),
+            "<empty>");
+    }
+
+    Y_UNIT_TEST(LocalSnapshotReadWithConcurrentWrites) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            // We need to block transactions with readsets
+            .SetEnableDataShardVolatileTransactions(false);
+        TServer::TPtr server = new TServer(serverSettings);
+
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+
+        auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        auto shardActor = ResolveTablet(runtime, shards.at(0));
+
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (3, 30), (5, 50), (7, 70);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20), (4, 40), (6, 60);");
+
+        std::vector<TEvTxProcessing::TEvReadSet::TPtr> readsets;
+        auto captureReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                Cerr << "... captured readset for " << ev->GetRecipientRewrite() << Endl;
+                readsets.push_back(std::move(ev));
+            }
+        });
+
+        // The first upsert needs to block while writing to key 2
+        auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table-1` SELECT * FROM `/Root/table-2` WHERE key = 2;
+        )");
+
+        WaitFor(runtime, [&]{ return readsets.size() > 0; }, "readset");
+
+        captureReadSets.Remove();
+
+        TRowVersion txVersion = TRowVersion::Min();
+        auto observePlanSteps = runtime.AddObserver<TEvTxProcessing::TEvPlanStep>([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                auto* msg = ev->Get();
+                for (const auto& tx : msg->Record.GetTransactions()) {
+                    txVersion = TRowVersion(msg->Record.GetStep(), tx.GetTxId());
+                    Cerr << "... observed plan for tx " << txVersion << Endl;
+                }
+            }
+        });
+
+        // Start a transaction that reads from key 3
+        TString sessionId, txId;
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleBegin(runtime, sessionId, txId, R"(
+                SELECT key, value FROM `/Root/table-1` WHERE key = 3;
+            )"),
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }");
+
+        // The second upsert should be ready to execute, but blocked by a write-write conflict on key 2
+        // Note we also read from key 3, so that later only one transaction may survive
+        auto upsertFuture2 = KqpSimpleSend(runtime, R"(
+            SELECT key, value FROM `/Root/table-1` WHERE key = 3;
+            $rows = (
+                SELECT key, value FROM `/Root/table-2` WHERE key = 4
+                UNION ALL
+                SELECT 2u AS key, 21u AS value
+                UNION ALL
+                SELECT 3u AS key, 31u AS value
+            );
+            UPSERT INTO `/Root/table-1` SELECT * FROM $rows;
+        )");
+
+        WaitFor(runtime, [&]{ return txVersion != TRowVersion::Min(); }, "plan step");
+
+        observePlanSteps.Remove();
+        auto forceSnapshotRead = runtime.AddObserver<TEvDataShard::TEvRead>([&](TEvDataShard::TEvRead::TPtr& ev) {
+            if (ev->GetRecipientRewrite() == shardActor) {
+                auto* msg = ev->Get();
+                if (!msg->Record.HasSnapshot()) {
+                    Cerr << "... forcing read snapshot " << txVersion << Endl;
+                    msg->Record.MutableSnapshot()->SetStep(txVersion.Step);
+                    msg->Record.MutableSnapshot()->SetTxId(txVersion.TxId);
+                }
+            }
+        });
+
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSimpleExec(runtime, R"(
+                SELECT key, value FROM `/Root/table-1`
+                WHERE key >= 5
+                ORDER BY key;
+            )"),
+            "{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
+            "{ items { uint32_value: 7 } items { uint32_value: 70 } }");
+
+        auto commitFuture = KqpSimpleSendCommit(runtime, sessionId, txId, R"(
+            UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 32);
+        )");
+
+        // Give it all a chance to complete
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        // Unblock readsets
+        for (auto& ev : readsets) {
+            runtime.Send(ev.Release(), 0, true);
+        }
+        readsets.clear();
+
+        auto result1 = FormatResult(AwaitResponse(runtime, std::move(upsertFuture2)));
+        auto result2 = FormatResult(AwaitResponse(runtime, std::move(commitFuture)));
+
+        UNIT_ASSERT_C(
+            result1 == "ERROR: ABORTED" || result2 == "ERROR: ABORTED",
+            "result1: " << result1 << ", "
+            "result2: " << result2);
+    }
+
+}
+
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp
index f1779c24bf91..9de4faa6bb17 100644
--- a/ydb/core/tx/datashard/operation.cpp
+++ b/ydb/core/tx/datashard/operation.cpp
@@ -183,6 +183,40 @@ void TOperation::ClearImmediateConflicts() {
     ImmediateConflicts.clear();
 }

+void TOperation::AddRepeatableReadConflict(const TOperation::TPtr &op) {
+    Y_ABORT_UNLESS(this != op.Get());
+    Y_DEBUG_ABORT_UNLESS(IsImmediate());
+    Y_DEBUG_ABORT_UNLESS(!op->IsImmediate());
+
+    if (IsMvccSnapshotRepeatable()) {
+        AddDependency(op);
+        return;
+    }
+
+    if (RepeatableReadConflicts.insert(op).second) {
+        op->RepeatableReadConflicts.insert(this);
+    }
+}
+
+void TOperation::PromoteRepeatableReadConflicts() {
+    Y_ABORT_UNLESS(IsImmediate());
+
+    for (auto& op : RepeatableReadConflicts) {
+        Y_DEBUG_ABORT_UNLESS(op->RepeatableReadConflicts.contains(this));
+        op->RepeatableReadConflicts.erase(this);
+        AddDependency(op);
+    }
+    RepeatableReadConflicts.clear();
+}
+
+void TOperation::ClearRepeatableReadConflicts() {
+    for (auto& op : RepeatableReadConflicts) {
+        Y_DEBUG_ABORT_UNLESS(op->RepeatableReadConflicts.contains(this));
+        op->RepeatableReadConflicts.erase(this);
+    }
+    RepeatableReadConflicts.clear();
+}
+
 void TOperation::AddVolatileDependency(ui64 txId) {
     VolatileDependencies.insert(txId);
 }
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 192661fb1fe8..3a7f798ddd30 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -719,6 +719,7 @@ class TOperation
     const absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> &GetSpecialDependencies() const { return SpecialDependencies; }
     const absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> &GetPlannedConflicts() const { return PlannedConflicts; }
     const absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> &GetImmediateConflicts() const { return ImmediateConflicts; }
+    const absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> &GetRepeatableReadConflicts() const { return RepeatableReadConflicts; }
     const absl::flat_hash_set<ui64> &GetVolatileDependencies() const { return VolatileDependencies; }
     bool HasVolatileDependencies() const { return !VolatileDependencies.empty(); }
     bool GetVolatileDependenciesAborted() const { return VolatileDependenciesAborted; }
@@ -736,6 +737,10 @@ class TOperation
     void ClearSpecialDependents();
     void ClearSpecialDependencies();

+    void AddRepeatableReadConflict(const TOperation::TPtr &op);
+    void PromoteRepeatableReadConflicts();
+    void ClearRepeatableReadConflicts();
+
     void AddVolatileDependency(ui64 txId);
     void RemoveVolatileDependency(ui64 txId, bool success);
     void ClearVolatileDependenciesAborted() { VolatileDependenciesAborted = false; }
@@ -925,6 +930,7 @@ class TOperation
     absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> SpecialDependencies;
     absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> PlannedConflicts;
     absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> ImmediateConflicts;
+    absl::flat_hash_set<TOperation::TPtr, THash<TOperation::TPtr>> RepeatableReadConflicts;
     absl::flat_hash_set<ui64> VolatileDependencies;
     bool VolatileDependenciesAborted = false;
     TVector<EExecutionUnitKind> ExecutionPlan;
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index a42653089eb2..8b2a4a2b7e5e 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -179,7 +179,9 @@ struct TReadIteratorState {

     bool Reverse = false;

-    std::shared_ptr<TEvDataShard::TEvRead> Request;
+    // The original event handle
+    TEvDataShard::TEvRead::TPtr Ev;
+    TEvDataShard::TEvRead* Request = nullptr;

     // parallel to Request->Keys, but real data only in indices,
     // where in Request->Keys we have key prefix (here we have properly extended one).