Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unexpected read iterator stream reset #7697

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "datashard_locks_db.h"
#include "probes.h"

#include <ydb/core/base/counters.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>

#include <ydb/library/actors/core/monotonic_provider.h>
Expand Down Expand Up @@ -315,6 +316,8 @@ class TReader {
, Self(self)
, TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
, LastProcessedKey(State.LastProcessedKey)
, LastProcessedKeyErasedOrMissing(State.LastProcessedKeyErasedOrMissing)
{
GetTimeFast(&StartTime);
EndTime = StartTime;
Expand All @@ -329,10 +332,10 @@ class TReader {
bool toInclusive;
TSerializedCellVec keyFromCells;
TSerializedCellVec keyToCells;
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
if (LastProcessedKey) {
if (!State.Reverse) {
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
fromInclusive = State.LastProcessedKeyErasedOrMissing;
keyFromCells = TSerializedCellVec(LastProcessedKey);
fromInclusive = LastProcessedKeyErasedOrMissing;

keyToCells = range.To;
toInclusive = range.ToInclusive;
Expand All @@ -341,8 +344,8 @@ class TReader {
keyFromCells = range.From;
fromInclusive = range.FromInclusive;

keyToCells = TSerializedCellVec(State.LastProcessedKey);
toInclusive = State.LastProcessedKeyErasedOrMissing;
keyToCells = TSerializedCellVec(LastProcessedKey);
toInclusive = LastProcessedKeyErasedOrMissing;
}
} else {
keyFromCells = range.From;
Expand Down Expand Up @@ -505,6 +508,7 @@ class TReader {
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
if (ReachedTotalRowsLimit()) {
FirstUnprocessedQuery = -1;
LastProcessedKey.clear();
return true;
}

Expand All @@ -531,6 +535,7 @@ class TReader {
FirstUnprocessedQuery++;
else
FirstUnprocessedQuery--;
LastProcessedKey.clear();
}

return true;
Expand All @@ -542,6 +547,7 @@ class TReader {
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
if (ReachedTotalRowsLimit()) {
FirstUnprocessedQuery = -1;
LastProcessedKey.clear();
return true;
}

Expand All @@ -567,6 +573,7 @@ class TReader {
FirstUnprocessedQuery++;
else
FirstUnprocessedQuery--;
LastProcessedKey.clear();
snaury marked this conversation as resolved.
Show resolved Hide resolved
}

return true;
Expand Down Expand Up @@ -732,6 +739,28 @@ class TReader {
}

void UpdateState(TReadIteratorState& state, bool sentResult) {
if (state.FirstUnprocessedQuery == FirstUnprocessedQuery &&
state.LastProcessedKey && !LastProcessedKey)
{
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:"
<< " ReadId# " << State.ReadId
<< " LastSeqNo# " << State.SeqNo
<< " LastQuery# " << State.FirstUnprocessedQuery
<< " RowsRead# " << RowsRead
<< " RowsProcessed# " << RowsProcessed
<< " RowsSinceLastCheck# " << RowsSinceLastCheck
<< " BytesInResult# " << BytesInResult
<< " DeletedRowSkips# " << DeletedRowSkips
<< " InvisibleRowSkips# " << InvisibleRowSkips
<< " Quota.Rows# " << State.Quota.Rows
<< " Quota.Bytes# " << State.Quota.Bytes
<< " State.TotalRows# " << State.TotalRows
<< " State.TotalRowsLimit# " << State.TotalRowsLimit
<< " State.MaxRowsInResult# " << State.MaxRowsInResult);
Self->IncCounterReadIteratorLastKeyReset();
}

state.TotalRows += RowsRead;
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
state.LastProcessedKey = LastProcessedKey;
Expand Down Expand Up @@ -1683,6 +1712,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
if (Reader->HasUnreadQueries()) {
Reader->UpdateState(state, ResultSent);
if (!state.IsExhausted()) {
MBkkt marked this conversation as resolved.
Show resolved Hide resolved
state.ReadContinuePending = true;
ctx.Send(
Self->SelfId(),
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
Expand Down Expand Up @@ -2333,6 +2363,15 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
Y_ASSERT(it->second);
auto& state = *it->second;

if (state.IsExhausted()) {
// iterator quota reduced and exhausted while ReadContinue was inflight
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
<< ", quota exhausted while rescheduling");
state.ReadContinuePending = false;
Result.reset();
return true;
}

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);

Expand Down Expand Up @@ -2446,6 +2485,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
if (Reader->Read(txc, ctx)) {
// Retry later when dependencies are resolved
if (!Reader->GetVolatileReadDependencies().empty()) {
state.ReadContinuePending = true;
Self->WaitVolatileDependenciesThenSend(
Reader->GetVolatileReadDependencies(),
Self->SelfId(),
Expand Down Expand Up @@ -2532,6 +2572,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
Y_ABORT_UNLESS(it->second);
auto& state = *it->second;

state.ReadContinuePending = false;

if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
<< " TTxReadContinue::Execute() finished without Result, aborting");
Expand Down Expand Up @@ -2579,14 +2621,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
}

if (Reader->HasUnreadQueries()) {
Y_ASSERT(it->second);
auto& state = *it->second;
bool wasExhausted = state.IsExhausted();
Reader->UpdateState(state, useful);
if (!state.IsExhausted()) {
state.ReadContinuePending = true;
ctx.Send(
Self->SelfId(),
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
} else {
} else if (!wasExhausted) {
Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
<< " read iterator# " << ReadId << " exhausted");
Expand Down Expand Up @@ -2859,14 +2901,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
bool wasExhausted = state.IsExhausted();
state.UpQuota(
record.GetSeqNo(),
record.GetMaxRows(),
record.GetMaxBytes());
record.HasMaxRows() ? record.GetMaxRows() : Max<ui64>(),
record.HasMaxBytes() ? record.GetMaxBytes() : Max<ui64>());

if (wasExhausted && !state.IsExhausted()) {
DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
ctx.Send(
SelfId(),
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
if (!state.ReadContinuePending) {
state.ReadContinuePending = true;
ctx.Send(
SelfId(),
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
}
} else if (!wasExhausted && state.IsExhausted()) {
IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
}

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
Expand Down Expand Up @@ -2995,6 +3042,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) {
ReadIteratorSessions.clear();
}

/**
 * Bumps the "DataShard/ReadIteratorLastKeyReset" counter by one.
 *
 * The counter handle is looked up on the first call only and cached in
 * CounterReadIteratorLastKeyReset for every subsequent call.
 */
void TDataShard::IncCounterReadIteratorLastKeyReset() {
    auto& counter = CounterReadIteratorLastKeyReset;
    if (!counter) {
        // Walk the counters tree: tablets -> type=DataShard -> category=app
        auto tablets = GetServiceCounters(AppData()->Counters, "tablets");
        auto byType = tablets->GetSubgroup("type", "DataShard");
        auto byCategory = byType->GetSubgroup("category", "app");
        counter = byCategory->GetCounter("DataShard/ReadIteratorLastKeyReset", true);
    }
    ++*counter;
}

} // NKikimr::NDataShard

template<>
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3322,6 +3322,10 @@ class TDataShard
bool AllowCancelROwithReadsets() const;

void ResolveTablePath(const TActorContext &ctx);

public:
NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset;
void IncCounterReadIteratorLastKeyReset();
};

NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code);
Expand Down
134 changes: 134 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4627,6 +4627,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
"result2: " << result2);
}

/**
 * Test helper that intercepts events of type TEvType travelling through the
 * test runtime and parks them in this deque until explicitly unblocked.
 *
 * An observer is registered on construction; every observed event that
 * matches the optional condition is moved out of the observer's handle and
 * appended to the deque. The deque interface (size(), empty(), front(), ...)
 * is exposed directly so tests can inspect what is currently blocked.
 *
 * NOTE(review): presumably the runtime treats a moved-out (empty) handle as
 * consumed and does not deliver it - confirm against TTestActorRuntime.
 */
template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
    /**
     * @param runtime   runtime to observe; must outlive this object
     * @param condition optional filter; when set, only events for which it
     *                  returns true are blocked, the rest are left untouched
     */
    TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
        : Runtime(runtime)
        , Condition(std::move(condition))
        , Holder(Runtime.AddObserver<TEvType>(
            [this](typename TEvType::TPtr& ev) {
                this->Process(ev);
            }))
    {}

    // The registered observer captures `this`. A copied or moved instance
    // would leave that observer calling into the old object, so the type is
    // pinned in place.
    TBlockEvents(const TBlockEvents&) = delete;
    TBlockEvents& operator=(const TBlockEvents&) = delete;
    TBlockEvents(TBlockEvents&&) = delete;
    TBlockEvents& operator=(TBlockEvents&&) = delete;

    /**
     * Re-sends up to `count` blocked events, oldest first, and removes them
     * from the deque. The default (maximum size_t value) releases everything
     * currently blocked.
     *
     * @return *this, to allow chaining such as Unblock().Stop()
     */
    TBlockEvents& Unblock(size_t count = static_cast<size_t>(-1)) {
        while (!this->empty() && count > 0) {
            auto& ev = this->front();
            IEventHandle* ptr = ev.Get();
            // Remember the handle so that Process lets it through instead of
            // blocking it again.
            UnblockedOnce.insert(ptr);
            Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
            this->pop_front();
            --count;
        }
        return *this;
    }

    /**
     * Stops intercepting: forgets pending pass-through marks and removes the
     * observer. Events still sitting in the deque are not re-sent.
     */
    void Stop() {
        UnblockedOnce.clear();
        Holder.Remove();
    }

private:
    // Observer callback: decides whether an event is blocked or left alone.
    void Process(typename TEvType::TPtr& ev) {
        IEventHandle* ptr = ev.Get();
        auto it = UnblockedOnce.find(ptr);
        if (it != UnblockedOnce.end()) {
            // Previously released by Unblock: let it through exactly once.
            UnblockedOnce.erase(it);
            return;
        }

        if (Condition && !Condition(ev)) {
            return;
        }

        // Take ownership of the event; `ev` is left empty for the caller.
        this->emplace_back(std::move(ev));
    }

private:
    TTestActorRuntime& Runtime;
    std::function<bool(typename TEvType::TPtr&)> Condition;
    TTestActorRuntime::TEventObserverHolder Holder;
    // Raw handles released by Unblock that Process has not yet let through.
    THashSet<IEventHandle*> UnblockedOnce;
};

// Regression test (named after bug 7674, "iterator duplicate rows").
// It holds back TEvReadAck / TEvReadResult / TEvReadContinue events, releases
// them in a specific order, and then checks that an ordered LIMIT 7 query
// returns keys 1..7 exactly once each.
Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
TServer::TPtr server = new TServer(serverSettings);

auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

CreateShardedTable(server, sender, "/Root", "table-1", 1);

// Two upserts of five rows each: the table ends up with keys 1..10.
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
runtime.SimulateSleep(TDuration::Seconds(1));

// Rewrite every TEvRead request so the 7-row read needs several result chunks.
auto forceSmallChunks = runtime.AddObserver<TEvDataShard::TEvRead>(
[&](TEvDataShard::TEvRead::TPtr& ev) {
auto* msg = ev->Get();
// Force chunks of at most 3 rows
msg->Record.SetMaxRowsInResult(3);
});

// From here on, all acks, results and continue events are captured until explicitly unblocked.
TBlockEvents<TEvDataShard::TEvReadAck> blockedAcks(runtime);
TBlockEvents<TEvDataShard::TEvReadResult> blockedResults(runtime);
TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);

// Dispatches runtime events until `condition` holds; fails the test if it is
// still false after `count` dispatch rounds.
auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) {
while (!condition()) {
UNIT_ASSERT_C(count > 0, "... failed to wait for " << description);
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
--count;
}
};

// Start the query and wait until the first result chunk and its follow-up continue are both captured.
auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7");
waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; });

// Release one continue so a second result chunk (and another continue) is captured.
blockedContinue.Unblock(1);
waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; });

// We need both results to arrive without pauses
blockedResults.Unblock();

waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; });

// Unblock the first TEvReadAck and then pending TEvReadContinue
blockedAcks.Unblock(1);
blockedContinue.Unblock(1);

// Give it some time to trigger the bug
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Stop blocking everything
blockedAcks.Unblock().Stop();
blockedResults.Unblock().Stop();
blockedContinue.Unblock().Stop();

// The query must yield rows 1..7 in order, with no duplicated or missing rows.
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(readFuture))),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
"{ items { uint32_value: 7 } items { uint32_value: 70 } }");
}

}

} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/read_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ struct TReadIteratorState {
TActorId SessionId;
TMonotonic StartTs;
bool IsFinished = false;
bool ReadContinuePending = false;

// note that we send SeqNo's starting from 1
ui64 SeqNo = 0;
Expand Down
Loading