Skip to content

Commit

Permalink
Merge 18961ab into c85f845
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Aug 13, 2024
2 parents c85f845 + 18961ab commit 83603dc
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 13 deletions.
83 changes: 70 additions & 13 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "datashard_locks_db.h"
#include "probes.h"

#include <ydb/core/base/counters.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>

#include <ydb/library/actors/core/monotonic_provider.h>
Expand Down Expand Up @@ -315,6 +316,8 @@ class TReader {
, Self(self)
, TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
, FirstUnprocessedQuery(State.FirstUnprocessedQuery)
, LastProcessedKey(State.LastProcessedKey)
, LastProcessedKeyErased(State.LastProcessedKeyErased)
{
GetTimeFast(&StartTime);
EndTime = StartTime;
Expand All @@ -329,10 +332,10 @@ class TReader {
bool toInclusive;
TSerializedCellVec keyFromCells;
TSerializedCellVec keyToCells;
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
if (LastProcessedKey) {
if (!State.Reverse) {
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
fromInclusive = State.LastProcessedKeyErased;
keyFromCells = TSerializedCellVec(LastProcessedKey);
fromInclusive = LastProcessedKeyErased;

keyToCells = range.To;
toInclusive = range.ToInclusive;
Expand All @@ -341,8 +344,8 @@ class TReader {
keyFromCells = range.From;
fromInclusive = range.FromInclusive;

keyToCells = TSerializedCellVec(State.LastProcessedKey);
toInclusive = State.LastProcessedKeyErased;
keyToCells = TSerializedCellVec(LastProcessedKey);
toInclusive = LastProcessedKeyErased;
}
} else {
keyFromCells = range.From;
Expand Down Expand Up @@ -500,6 +503,7 @@ class TReader {
while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
if (ReachedTotalRowsLimit()) {
FirstUnprocessedQuery = -1;
LastProcessedKey.clear();
return true;
}

Expand All @@ -526,6 +530,7 @@ class TReader {
FirstUnprocessedQuery++;
else
FirstUnprocessedQuery--;
LastProcessedKey.clear();
}

return true;
Expand All @@ -537,6 +542,7 @@ class TReader {
while (FirstUnprocessedQuery < State.Request->Keys.size()) {
if (ReachedTotalRowsLimit()) {
FirstUnprocessedQuery = -1;
LastProcessedKey.clear();
return true;
}

Expand All @@ -562,6 +568,7 @@ class TReader {
FirstUnprocessedQuery++;
else
FirstUnprocessedQuery--;
LastProcessedKey.clear();
}

return true;
Expand Down Expand Up @@ -727,6 +734,28 @@ class TReader {
}

void UpdateState(TReadIteratorState& state, bool sentResult) {
if (state.FirstUnprocessedQuery == FirstUnprocessedQuery &&
state.LastProcessedKey && !LastProcessedKey)
{
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:"
<< " ReadId# " << State.ReadId
<< " LastSeqNo# " << State.SeqNo
<< " LastQuery# " << State.FirstUnprocessedQuery
<< " RowsRead# " << RowsRead
<< " RowsProcessed# " << RowsProcessed
<< " RowsSinceLastCheck# " << RowsSinceLastCheck
<< " BytesInResult# " << BytesInResult
<< " DeletedRowSkips# " << DeletedRowSkips
<< " InvisibleRowSkips# " << InvisibleRowSkips
<< " Quota.Rows# " << State.Quota.Rows
<< " Quota.Bytes# " << State.Quota.Bytes
<< " State.TotalRows# " << State.TotalRows
<< " State.TotalRowsLimit# " << State.TotalRowsLimit
<< " State.MaxRowsInResult# " << State.MaxRowsInResult);
Self->IncCounterReadIteratorLastKeyReset();
}

state.TotalRows += RowsRead;
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
state.LastProcessedKey = LastProcessedKey;
Expand Down Expand Up @@ -1632,6 +1661,7 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
if (Reader->HasUnreadQueries()) {
Reader->UpdateState(state, ResultSent);
if (!state.IsExhausted()) {
state.ReadContinuePending = true;
ctx.Send(
Self->SelfId(),
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
Expand Down Expand Up @@ -2282,6 +2312,15 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
Y_ASSERT(it->second);
auto& state = *it->second;

if (state.IsExhausted()) {
// iterator quota reduced and exhausted while ReadContinue was inflight
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
<< ", quota exhausted while rescheduling");
state.ReadContinuePending = false;
Result.reset();
return true;
}

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
<< ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);

Expand Down Expand Up @@ -2394,6 +2433,7 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
if (Reader->Read(txc, ctx)) {
// Retry later when dependencies are resolved
if (!Reader->GetVolatileReadDependencies().empty()) {
state.ReadContinuePending = true;
Self->WaitVolatileDependenciesThenSend(
Reader->GetVolatileReadDependencies(),
Self->SelfId(),
Expand Down Expand Up @@ -2480,6 +2520,8 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
Y_ABORT_UNLESS(it->second);
auto& state = *it->second;

state.ReadContinuePending = false;

if (!Result) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
<< " TTxReadContinue::Execute() finished without Result, aborting");
Expand Down Expand Up @@ -2527,14 +2569,14 @@ class TDataShard::TTxReadContinue : public NTabletFlatExecutor::TTransactionBase
}

if (Reader->HasUnreadQueries()) {
Y_ASSERT(it->second);
auto& state = *it->second;
bool wasExhausted = state.IsExhausted();
Reader->UpdateState(state, useful);
if (!state.IsExhausted()) {
state.ReadContinuePending = true;
ctx.Send(
Self->SelfId(),
new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
} else {
} else if (!wasExhausted) {
Self->IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
<< " read iterator# " << ReadId << " exhausted");
Expand Down Expand Up @@ -2807,14 +2849,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
bool wasExhausted = state.IsExhausted();
state.UpQuota(
record.GetSeqNo(),
record.GetMaxRows(),
record.GetMaxBytes());
record.HasMaxRows() ? record.GetMaxRows() : Max<ui64>(),
record.HasMaxBytes() ? record.GetMaxBytes() : Max<ui64>());

if (wasExhausted && !state.IsExhausted()) {
DecCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
ctx.Send(
SelfId(),
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
if (!state.ReadContinuePending) {
state.ReadContinuePending = true;
ctx.Send(
SelfId(),
new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
}
} else if (!wasExhausted && state.IsExhausted()) {
IncCounter(COUNTER_READ_ITERATORS_EXHAUSTED_COUNT);
}

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
Expand Down Expand Up @@ -2943,6 +2990,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) {
ReadIteratorSessions.clear();
}

void TDataShard::IncCounterReadIteratorLastKeyReset() {
if (!CounterReadIteratorLastKeyReset) {
CounterReadIteratorLastKeyReset = GetServiceCounters(AppData()->Counters, "tablets")
->GetSubgroup("type", "DataShard")
->GetSubgroup("category", "app")
->GetCounter("DataShard/ReadIteratorLastKeyReset", true);
}
++*CounterReadIteratorLastKeyReset;
}

} // NKikimr::NDataShard

template<>
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3319,6 +3319,10 @@ class TDataShard
bool AllowCancelROwithReadsets() const;

void ResolveTablePath(const TActorContext &ctx);

public:
NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset;
void IncCounterReadIteratorLastKeyReset();
};

NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code);
Expand Down
134 changes: 134 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4670,6 +4670,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
"result2: " << result2);
}

template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

TBlockEvents& Unblock(size_t count = -1) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

void Stop() {
UnblockedOnce.clear();
Holder.Remove();
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
};

Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
TServer::TPtr server = new TServer(serverSettings);

auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

CreateShardedTable(server, sender, "/Root", "table-1", 1);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
runtime.SimulateSleep(TDuration::Seconds(1));

auto forceSmallChunks = runtime.AddObserver<TEvDataShard::TEvRead>(
[&](TEvDataShard::TEvRead::TPtr& ev) {
auto* msg = ev->Get();
// Force chunks of at most 3 rows
msg->Record.SetMaxRowsInResult(3);
});

TBlockEvents<TEvDataShard::TEvReadAck> blockedAcks(runtime);
TBlockEvents<TEvDataShard::TEvReadResult> blockedResults(runtime);
TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);

auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) {
while (!condition()) {
UNIT_ASSERT_C(count > 0, "... failed to wait for " << description);
Cerr << "... waiting for " << description << Endl;
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
runtime.DispatchEvents(options);
--count;
}
};

auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7");
waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; });

blockedContinue.Unblock(1);
waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; });

// We need both results to arrive without pauses
blockedResults.Unblock();

waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; });

// Unblock the first TEvReadAck and then pending TEvReadContinue
blockedAcks.Unblock(1);
blockedContinue.Unblock(1);

// Give it some time to trigger the bug
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// Stop blocking everything
blockedAcks.Unblock().Stop();
blockedResults.Unblock().Stop();
blockedContinue.Unblock().Stop();

UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(readFuture))),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
"{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
"{ items { uint32_value: 7 } items { uint32_value: 70 } }");
}

}

} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/read_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ struct TReadIteratorState {
TActorId SessionId;
TMonotonic StartTs;
bool IsFinished = false;
bool ReadContinuePending = false;

// note that we send SeqNo's starting from 1
ui64 SeqNo = 0;
Expand Down

0 comments on commit 83603dc

Please sign in to comment.