Skip to content

Commit

Permalink
Fix readset acks sent too early in volatile transactions (#1961)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 16, 2024
1 parent c076edf commit 0c2f282
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
108 changes: 108 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_volatile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "datashard_ut_common_pq.h"
#include "datashard_active_transaction.h"

#include <ydb/core/base/blobstorage.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>

namespace NKikimr {
Expand Down Expand Up @@ -2106,6 +2107,113 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 6 } items { uint32_value: 6 } }");
}

// Regression test for KIKIMR-21060
Y_UNIT_TEST(DistributedWriteRSNotAckedBeforeCommit) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(1000)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20);");

// Block readset exchange
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
Cerr << "... blocking readset" << Endl;
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Stop blocking further readsets
blockReadSets.Remove();

// Sleep a little to make sure everything so far is fully committed
runtime.SimulateSleep(TDuration::Seconds(1));

// Start blocking commits for table-1
const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
std::vector<std::unique_ptr<IEventHandle>> putResponses;
auto blockCommits = runtime.AddObserver<TEvBlobStorage::TEvPut>([&](TEvBlobStorage::TEvPut::TPtr& ev) {
auto* msg = ev->Get();
// Drop all put requests for table-1
if (msg->Id.TabletID() == shards1.at(0)) {
// We can't just drop requests, we must reply to it later
putResponses.emplace_back(new IEventHandle(
ev->Sender,
ev->GetRecipientRewrite(),
msg->MakeErrorResponse(NKikimrProto::BLOCKED, "Fake blocked response", 0).release(),
0,
ev->Cookie));
Cerr << "... dropping put " << msg->Id << Endl;
ev.Reset();
}
});

// Unblock readsets
for (auto& ev : readSets) {
runtime.Send(ev.release(), 0, true);
}
readSets.clear();

// Sleep to make sure those readsets are fully processed
// Bug was acknowledging readsets before tx state is fully persisted
runtime.SimulateSleep(TDuration::Seconds(1));

// Transaction will return success even when commits are blocked at this point
Cerr << "... awaiting upsert result" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(upsertResult))),
"<empty>");

// Now we stop blocking commits and gracefully restart the tablet, all pending commits will be lost
blockCommits.Remove();
for (auto& ev : putResponses) {
runtime.Send(ev.release(), 0, true);
}
Cerr << "... restarting tablet " << shards1.at(0) << Endl;
GracefulRestartTablet(runtime, shards1.at(0), sender);

// We must see all rows as committed, i.e. nothing should be lost
Cerr << "... reading final result" << Endl;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleExec(runtime, R"(
SELECT key, value FROM `/Root/table-1`
UNION ALL
SELECT key, value FROM `/Root/table-2`
ORDER BY key
)"),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
}

} // Y_UNIT_TEST_SUITE(DataShardVolatile)

} // namespace NKikimr
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/volatile_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ namespace NKikimr::NDataShard {
}
info->DelayedConfirmations.clear();

// Send delayed acks on commit
// Send delayed acks when changes are persisted
// TODO: maybe move it into a parameter?
struct TDelayedAcksState : public TThrRefBase {
TVector<THolder<IEventHandle>> DelayedAcks;
Expand All @@ -833,7 +833,7 @@ namespace NKikimr::NDataShard {
: DelayedAcks(std::move(info->DelayedAcks))
{}
};
txc.DB.OnCommit([state = MakeIntrusive<TDelayedAcksState>(info)]() {
txc.DB.OnPersistent([state = MakeIntrusive<TDelayedAcksState>(info)]() {
for (auto& ev : state->DelayedAcks) {
TActivationContext::Send(ev.Release());
}
Expand Down

0 comments on commit 0c2f282

Please sign in to comment.