From bc426c575e9fa4e8a2b1baec4a011aa3f71e45b6 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Mon, 12 Aug 2024 16:05:03 +0000 Subject: [PATCH] Fix unexpected read iterator stream reset --- .../tx/datashard/datashard__read_iterator.cpp | 83 +++++++++-- ydb/core/tx/datashard/datashard_impl.h | 4 + .../datashard/datashard_ut_read_iterator.cpp | 134 ++++++++++++++++++ ydb/core/tx/datashard/read_iterator.h | 1 + 4 files changed, 209 insertions(+), 13 deletions(-) diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index ba5ea8e85e03..b0dadde686c7 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -5,6 +5,7 @@ #include "datashard_locks_db.h" #include "probes.h" +#include #include #include @@ -315,6 +316,8 @@ class TReader { , Self(self) , TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion) , FirstUnprocessedQuery(State.FirstUnprocessedQuery) + , LastProcessedKey(State.LastProcessedKey) + , LastProcessedKeyErasedOrMissing(State.LastProcessedKeyErasedOrMissing) { GetTimeFast(&StartTime); EndTime = StartTime; @@ -329,10 +332,10 @@ class TReader { bool toInclusive; TSerializedCellVec keyFromCells; TSerializedCellVec keyToCells; - if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) { + if (LastProcessedKey) { if (!State.Reverse) { - keyFromCells = TSerializedCellVec(State.LastProcessedKey); - fromInclusive = State.LastProcessedKeyErasedOrMissing; + keyFromCells = TSerializedCellVec(LastProcessedKey); + fromInclusive = LastProcessedKeyErasedOrMissing; keyToCells = range.To; toInclusive = range.ToInclusive; @@ -341,8 +344,8 @@ class TReader { keyFromCells = range.From; fromInclusive = range.FromInclusive; - keyToCells = TSerializedCellVec(State.LastProcessedKey); - toInclusive = State.LastProcessedKeyErasedOrMissing; + keyToCells = TSerializedCellVec(LastProcessedKey); + toInclusive = LastProcessedKeyErasedOrMissing; } } else { keyFromCells = range.From; @@ -505,6 +508,7 @@ class TReader { while (FirstUnprocessedQuery < State.Request->Ranges.size()) { if (ReachedTotalRowsLimit()) { FirstUnprocessedQuery = -1; + LastProcessedKey.clear(); return true; } @@ -531,6 +535,7 @@ class TReader { FirstUnprocessedQuery++; else FirstUnprocessedQuery--; + LastProcessedKey.clear(); } return true; @@ -542,6 +547,7 @@ class TReader { while (FirstUnprocessedQuery < State.Request->Keys.size()) { if (ReachedTotalRowsLimit()) { FirstUnprocessedQuery = -1; + LastProcessedKey.clear(); return true; } @@ -567,6 +573,7 @@ class TReader { FirstUnprocessedQuery++; else FirstUnprocessedQuery--; + LastProcessedKey.clear(); } return true; @@ -732,6 +739,28 @@ class TReader { } void UpdateState(TReadIteratorState& state, bool sentResult) { + if (state.FirstUnprocessedQuery == FirstUnprocessedQuery && + state.LastProcessedKey && !LastProcessedKey) + { + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:" + << " ReadId# " << State.ReadId + << " LastSeqNo# " << State.SeqNo + << " LastQuery# " << State.FirstUnprocessedQuery + << " RowsRead# " << RowsRead + << " RowsProcessed# " << RowsProcessed + << " RowsSinceLastCheck# " << RowsSinceLastCheck + << " BytesInResult# " << BytesInResult + << " DeletedRowSkips# " << DeletedRowSkips + << " InvisibleRowSkips# " << InvisibleRowSkips + << " Quota.Rows# " << State.Quota.Rows + << " Quota.Bytes# " << State.Quota.Bytes + << " State.TotalRows# " << State.TotalRows + << " State.TotalRowsLimit# " << State.TotalRowsLimit + << " State.MaxRowsInResult# " << State.MaxRowsInResult); + Self->IncCounterReadIteratorLastKeyReset(); + } + state.TotalRows += RowsRead; state.FirstUnprocessedQuery = FirstUnprocessedQuery; state.LastProcessedKey = LastProcessedKey; @@ -1683,6 +1712,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { if (Reader->HasUnreadQueries()) { Reader->UpdateState(state, ResultSent); if (!state.IsExhausted()) { + state.ReadContinuePending = true; ctx.Send( Self->SelfId(), new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId)); @@ -2333,6 +2363,15 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase Y_ASSERT(it->second); auto& state = *it->second; + if (state.IsExhausted()) { + // iterator quota reduced and exhausted while ReadContinue was inflight + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId + << ", quota exhausted while rescheduling"); + state.ReadContinuePending = false; + Result.reset(); + return true; + } + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery); @@ -2446,6 +2485,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase if (Reader->Read(txc, ctx)) { // Retry later when dependencies are resolved if (!Reader->GetVolatileReadDependencies().empty()) { + state.ReadContinuePending = true; Self->WaitVolatileDependenciesThenSend( Reader->GetVolatileReadDependencies(), Self->SelfId(), @@ -2532,6 +2572,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase Y_ABORT_UNLESS(it->second); auto& state = *it->second; + state.ReadContinuePending = false; + if (!Result) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId << " TTxReadContinue::Execute() finished without Result, aborting"); @@ -2579,14 +2621,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase } if (Reader->HasUnreadQueries()) { - Y_ASSERT(it->second); - auto& state = *it->second; + bool wasExhausted = state.IsExhausted(); Reader->UpdateState(state, useful); if (!state.IsExhausted()) { + state.ReadContinuePending = true; ctx.Send( Self->SelfId(), new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId)); - } else { + } else if (!wasExhausted) { Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId << " exhausted"); @@ -2859,14 +2901,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext& bool wasExhausted = state.IsExhausted(); state.UpQuota( record.GetSeqNo(), - record.GetMaxRows(), - record.GetMaxBytes()); + record.HasMaxRows() ? record.GetMaxRows() : Max(), + record.HasMaxBytes() ? record.GetMaxBytes() : Max()); if (wasExhausted && !state.IsExhausted()) { DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); - ctx.Send( - SelfId(), - new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId())); + if (!state.ReadContinuePending) { + state.ReadContinuePending = true; + ctx.Send( + SelfId(), + new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId())); + } + } else if (!wasExhausted && state.IsExhausted()) { + IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT); } LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId @@ -2995,6 +3042,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) { ReadIteratorSessions.clear(); } +void TDataShard::IncCounterReadIteratorLastKeyReset() { + if (!CounterReadIteratorLastKeyReset) { + CounterReadIteratorLastKeyReset = GetServiceCounters(AppData()->Counters, "tablets") + ->GetSubgroup("type", "DataShard") + ->GetSubgroup("category", "app") + ->GetCounter("DataShard/ReadIteratorLastKeyReset", true); + } + ++*CounterReadIteratorLastKeyReset; +} + } // NKikimr::NDataShard template<> diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c5b1fff03a5a..053e7b69d5ef 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -3322,6 +3322,10 @@ class TDataShard bool AllowCancelROwithReadsets() const; void ResolveTablePath(const TActorContext &ctx); + +public: + NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset; + void IncCounterReadIteratorLastKeyReset(); }; NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code); diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 6b87667757bb..2ef259939309 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -4627,6 +4627,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) { "result2: " << result2); } + template + class TBlockEvents : public std::deque { + public: + TBlockEvents(TTestActorRuntime& runtime, std::function condition = {}) + : Runtime(runtime) + , Condition(std::move(condition)) + , Holder(Runtime.AddObserver( + [this](typename TEvType::TPtr& ev) { + this->Process(ev); + })) + {} + + TBlockEvents& Unblock(size_t count = -1) { + while (!this->empty() && count > 0) { + auto& ev = this->front(); + IEventHandle* ptr = ev.Get(); + UnblockedOnce.insert(ptr); + Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true); + this->pop_front(); + --count; + } + return *this; + } + + void Stop() { + UnblockedOnce.clear(); + Holder.Remove(); + } + + private: + void Process(typename TEvType::TPtr& ev) { + IEventHandle* ptr = ev.Get(); + auto it = UnblockedOnce.find(ptr); + if (it != UnblockedOnce.end()) { + UnblockedOnce.erase(it); + return; + } + + if (Condition && !Condition(ev)) { + return; + } + + this->emplace_back(std::move(ev)); + } + + private: + TTestActorRuntime& Runtime; + std::function Condition; + TTestActorRuntime::TEventObserverHolder Holder; + THashSet UnblockedOnce; + }; + + Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + TServer::TPtr server = new TServer(serverSettings); + + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);"); + runtime.SimulateSleep(TDuration::Seconds(1)); + + auto forceSmallChunks = runtime.AddObserver( + [&](TEvDataShard::TEvRead::TPtr& ev) { + auto* msg = ev->Get(); + // Force chunks of at most 3 rows + msg->Record.SetMaxRowsInResult(3); + }); + + TBlockEvents blockedAcks(runtime); + TBlockEvents blockedResults(runtime); + TBlockEvents blockedContinue(runtime); + + auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) { + while (!condition()) { + UNIT_ASSERT_C(count > 0, "... failed to wait for " << description); + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + --count; + } + }; + + auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7"); + waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; }); + waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; }); + + blockedContinue.Unblock(1); + waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; }); + waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; }); + + // We need both results to arrive without pauses + blockedResults.Unblock(); + + waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; }); + + // Unblock the first TEvReadAck and then pending TEvReadContinue + blockedAcks.Unblock(1); + blockedContinue.Unblock(1); + + // Give it some time to trigger the bug + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // Stop blocking everything + blockedAcks.Unblock().Stop(); + blockedResults.Unblock().Stop(); + blockedContinue.Unblock().Stop(); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(readFuture))), + "{ items { uint32_value: 1 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 20 } }, " + "{ items { uint32_value: 3 } items { uint32_value: 30 } }, " + "{ items { uint32_value: 4 } items { uint32_value: 40 } }, " + "{ items { uint32_value: 5 } items { uint32_value: 50 } }, " + "{ items { uint32_value: 6 } items { uint32_value: 60 } }, " + "{ items { uint32_value: 7 } items { uint32_value: 70 } }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 913c6d8d75f3..96cea578d50d 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -205,6 +205,7 @@ struct TReadIteratorState { TActorId SessionId; TMonotonic StartTs; bool IsFinished = false; + bool ReadContinuePending = false; // note that we send SeqNo's starting from 1 ui64 SeqNo = 0;