Skip to content

Commit

Permalink
HTAP: ProposeTx + EvWrite (ydb-platform#9236)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Oct 25, 2024
1 parent 6655b3e commit deb2359
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 116 deletions.
6 changes: 4 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead) {
if (HasUncommittedChangesRead || HasOlapTable) {
YQL_ENSURE(EnableImmediateEffects);
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -297,7 +298,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
YQL_ENSURE(EnableImmediateEffects);
return false;
}

Expand Down
80 changes: 47 additions & 33 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,22 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
}

TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, userRequestContext,
statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo)
: TBase(std::move(request), database, userToken, counters, tableServiceConfig,
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWrite(useEvWrite)
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
, HtapTx(htapTx) {
{
Target = creator;

YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
Expand Down Expand Up @@ -1445,8 +1445,8 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

auto getShardTask = [&](ui64 shardId) -> TTask& {
YQL_ENSURE(!UseEvWrite);
auto it = shardTasks.find(shardId);
YQL_ENSURE(!UseEvWriteForOltp);
auto it = shardTasks.find(shardId);
if (it != shardTasks.end()) {
return TasksGraph.GetTask(it->second);
}
Expand Down Expand Up @@ -1577,8 +1577,9 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
}
}

void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap) {
YQL_ENSURE(!UseEvWrite);
void ExecuteDatashardTransaction(ui64 shardId, NKikimrTxDataShard::TKqpTransaction& kqpTx, const bool isOlap)
{
YQL_ENSURE(!UseEvWriteForOltp);
TShardState shardState;
shardState.State = ImmediateTx ? TShardState::EState::Executing : TShardState::EState::Preparing;
shardState.DatashardState.ConstructInPlace();
Expand Down Expand Up @@ -1955,7 +1956,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
TDatashardTxs datashardTxs;
TEvWriteTxs evWriteTxs;
BuildDatashardTxs(datashardTasks, datashardTxs, evWriteTxs, topicTxs);
YQL_ENSURE(evWriteTxs.empty() || datashardTxs.empty());

// Single-shard datashard transactions are always immediate
ImmediateTx = (datashardTxs.size() + evWriteTxs.size() + Request.TopicOperations.GetSize() + sourceScanPartitionsCount) <= 1 &&
Expand Down Expand Up @@ -2180,7 +2180,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
YQL_ENSURE(!locksList.empty(), "unexpected empty locks list in DataShardLocks");
NKikimrDataEvents::TKqpLocks* locks = nullptr;

if (UseEvWrite) {
if (UseEvWriteForOltp || ShardIdToTableInfo->Get(shardId).IsOlap) {
if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) {
locks = it->second->MutableLocks();
} else {
Expand Down Expand Up @@ -2248,15 +2248,17 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
// Note: currently persistent channels are never used
!HasPersistentChannels &&
// Can't use volatile transactions for EvWrite at current time
!UseEvWrite);
evWriteTxs.empty());

const bool useGenericReadSets = (
// Use generic readsets when feature is explicitly enabled
AppData()->FeatureFlags.GetEnableDataShardGenericReadSets() ||
// Volatile transactions must always use generic readsets
VolatileTx ||
// Transactions with topics must always use generic readsets
!topicTxs.empty());
!topicTxs.empty() ||
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

if (!locksMap.empty() || VolatileTx || Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations()) {
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);
Expand Down Expand Up @@ -2375,10 +2377,22 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
for (auto& [shardId, shardTx] : datashardTxs) {
AFL_ENSURE(!columnShardArbiter);
shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
if (columnShardArbiter) {
shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter);
shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter);
if (sendingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddSendingShards(shardId);
}
if (receivingShardsSet.contains(shardId)) {
shardTx->MutableLocks()->AddReceivingShards(shardId);
}
AFL_ENSURE(!arbiter);
} else {
*shardTx->MutableLocks()->MutableSendingShards() = sendingShards;
*shardTx->MutableLocks()->MutableReceivingShards() = receivingShards;
if (arbiter) {
shardTx->MutableLocks()->SetArbiterShard(arbiter);
}
}
}

Expand Down Expand Up @@ -2747,11 +2761,11 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat

private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool UseEvWrite = false;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
const bool HtapTx = false;

bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
Expand Down Expand Up @@ -2792,15 +2806,15 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool useEvWrite, ui32 statementResultIndex,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) {
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult,
tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext,
statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo);
}

} // namespace NKqp
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ struct TKqpFederatedQuerySetup;

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpSchemeExecuter(
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
Expand Down
32 changes: 14 additions & 18 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,19 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,

IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx)
const TShardIdToTableInfoPtr& shardIdToTableInfo)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
);
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo);
}

TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
Expand All @@ -114,24 +110,24 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
case NKqpProto::TKqpPhyTx::TYPE_DATA:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, false,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo
);

case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(
std::move(request), database, userToken, counters, aggregation,
executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext,
std::move(request), database, userToken, counters,
tableServiceConfig, preparedQuery, userRequestContext,
statementResultIndex
);

case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
return CreateKqpDataExecuter(
std::move(request), database, userToken, counters, true,
aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator,
userRequestContext, useEvWrite, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx
tableServiceConfig, std::move(asyncIoFactory), creator,
userRequestContext, statementResultIndex,
federatedQuerySetup, GUCSettings, shardIdToTableInfo
);

default:
Expand Down
23 changes: 8 additions & 15 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig tableServiceConfig,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
: Request(std::move(request))
Expand All @@ -134,16 +132,16 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
, Counters(counters)
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
, AggregationSettings(aggregation)
, ExecuterRetriesConfig(tableServiceConfig.GetExecuterRetriesConfig())
, AggregationSettings(tableServiceConfig.GetAggregationConfig())
, HasOlapTable(false)
, StreamResult(streamResult)
, StatementResultIndex(statementResultIndex)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
TasksGraph.GetMeta().ChannelTransportVersion = tableServiceConfig.GetChannelTransportVersion();
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
ResponseEv->Orbit = std::move(Request.Orbit);
Expand Down Expand Up @@ -1993,20 +1991,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool useEvWrite, ui32 statementResultIndex,
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx);
const TShardIdToTableInfoPtr& shardIdToTableInfo);

IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex);

} // namespace NKqp
Expand Down
Loading

0 comments on commit deb2359

Please sign in to comment.