Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix volatile result sent before it is fully committed (24-1) #2624

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 60 additions & 63 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2006,23 +2006,68 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TTopicTabletTxs& topicTxs) {
TDatashardTxs datashardTxs;

std::vector<ui64> affectedShardsSet;
affectedShardsSet.reserve(datashardTasks.size());

for (auto& [shardId, tasks]: datashardTasks) {
auto [it, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected duplicates in datashard transactions");
affectedShardsSet.emplace_back(shardId);
NKikimrTxDataShard::TKqpTransaction* dsTxs = it->second;
dsTxs->MutableTasks()->Reserve(tasks.size());
for (auto& task: tasks) {
dsTxs->AddTasks()->Swap(task);
}
}

// Note: when locks map is present it will be mutated to avoid copying data
auto& locksMap = Request.DataShardLocks;
if (!locksMap.empty()) {
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback);
}

// Materialize (possibly empty) txs for all shards with locks (either commit or rollback)
for (auto& [shardId, locksList] : locksMap) {
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");

auto it = datashardTxs.find(shardId);
if (it == datashardTxs.end()) {
auto [emplaced, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());

YQL_ENSURE(success, "unexpected failure to emplace a datashard transaction");
it = emplaced;
}

NKikimrTxDataShard::TKqpTransaction* tx = it->second;
switch (Request.LocksOp) {
case ELocksOp::Commit:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
break;
case ELocksOp::Rollback:
tx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback);
break;
case ELocksOp::Unspecified:
break;
}

// Move lock descriptions to the datashard tx
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add(std::move(lock));
}
locksList.clear();

// When locks with writes are committed this commits accumulated effects
if (Request.LocksOp == ELocksOp::Commit && hasWrites) {
ShardsWithEffects.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}

Request.TopicOperations.BuildTopicTxs(topicTxs);

const bool needRollback = Request.LocksOp == ELocksOp::Rollback;
Expand All @@ -2042,7 +2087,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// TODO: add support in the future
topicTxs.empty() &&
// We only want to use volatile transactions for multiple shards
(affectedShardsSet.size() + topicTxs.size()) > 1 &&
(datashardTxs.size() + topicTxs.size()) > 1 &&
// We cannot use volatile transactions with persistent channels
// Note: currently persistent channels are never used
!HasPersistentChannels);
Expand All @@ -2055,30 +2100,29 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Transactions with topics must always use generic readsets
!topicTxs.empty());

