diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index eeca1aa36882..0611f423bab9 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2117,6 +2117,7 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingShardsSet; absl::flat_hash_set receivingShardsSet; + ui64 arbiter = 0; // Gather shards that need to send/receive readsets (shards with effects) if (needCommit) { @@ -2143,6 +2144,48 @@ class TKqpDataExecuter : public TKqpExecuterBase + // n = 2: 4 -> 4 + // n = 3: 12 -> 8 + // n = 4: 24 -> 12 + // n = 5: 40 -> 16 + // n = 6: 60 -> 20 + // n = 7: 84 -> 24 + // + // The ideal crossover is at n = 4, since the readset count + // doesn't change when going from 3 to 4 shards, but the + // increase in latency may not really be worth it. With n = 5 + // the readset count lowers from 24 to 16 readsets when going + // from 4 to 5 shards. This makes 5 shards potentially cheaper + // than 4 shards when readsets dominate the workload, but at + // the price of possible increase in latency. Too many readsets + // cause interconnect overload and reduce throughput however, + // so we don't want to use a crossover value that is too high. + const size_t minArbiterMeshSize = 5; // TODO: make configurable? 
+ if (VolatileTx && + receivingShardsSet.size() >= minArbiterMeshSize && + AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters()) + { + std::vector candidates; + candidates.reserve(receivingShardsSet.size()); + for (ui64 candidate : receivingShardsSet) { + // Note: all receivers are also senders in volatile transactions + if (Y_LIKELY(sendingShardsSet.contains(candidate))) { + candidates.push_back(candidate); + } + } + if (candidates.size() >= minArbiterMeshSize) { + // Select a random arbiter + ui32 index = RandomNumber(candidates.size()); + arbiter = candidates.at(index); + } + } } // Encode sending/receiving shards in tx bodies @@ -2157,12 +2200,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + shardTx->MutableLocks()->SetArbiterShard(arbiter); + } } for (auto& [_, tx] : topicTxs) { tx.SetOp(NKikimrPQ::TDataTransaction::Commit); *tx.MutableSendingShards() = sendingShards; *tx.MutableReceivingShards() = receivingShards; + YQL_ENSURE(!arbiter); } } } diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 30c770825054..538af6a81322 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -129,4 +129,7 @@ message TFeatureFlags { optional bool EnableAccessServiceBulkAuthorization = 114 [default = false]; optional bool EnableAddColumsWithDefaults = 115 [ default = false]; optional bool EnableReplaceIfExistsForExternalEntities = 116 [ default = false]; + optional bool EnableCMSRequestPriorities = 117 [default = false]; + optional bool EnableStableNodeNames = 122 [default = false]; + optional bool EnableVolatileTransactionArbiters = 124 [default = false]; }