Skip to content

Commit

Permalink
24-3: Fix volatile transactions getting stuck after a restart (ydb-pl…
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Oct 22, 2024
1 parent 7c22bbb commit 7ff1c86
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 5 deletions.
89 changes: 89 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ydb/core/base/blobstorage.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/core/testlib/actors/block_events.h>

namespace NKikimr {

Expand Down Expand Up @@ -2988,6 +2989,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"ERROR: ABORTED");
}

// Regression test for volatile transactions getting stuck after a restart.
//
// Scenario:
//   1. Create a two-shard table and seed it with one row per shard.
//   2. Block TEvPlanStep delivery to shards[0], then start two upserts that
//      touch the same keys on both shards (the second depends on the first).
//   3. Kill both shard actors before they can exchange any messages.
//   4. Expect both queries to finish with UNDETERMINED, then split shards[1]
//      to prove it is not stuck on leftover volatile transactions.
Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
    TPortManager pm;
    TServerSettings serverSettings(pm.GetPort(2134));
    serverSettings.SetDomainName("Root")
        .SetUseRealThreads(false)
        .SetEnableDataShardVolatileTransactions(true);

    Tests::TServer::TPtr server = new TServer(serverSettings);
    auto &runtime = *server->GetRuntime();
    auto sender = runtime.AllocateEdgeActor();

    runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
    runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);

    InitRoot(server, sender);

    // Table is partitioned at key 10, so keys < 10 and keys >= 10 land on
    // different shards.
    UNIT_ASSERT_VALUES_EQUAL(
        KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table` (key uint32, value uint32, PRIMARY KEY (key))
WITH (PARTITION_AT_KEYS = (10));
)"),
        "SUCCESS");

    const auto shards = GetTableShards(server, sender, "/Root/table");
    UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);

    // We need to fill table with some data.
    // The column list must match the table schema (key, value) and the arity
    // of the VALUES tuples; a non-existent column here would make the
    // statement fail without the test noticing, since the result is unchecked.
    // NOTE(review): consider asserting on the returned result string so a
    // failing seed query cannot go unnoticed again.
    Cerr << "========= Upserting initial values =========" << Endl;
    KqpSimpleExec(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (1, 1), (11, 11)
)");

    // NOTE(review): presumably forces shards[0] to act as the volatile
    // transaction arbiter - confirm against the helper's definition.
    TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));

    // Hold back every plan step addressed to shards[0], so transactions are
    // never planned there while shards[1] is free to proceed.
    TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
        [actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
            return ev->GetRecipientRewrite() == actor;
        });

    Cerr << "========= Starting upsert 1 =========" << Endl;
    auto upsertFuture1 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 2), (12, 12);
)");
    runtime.SimulateSleep(TDuration::Seconds(1));

    // Second upsert writes the same keys as the first one.
    Cerr << "========= Starting upsert 2 =========" << Endl;
    auto upsertFuture2 = KqpSimpleSend(runtime, R"(
UPSERT INTO `/Root/table` (key, value)
VALUES (2, 1002), (12, 1012);
)");
    runtime.SimulateSleep(TDuration::Seconds(1));

    // One blocked plan step per upsert.
    UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);

    // We expect transaction to execute at shards[1]
    // However at shards[0] it didn't even start due to blocked plans
    // Now we need to restart both shards, without giving them a chance to communicate
    std::vector<TActorId> shardActors{
        ResolveTablet(runtime, shards.at(0)),
        ResolveTablet(runtime, shards.at(1)),
    };
    for (const auto& shardActor : shardActors) {
        Cerr << "... killing actor " << shardActor << Endl;
        // Perform a synchronous send, this makes sure both shards handle TEvPoison before anything else
        runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
    }

    // Stop intercepting plan steps and clear the ones captured so far
    // (their recipients have just been killed).
    blockedPlan.Stop().clear();

    // Both queries should abort with UNDETERMINED
    Cerr << "... waiting for query results" << Endl;
    UNIT_ASSERT_VALUES_EQUAL(
        FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
        "ERROR: UNDETERMINED");
    UNIT_ASSERT_VALUES_EQUAL(
        FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
        "ERROR: UNDETERMINED");

    // Split the second shard, which makes sure it's not stuck
    Cerr << "========= Splitting shard 2 =========" << Endl;
    SetSplitMergePartCountLimit(server->GetRuntime(), -1);
    ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
    Cerr << "... split txId# " << txId << " started" << Endl;
    WaitTxNotification(server, sender, txId);
    Cerr << "... split finished" << Endl;
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
9 changes: 4 additions & 5 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ namespace NKikimr::NDataShard {

void TVolatileTxManager::Start(const TActorContext& ctx) {
for (auto& pr : VolatileTxs) {
if (!pr.second->Dependencies.empty()) {
continue;
}
switch (pr.second->State) {
case EVolatileTxState::Waiting:
for (ui64 target : pr.second->Participants) {
Expand Down Expand Up @@ -875,7 +872,7 @@ namespace NKikimr::NDataShard {
if (info->AddCommitted) {
RunCommitCallbacks(info);
}
if (info->Dependencies.empty() && ReadyToDbCommit(info)) {
if (ReadyToDbCommit(info)) {
AddPendingCommit(txId);
}
}
Expand Down Expand Up @@ -926,7 +923,9 @@ namespace NKikimr::NDataShard {
case EVolatileTxState::Waiting:
break;
case EVolatileTxState::Committed:
AddPendingCommit(dependentTxId);
if (ReadyToDbCommit(dependent)) {
AddPendingCommit(dependentTxId);
}
break;
case EVolatileTxState::Aborting:
Y_ABORT("FIXME: unexpected dependency removed from aborting tx");
Expand Down

0 comments on commit 7ff1c86

Please sign in to comment.