if (auto locksMap = Request.DataShardLocks;
!locksMap.empty() ||
VolatileTx ||
if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

bool needCommit = Request.LocksOp == ELocksOp::Commit || VolatileTx;

auto locksOp = needCommit
? NKikimrDataEvents::TKqpLocks::Commit
: NKikimrDataEvents::TKqpLocks::Rollback;

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;

// Gather shards that need to send/receive readsets (shards with effects)
if (needCommit) {
for (auto& shardId: affectedShardsSet) {
for (auto& [shardId, tx] : datashardTxs) {
if (tx->HasLocks()) {
// Locks may be broken so shards with locks need to send readsets
sendingShardsSet.insert(shardId);
}
if (ShardsWithEffects.contains(shardId)) {
// Volatile transactions may abort effects, so they send readsets
if (VolatileTx) {
sendingShardsSet.insert(shardId);
}
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);
}
}
Expand All @@ -2093,44 +2137,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

// Gather locks that need to be committed or erased
for (auto& [shardId, locksList] : locksMap) {
NKikimrTxDataShard::TKqpTransaction* tx = nullptr;
auto it = datashardTxs.find(shardId);
if (it != datashardTxs.end()) {
tx = it->second;
} else {
auto [eIt, success] = datashardTxs.emplace(
shardId,
TasksGraph.GetMeta().Allocate<NKikimrTxDataShard::TKqpTransaction>());
tx = eIt->second;
}

tx->MutableLocks()->SetOp(locksOp);

if (!locksList.empty()) {
auto* protoLocks = tx->MutableLocks()->MutableLocks();
protoLocks->Reserve(locksList.size());
bool hasWrites = false;
for (auto& lock : locksList) {
hasWrites = hasWrites || lock.GetHasWrites();
protoLocks->Add()->Swap(&lock);
}

if (needCommit) {
// We also send the result on commit
sendingShardsSet.insert(shardId);

if (hasWrites) {
// Tx with uncommitted changes can be aborted due to conflicts,
// so shards with write locks should receive readsets
receivingShardsSet.insert(shardId);
YQL_ENSURE(!ReadOnlyTx);
}
}
}
}

// Encode sending/receiving shards in tx bodies
if (needCommit) {
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
NProtoBuf::RepeatedField<ui64> receivingShards(receivingShardsSet.begin(), receivingShardsSet.end());
Expand All @@ -2139,23 +2146,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
std::sort(receivingShards.begin(), receivingShards.end());

for (auto& [shardId, shardTx] : datashardTxs) {
shardTx->MutableLocks()->SetOp(locksOp);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
}

for (auto& [_, tx] : topicTxs) {
switch (locksOp) {
case NKikimrDataEvents::TKqpLocks::Commit:
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
break;
case NKikimrDataEvents::TKqpLocks::Rollback:
tx.SetOp(NKikimrPQ::TDataTransaction::Rollback);
break;
case NKikimrDataEvents::TKqpLocks::Unspecified:
break;
}

tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
if (!DataTx) {
Y_ABORT_UNLESS(TxBody);
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->HasStreamResponse())
SetStreamSink(DataTx->GetSink());
}
Expand Down Expand Up @@ -635,7 +635,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData(

bool extractKeys = DataTx->IsTxInfoLoaded();
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
GetReceivedAt(), TxBody, IsMvccSnapshotRepeatable());
if (DataTx->Ready() && extractKeys) {
DataTx->ExtractKeys(true);
}
Expand Down
22 changes: 10 additions & 12 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace NKqpHelpers {
inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) {
Ydb::Table::CreateSessionRequest request;
auto future = NRpcService::DoLocalRpc<TEvCreateSessionRequest>(
std::move(request), database, "", /* token */ runtime.GetActorSystem(0));
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
TString sessionId;
auto response = AwaitResponse(runtime, future);
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
Expand Down Expand Up @@ -71,7 +71,7 @@ namespace NKqpHelpers {
TTestActorRuntime& runtime, Ydb::Table::ExecuteDataQueryRequest&& request, const TString& database = {})
{
return NRpcService::DoLocalRpc<TEvExecuteDataQueryRequest>(
std::move(request), database, "" /* token */, runtime.GetActorSystem(0));
std::move(request), database, /* token */ "", runtime.GetActorSystem(0));
}

inline Ydb::Table::ExecuteDataQueryRequest MakeSimpleRequestRPC(
Expand Down Expand Up @@ -119,7 +119,7 @@ namespace NKqpHelpers {
Ydb::Table::DeleteSessionRequest request;
request.set_session_id(sessionId);
auto future = NRpcService::DoLocalRpc<TEvDeleteSessionRequest>(
std::move(request), "", "", /* token */ runtime.GetActorSystem(0));
std::move(request), "", /* token */ "", runtime.GetActorSystem(0));
}

inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
Expand Down Expand Up @@ -168,17 +168,15 @@ namespace NKqpHelpers {
return FormatResult(result);
}

inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
inline auto KqpSimpleSend(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
TString sessionId = CreateSessionRPC(runtime, database);
TString txId;
auto response = AwaitResponse(
runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */, staleRo), database));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
Ydb::Table::ExecuteQueryResult result;
response.operation().result().UnpackTo(&result);
return FormatResult(result);
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, /* commitTx */ true, staleRo), database);
}

inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
auto response = AwaitResponse(runtime, KqpSimpleSend(runtime, query, staleRo, database));
return FormatResult(response);
}

inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query, const TString& database = {}) {
Expand Down
34 changes: 19 additions & 15 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4279,6 +4279,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {

bool capturePlanSteps = true;
TVector<THolder<IEventHandle>> capturedPlanSteps;
TVector<ui64> capturedPlanTxIds;
THashSet<ui64> passReadSetTxIds;
ui64 observedReadSets = 0;
TVector<THolder<IEventHandle>> capturedReadSets;
Expand All @@ -4294,6 +4295,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
case TEvTxProcessing::TEvPlanStep::EventType: {
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor && capturePlanSteps) {
Cerr << "... captured plan step for table-3" << Endl;
auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
for (const auto& tx : msg->Record.GetTransactions()) {
ui64 txId = tx.GetTxId();
capturedPlanTxIds.push_back(txId);
Cerr << "... captured plan step tx " << txId << " for table-3" << Endl;
}
capturedPlanSteps.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
Expand All @@ -4303,6 +4310,12 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
if (nodeIndex == 1 && ev->GetRecipientRewrite() == table3actor) {
auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
ui64 txId = msg->Record.GetTxId();
if ((msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) &&
(msg->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA))
{
Cerr << "... passing expectation for txid# " << txId << Endl;
break;
}
++observedReadSets;
if (!passReadSetTxIds.contains(txId)) {
Cerr << "... readset for txid# " << txId << " was blocked" << Endl;
Expand Down Expand Up @@ -4353,20 +4366,11 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
}
};

waitFor([&]{ return capturedPlanSteps.size() > 0; }, "plan step");
UNIT_ASSERT_VALUES_EQUAL(capturedPlanSteps.size(), 1u);
ui64 realTxId1, realTxId2;
{
auto* msg = capturedPlanSteps[0]->Get<TEvTxProcessing::TEvPlanStep>();
TVector<ui64> realTxIds;
for (const auto& tx : msg->Record.GetTransactions()) {
realTxIds.emplace_back(tx.GetTxId());
}
UNIT_ASSERT_VALUES_EQUAL(realTxIds.size(), 2u);
std::sort(realTxIds.begin(), realTxIds.end());
realTxId1 = realTxIds.at(0);
realTxId2 = realTxIds.at(1);
}
// Wait until the plan steps captured for table-3 mention both transactions
waitFor([&]{ return capturedPlanTxIds.size() >= 2; }, "captured transactions");
// Exactly two transactions are expected. Note: UNIT_ASSERT_C(size, 2u) would treat
// the size as the condition and 2u as the failure message, i.e. only check size != 0.
UNIT_ASSERT_VALUES_EQUAL(capturedPlanTxIds.size(), 2u);
// Sort so that realTxId1 <= realTxId2 regardless of the order they were captured in
std::sort(capturedPlanTxIds.begin(), capturedPlanTxIds.end());
ui64 realTxId1 = capturedPlanTxIds.at(0);
ui64 realTxId2 = capturedPlanTxIds.at(1);

// Unblock and resend the plan step message
capturePlanSteps = false;
Expand All @@ -4375,7 +4379,7 @@ Y_UNIT_TEST(UncommittedReadSetAck) {
}
capturedPlanSteps.clear();

// Wait until there are 2 readset messages
// Wait until there are 2 readset messages (with data)
waitFor([&]{ return capturedReadSets.size() >= 2; }, "initial readsets");
SimulateSleep(runtime, TDuration::MilliSeconds(5));

Expand Down
19 changes: 14 additions & 5 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ struct TTestHelper {
auto &runtime = *Server->GetRuntime();
Sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);

InitRoot(Server, Sender);
Expand Down Expand Up @@ -818,7 +818,11 @@ struct TTestHelper {
break;
}
case TEvTxProcessing::EvReadSet: {
if (dropRS) {
auto* msg = event->Get<TEvTxProcessing::TEvReadSet>();
auto flags = msg->Record.GetFlags();
auto isExpect = flags & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET;
auto isNoData = flags & NKikimrTx::TEvReadSet::FLAG_NO_DATA;
if (dropRS && !(isExpect && isNoData)) {
result.ReadSets.push_back(std::move(event));
return TTestActorRuntime::EEventAction::DROP;
}
Expand Down Expand Up @@ -852,7 +856,10 @@ struct TTestHelper {
)"));
}

waitFor([&]{ return result.ReadSets.size() == 1; }, "intercepted RS");
const bool usesVolatileTxs = runtime.GetAppData(0).FeatureFlags.GetEnableDataShardVolatileTransactions();
const size_t expectedReadSets = 1 + (finalUpserts && usesVolatileTxs ? 2 : 0);

waitFor([&]{ return result.ReadSets.size() == expectedReadSets; }, "intercepted RS");

// restore original observer (note we used lambda function and stack variables)
Server->GetRuntime()->SetObserverFunc(prevObserverFunc);
Expand Down Expand Up @@ -2576,7 +2583,9 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
.SetUseRealThreads(false)
// Blocked volatile transactions block reads, disable
.SetEnableDataShardVolatileTransactions(false);

const ui64 shardCount = 1;
TTestHelper helper(serverSettings, shardCount);
Expand Down Expand Up @@ -3600,7 +3609,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_INFO);
// runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);

Expand Down
Loading
Loading