Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove MvccTestOutOfOrderRestartLocksSingleWithoutBarrier (#1905) #1907

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 0 additions & 123 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1662,129 +1662,6 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
}
}

Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetAppConfig(app)
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

// This test requires barrier to be disabled
runtime.GetAppData().FeatureFlags.SetDisableDataShardBarrier(true);

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_DEBUG);

const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();

InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
auto table1shards = GetTableShards(server, sender, "/Root/table-1");
auto table2shards = GetTableShards(server, sender, "/Root/table-2");

ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 1);"));

TString sessionId = CreateSessionRPC(runtime);

TString txId;
{
auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
SELECT * FROM `/Root/table-1` WHERE key = 1
UNION ALL
SELECT * FROM `/Root/table-2` WHERE key = 2
ORDER BY key)"));
UNIT_ASSERT_VALUES_EQUAL(
result,
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
"{ items { uint32_value: 2 } items { uint32_value: 1 } }");
}

// Capture and block all readset messages
TVector<THolder<IEventHandle>> readSets;
auto captureRS = [&](TAutoPtr<IEventHandle> &event) -> auto {
if (event->GetTypeRewrite() == TEvTxProcessing::EvReadSet) {
readSets.push_back(std::move(event));
return TTestActorRuntime::EEventAction::DROP;
}
return TTestActorRuntime::EEventAction::PROCESS;
};
auto prevObserverFunc = runtime.SetObserverFunc(captureRS);

// Send a commit request, it would block on readset exchange
SendRequest(runtime, MakeSimpleRequestRPC(Q_(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 2))"), sessionId, txId, true));

// Wait until we captured both readsets
const size_t expectedReadSets = usesVolatileTxs ? 4 : 2;
if (readSets.size() < expectedReadSets) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
[&](IEventHandle &) -> bool {
return readSets.size() >= expectedReadSets;
});
runtime.DispatchEvents(options);
}
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets);

// Reboot table-1 tablet
readSets.clear();
RebootTablet(runtime, table1shards[0], sender);

// Wait until we captured both readsets again
if (readSets.size() < expectedReadSets) {
TDispatchOptions options;
options.FinalEvents.emplace_back(
[&](IEventHandle &) -> bool {
return readSets.size() >= expectedReadSets;
});
runtime.DispatchEvents(options);
}
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), expectedReadSets);

// Select keys 1 and 3, we expect this immediate tx to succeed
// Note that key 3 is not written yet, but we pretend immediate tx
// executes before that waiting transaction (no key 3 yet).
// Note: volatile transactions block reads until they are resolved, so this read is skipped
if (!usesVolatileTxs) {
auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 1 OR key = 3;"));
UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
}

// Upsert key 1, we expect this immediate tx to be executed successfully because it lies to the right on the global timeline
{
auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 3);"));
UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
}

// Upsert key 5, this immediate tx should be executed successfully too
{
auto result = KqpSimpleExec(runtime, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 3);"));
UNIT_ASSERT_VALUES_EQUAL(result, "<empty>");
}

// Release readsets allowing tx to progress
runtime.SetObserverFunc(prevObserverFunc);
for (auto& ev : readSets) {
runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
}

// Select key 3, we expect a success
{
auto result = KqpSimpleExec(runtime, Q_("SELECT key, value FROM `/Root/table-1` WHERE key = 3;"));
UNIT_ASSERT_VALUES_EQUAL(result, "{ items { uint32_value: 3 } items { uint32_value: 2 } }");
}
}

Y_UNIT_TEST_TWIN(TestOutOfOrderRestartLocksReorderedWithoutBarrier, StreamLookup) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
Expand Down
Loading