Skip to content

Commit

Permalink
Fix volatile result sent before it is fully committed (24-1) (#2624)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Mar 12, 2024
1 parent 623524f commit ed139af
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 108 deletions.
123 changes: 60 additions & 63 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2006,23 +2006,68 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TTopicTabletTxs& topicTxs) {
TDatashardTxs datashardTxs;

std::vector<ui64> affectedShardsSet;
affectedShardsSet.reserve(datashardTasks.size());

for (auto& [shardId, tasks]: datashardTasks) {
auto [it, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected duplicates in datashard transactions");
affectedShardsSet.emplace_back(shardId);
NKikimrTxDataShard::TKqpTransaction* dsTxs = it->second;
dsTxs->MutableTasks()->Reserve(tasks.size());
for (auto& task: tasks) {
dsTxs->AddTasks()->Swap(task);
}
}

// Note: when locks map is present it will be mutated to avoid copying data
auto& locksMap = Request.DataShardLocks;
if (!locksMap.empty()) {
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback);
}

// Materialize (possibly empty) txs for all shards with locks (either commit or rollback)
for (auto& [shardId, locksList] : locksMap) {
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");

auto it = datashardTxs.find(shardId);
if (it == datashardTxs.end()) {
auto [emplaced, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected failure to emplace a datashard transaction");
it = emplaced;
}

NKikimrTxDataShard::TKqpTransaction* tx = it->second;
switch (Request.LocksOp) {
case ELocksOp::Commit:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
break;
case ELocksOp::Rollback:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
break;
case ELocksOp::Unspecified:
break;
}

// Move lock descriptions to the datashard tx
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add(std::move(lock));
}
locksList.clear();

// When locks with writes are committed this commits accumulated effects
if (Request.LocksOp == ELocksOp::Commit && hasWrites) {
ShardsWithEffects.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}

Request.TopicOperations.BuildTopicTxs(topicTxs);

const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
Expand All @@ -2042,7 +2087,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// TODO: add support in the future
topicTxs.empty() &&
// We only want to use volatile transactions for multiple shards
(affectedShardsSet.size() + topicTxs.size()) > 1 &&
(datashardTxs.size() + topicTxs.size()) > 1 &&
// We cannot use volatile transactions with persistent channels
// Note: currently persistent channels are never used
!HasPersistentChannels);
Expand All @@ -2055,30 +2100,29 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Transactions with topics must always use generic readsets
!topicTxs.empty());

if (auto locksMap = Request.DataShardLocks;
!locksMap.empty() ||
VolatileTx ||
if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

bool needCommit = Request.LocksOp == ELocksOp::Commit || VolatileTx;

auto locksOp = needCommit
? NKikimrDataEvents::TKqpLocks::Commit
: NKikimrDataEvents::TKqpLocks::Rollback;

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;

// Gather shards that need to send/receive readsets (shards with effects)
if (needCommit) {
for (auto& shardId: affectedShardsSet) {
for (auto& [shardId, tx] : datashardTxs) {
if (tx->HasLocks()) {
// Locks may be broken so shards with locks need to send readsets
sendingShardsSet.insert(shardId);
}
if (ShardsWithEffects.contains(shardId)) {
// Volatile transactions may abort effects, so they send readsets
if (VolatileTx) {
sendingShardsSet.insert(shardId);
}
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);
}
}
Expand All @@ -2093,44 +2137,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

// Gather locks that need to be committed or erased
for (auto& [shardId, locksList] : locksMap) {
NKikimrTxDataShard::TKqpTransaction* tx = nullptr;
auto it = datashardTxs.find(shardId);
if (it != datashardTxs.end()) {
tx = it->second;
} else {
auto [eIt, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
tx = eIt->second;
}

tx->MutableLocks()->SetOp(locksOp);

if (!locksList.empty()) {
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add()->Swap(&lock);
}

if (needCommit) {
// We also send the result on commit
sendingShardsSet.insert(shardId);

if (hasWrites) {
// Tx with uncommitted changes can be aborted due to conflicts,
// so shards with write locks should receive readsets
receivingShardsSet.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}
}
}

// Encode sending/receiving shards in tx bodies
if (needCommit) {
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end());
Expand All @@ -2139,23 +2146,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
std::sort(receivingShards.begin(), receivingShards.end());

for (auto& [shardId, shardTx] : datashardTxs) {
shardTx->MutableLocks()->SetOp(locksOp);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
}

for (auto& [_, tx] : topicTxs) {
switch (locksOp) {
case NKikimrDataEvents::TKqpLocks::Commit:
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
break;
case NKikimrDataEvents::TKqpLocks::Rollback:
tx.SetOp(NKikimrPQ::TDataTransaction::Rollback);
break;
case NKikimrDataEvents::TKqpLocks::Unspecified:
break;
}

tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
if (!DataTx) {
Y_ABORT_UNLESS(TxBody);
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->HasStreamResponse())
SetStreamSink(DataTx->GetSink());
}
Expand Down Expand Up @@ -635,7 +635,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData(

bool extractKeys = DataTx->IsTxInfoLoaded();
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->Ready() && extractKeys) {
DataTx->ExtractKeys(true);
}
Expand Down
22 changes: 10 additions & 12 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace NKqpHelpers {
inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) {
Ydb::Table::CreateSessionRequest request;
auto future = NRpcService::DoLocalRpc<TEvCreateSessionRequest>(
std::move(request), database, "", /* token */ runtime.GetActorSystem(0));
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
TString sessionId;
auto response = AwaitResponse(runtime, future);
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
Expand Down Expand Up @@ -71,7 +71,7 @@ namespace NKqpHelpers {
TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
{
return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
std::move(request), database, "" /* token */, runtime.GetActorSystem(0));
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
}

inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC(
Expand Down Expand Up @@ -119,7 +119,7 @@ namespace NKqpHelpers {
Ydb::Table::DeleteSessionRequest request;
request.set_session_id(sessionId);
auto future = NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(
std::move(request), "", "", /* token */ runtime.GetActorSystem(0));
std::move(request), "", /* token */ "", runtime.GetActorSystem(0));
}

inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
Expand Down Expand Up @@ -168,17 +168,15 @@ namespace NKqpHelpers {
return FormatResult(result);
}

inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
inline auto KqpSimpleSend(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
TString sessionId = CreateSessionRPC(runtime, database);
TString txId;
auto response = AwaitResponse(
runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo), database));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
Ydb::Table::ExecuteQueryResult result;
response.operation().result().UnpackTo(&result);
return FormatResult(result);
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, /* commitTx */ true, staleRo), database);
}

inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
auto response = AwaitResponse(runtime, KqpSimpleSend(runtime, query, staleRo, database));
return FormatResult(response);
}

inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {
Expand Down
34 changes: 19 additions & 15 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4279,6 +4279,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {

bool capturePlanSteps = true;
TVector<THolder<IEventHandle>> capturedPlanSteps;
TVector<ui64> capturedPlanTxIds;
THashSet<ui64> passReadSetTxIds;
ui64 observedReadSets = 0;
TVector<THolder<IEventHandle>> capturedReadSets;
Expand All @@ -4294,6 +4295,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
case TEvTxProcessing::TEvPlanStep::EventType: {
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor && capturePlanSteps) {
Cerr << "... captured plan step for table-3" << Endl;
auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
for (const auto& tx : msg->Record.GetTransactions()) {
ui64 txId = tx.GetTxId();
capturedPlanTxIds.push_back(txId);
Cerr << "... captured plan step tx " << txId << " for table-3" << Endl;
}
capturedPlanSteps.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
Expand All @@ -4303,6 +4310,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor) {
auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
ui64 txId = msg->Record.GetTxId();
if ((msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) &&
(msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA))
{
Cerr << "... passing expectation for txid# " << txId << Endl;
break;
}
++observedReadSets;
if (!passReadSetTxIds.contains(txId)) {
Cerr << "... readset for txid# " << txId << " was blocked" << Endl;
Expand Down Expand Up @@ -4353,20 +4366,11 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
}
};

waitFor([&]{ return capturedPlanSteps.size() > 0; }, "plan step");
UNIT_ASSERT_VALUES_EQUAL(capturedPlanSteps.size(), 1u);
ui64 realTxId1, realTxId2;
{
auto* msg = capturedPlanSteps[0]->Get<TEvTxProcessing::TEvPlanStep>();
TVector<ui64> realTxIds;
for (const auto& tx : msg->Record.GetTransactions()) {
realTxIds.emplace_back(tx.GetTxId());
}
UNIT_ASSERT_VALUES_EQUAL(realTxIds.size(), 2u);
std::sort(realTxIds.begin(), realTxIds.end());
realTxId1 = realTxIds.at(0);
realTxId2 = realTxIds.at(1);
}
waitFor([&]{ return capturedPlanTxIds.size() >= 2; }, "captured transactions");
UNIT_ASSERT_C(capturedPlanTxIds.size(), 2u);
std::sort(capturedPlanTxIds.begin(), capturedPlanTxIds.end());
ui64 realTxId1 = capturedPlanTxIds.at(0);
ui64 realTxId2 = capturedPlanTxIds.at(1);

// Unblock and resend the plan step message
capturePlanSteps = false;
Expand All @@ -4375,7 +4379,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
}
capturedPlanSteps.clear();

// Wait until there are 2 readset messages
// Wait until there are 2 readset messages (with data)
waitFor([&]{ return capturedReadSets.size() >= 2; }, "initial readsets");
SimulateSleep(runtime, TDuration::MilliSeconds(5));

Expand Down
19 changes: 14 additions & 5 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ struct TTestHelper {
auto &runtime = *Server->GetRuntime();
Sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);

InitRoot(Server, Sender);
Expand Down Expand Up @@ -818,7 +818,11 @@ struct TTestHelper {
break;
}
case TEvTxProcessing::EvReadSet: {
if (dropRS) {
auto* msg = event->Get<TEvTxProcessing::TEvReadSet>();
auto flags = msg->Record.GetFlags();
auto isExpect = flags & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET;
auto isNoData = flags & NKikimrTx::TEvReadSet::FLAG_NO_DATA;
if (dropRS && !(isExpect && isNoData)) {
result.ReadSets.push_back(std::move(event));
return TTestActorRuntime::EEventAction::DROP;
}
Expand Down Expand Up @@ -852,7 +856,10 @@ struct TTestHelper {
)"));
}

waitFor([&]{ return result.ReadSets.size() == 1; }, "intercepted RS");
const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
const size_t expectedReadSets = 1 + (finalUpserts && usesVolatileTxs ? 2 : 0);

waitFor([&]{ return result.ReadSets.size() == expectedReadSets; }, "intercepted RS");

// restore original observer (note we used lambda function and stack variables)
Server->GetRuntime()->SetObserverFunc(prevObserverFunc);
Expand Down Expand Up @@ -2576,7 +2583,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
// Blocked volatile transactions block reads, disable
.SetEnableDataShardVolatileTransactions(false);

const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
Expand Down Expand Up @@ -3600,7 +3609,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
// runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);

Expand Down
Loading

0 comments on commit ed139af

Please sign in to comment.