Skip to content

Commit

Permalink
24-3: Fix resolved timestamp emitted too early for some displaced ups…
Browse files Browse the repository at this point in the history
…erts (#8870)
  • Loading branch information
snaury authored Sep 9, 2024
1 parent fac9434 commit 8d1c399
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 18 deletions.
32 changes: 16 additions & 16 deletions ydb/core/tx/datashard/cdc_stream_heartbeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() {
return;
}

// We may possibly have more writes at this version
TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite);
bool wait = true;

if (const auto& plan = TransQueue.GetPlan()) {
const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion());
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
edge = Min(edge, plan.begin()->ToRowVersion());
wait = false;
}

if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) {
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, version));
}
return;
edge = Min(edge, version);
wait = false;
}

const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite);
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite));
if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) {
return Execute(new TTxCdcStreamEmitHeartbeats(this, edge));
}

WaitPlanStep(lowest.Next().Step);
if (wait) {
WaitPlanStep(lowest.Next().Step);
}
}

void TCdcStreamHeartbeatManager::Reset() {
Expand Down Expand Up @@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
return false;
}

if (Schedule.top().Version > edge) {
if (Schedule.top().Version >= edge) {
return false;
}

Expand All @@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co
THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbeatManager::EmitHeartbeats(
NTable::TDatabase& db, const TRowVersion& edge)
{
if (Schedule.empty() || Schedule.top().Version > edge) {
if (!ShouldEmitHeartbeat(edge)) {
return {};
}

Expand All @@ -234,7 +234,7 @@ THashMap<TPathId, TCdcStreamHeartbeatManager::THeartbeatInfo> TCdcStreamHeartbea

while (true) {
const auto& top = Schedule.top();
if (top.Version > edge) {
if (top.Version >= edge) {
break;
}

Expand Down
140 changes: 138 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/scheme_board/events.h>
#include <ydb/core/tx/scheme_board/events_internal.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
Expand Down Expand Up @@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
return result;
}

void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
TVector<NJson::TJsonValue> WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
while (true) {
const auto records = GetRecords(*server->GetRuntime(), sender, path, 0);
for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) {
Expand All @@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
if (records.size() >= expected.size()) {
UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(),
"Unexpected record: " << records.at(expected.size()).second);
break;
TVector<NJson::TJsonValue> values;
for (const auto& pr : records) {
bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back());
Y_ABORT_UNLESS(ok);
}
return values;
}

SimulateSleep(server, TDuration::Seconds(1));
Expand Down Expand Up @@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
);

TDisableDataShardLogBatching disableDataShardLogBatching;

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
SetSplitMergePartCountLimit(&runtime, -1);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
WithVirtualTimestamps(WithResolvedTimestamps(
TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson)))));

Cerr << "... prepare" << Endl;
WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
});

KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
)");

auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":10},"key":[1],"ts":"***"})",
R"({"resolved":"***"})",
});

// Take the final step
ui64 lastStep = records.back()["resolved"][0].GetUInteger();
Cerr << "... last heartbeat at " << lastStep << Endl;

const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table");
const auto shards = GetTableShards(server, edgeActor, "/Root/Table");
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);

ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
ui64 snapshotStep = lastStep + 3000 - 1;
ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep));

TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates(runtime,
[&](auto& ev) {
return ev->Get()->Record.GetTimeBarrier() > snapshotStep;
});

Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl;
{
auto req = std::make_unique<TEvDataShard::TEvRead>();
{
auto& record = req->Record;
record.SetReadId(1);
record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
record.AddColumns(1);
record.AddColumns(2);
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
ui32 key = 1;
TVector<TCell> keys;
keys.push_back(TCell::Make(key));
req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
record.MutableSnapshot()->SetStep(snapshotStep);
record.MutableSnapshot()->SetTxId(Max<ui64>());
}
ForwardToTablet(runtime, shards.at(0), edgeActor, req.release());
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(edgeActor);
auto* res = ev->Get();
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
Cerr << "... read finished" << Endl;
}
for (int i = 0; i < 10; ++i) {
runtime.SimulateSleep(TDuration::MilliSeconds(1));
}

Cerr << "... starting upsert 1 (expected to displace)" << Endl;
auto upsert1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
)");
for (int i = 0; i < 10; ++i) {
runtime.SimulateSleep(TDuration::MilliSeconds(1));
}

Cerr << "... starting upsert 2 (expected to displace)" << Endl;
auto upsert2 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
)");
for (int i = 0; i < 10; ++i) {
runtime.SimulateSleep(TDuration::MilliSeconds(1));
}

Cerr << "... unblocking updates" << Endl;
blockedUpdates.Unblock().Stop();
for (int i = 0; i < 10; ++i) {
runtime.SimulateSleep(TDuration::MilliSeconds(1));
}

Cerr << "... checking the update is logged before the new resolved timestamp" << Endl;
records = WaitForContent(server, edgeActor, "/Root/Table/Stream", {
R"({"resolved":"***"})",
R"({"update":{"value":10},"key":[1],"ts":"***"})",
R"({"resolved":"***"})",
R"({"update":{"value":20},"key":[2],"ts":"***"})",
R"({"update":{"value":30},"key":[3],"ts":"***"})",
R"({"resolved":"***"})",
});

TRowVersion resolved(0, 0);
for (auto& record : records) {
if (record.Has("resolved")) {
resolved.Step = record["resolved"][0].GetUInteger();
resolved.TxId = record["resolved"][1].GetUInteger();
}
if (record.Has("ts")) {
TRowVersion ts(
record["ts"][0].GetUInteger(),
record["ts"][1].GetUInteger());
UNIT_ASSERT_C(resolved < ts,
"Record with ts " << ts << " after resolved " << resolved);
}
}
}

} // Cdc

} // NKikimr
Expand Down

0 comments on commit 8d1c399

Please sign in to comment.