Skip to content

Commit

Permalink
24-2: Support arbiters feature flag for volatile transactions (ydb-pl…
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored and uzhastik committed Jul 5, 2024
1 parent bb9ed8e commit 739f901
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
47 changes: 47 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2117,6 +2117,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;
ui64 arbiter = 0;

// Gather shards that need to send/receive readsets (shards with effects)
if (needCommit) {
Expand All @@ -2143,6 +2144,48 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
}

// The current value of 5 is arbitrary. Writing to 5 shards in
// a single transaction is unusual enough, and having latency
// regressions is unlikely. Full mesh readset count grows like
// 2n(n-1), and arbiter reduces it to 4(n-1). Here's a readset
// count table for various small `n`:
//
// n = 2: 4 -> 4
// n = 3: 12 -> 8
// n = 4: 24 -> 12
// n = 5: 40 -> 16
// n = 6: 60 -> 20
// n = 7: 84 -> 24
//
// The ideal crossover is at n = 4, since the readset count
// doesn't change when going from 3 to 4 shards, but the
// increase in latency may not really be worth it. With n = 5
// the readset count lowers from 24 to 16 readsets when going
// from 4 to 5 shards. This makes 5 shards potentially cheaper
// than 4 shards when readsets dominate the workload, but at
// the price of possible increase in latency. Too many readsets
// cause interconnect overload and reduce throughput however,
// so we don't want to use a crossover value that is too high.
const size_t minArbiterMeshSize = 5; // TODO: make configurable?
if (VolatileTx &&
receivingShardsSet.size() >= minArbiterMeshSize &&
AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())
{
std::vector<ui64> candidates;
candidates.reserve(receivingShardsSet.size());
for (ui64 candidate : receivingShardsSet) {
// Note: all receivers are also senders in volatile transactions
if (Y_LIKELY(sendingShardsSet.contains(candidate))) {
candidates.push_back(candidate);
}
}
if (candidates.size() >= minArbiterMeshSize) {
// Select a random arbiter
ui32 index = RandomNumber<ui32>(candidates.size());
arbiter = candidates.at(index);
}
}
}

// Encode sending/receiving shards in tx bodies
Expand All @@ -2157,12 +2200,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}

for (auto& [_, tx] : topicTxs) {
tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
*tx.MutableSendingShards() = sendingShards;
*tx.MutableReceivingShards() = receivingShards;
YQL_ENSURE(!arbiter);
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,7 @@ message TFeatureFlags {
optional bool EnableAccessServiceBulkAuthorization = 114 [default = false];
optional bool EnableAddColumsWithDefaults = 115 [ default = false];
optional bool EnableReplaceIfExistsForExternalEntities = 116 [ default = false];
optional bool EnableCMSRequestPriorities = 117 [default = false];
optional bool EnableStableNodeNames = 122 [default = false];
optional bool EnableVolatileTransactionArbiters = 124 [default = false];
}

0 comments on commit 739f901

Please sign in to comment.