diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 23dc0069576b..74a543b4022a 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -268,7 +268,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool ShouldExecuteDeferredEffects() const { - if (HasUncommittedChangesRead) { + if (HasUncommittedChangesRead || HasOlapTable) { + YQL_ENSURE(EnableImmediateEffects); return !DeferredEffects.Empty(); } @@ -297,7 +298,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool CanDeferEffects() const { - if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) { + YQL_ENSURE(EnableImmediateEffects); return false; } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index c1a1f9b1fa33..9f0c8ec59f94 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -121,22 +121,22 @@ class TKqpDataExecuter: public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const TIntrusiveConstPtr& userToken, + TKqpRequestCounters::TPtr counters, bool streamResult, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const TActorId& creator, - const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, userRequestContext, - statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) + const TActorId& creator, const TIntrusivePtr& userRequestContext, + ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo) + : TBase(std::move(request), database, userToken, counters, tableServiceConfig, + userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , UseEvWrite(useEvWrite) + , UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink()) + , HtapTx(tableServiceConfig.GetEnableHtapTx()) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) , ShardIdToTableInfo(shardIdToTableInfo) - , HtapTx(htapTx) { + { Target = creator; YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -1445,8 +1445,8 @@ class TKqpDataExecuter: public TKqpExecuterBase TTask& { - YQL_ENSURE(!UseEvWrite); - auto it = shardTasks.find(shardId); + YQL_ENSURE(!UseEvWriteForOltp); + auto it = shardTasks.find(shardId); if (it != shardTasks.end()) { return TasksGraph.GetTask(it->second); } @@ -1577,8 +1577,9 @@ class TKqpDataExecuter: public TKqpExecuterBaseGet(shardId).IsOlap) { if (auto it = evWriteTxs.find(shardId); it != evWriteTxs.end()) { locks = it->second->MutableLocks(); } else { @@ -2248,7 +2248,7 @@ class TKqpDataExecuter: public TKqpExecuterBaseMutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; - *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; - if (arbiter) { - shardTx->MutableLocks()->SetArbiterShard(arbiter); + if (columnShardArbiter) { + shardTx->MutableLocks()->AddSendingShards(*columnShardArbiter); + shardTx->MutableLocks()->AddReceivingShards(*columnShardArbiter); + if (sendingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddSendingShards(shardId); + } + if (receivingShardsSet.contains(shardId)) { + shardTx->MutableLocks()->AddReceivingShards(shardId); + } + AFL_ENSURE(!arbiter); + } else { + *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; + *shardTx->MutableLocks()->MutableReceivingShards() = receivingShards; + if (arbiter) { + shardTx->MutableLocks()->SetArbiterShard(arbiter); + } } } @@ -2747,11 +2761,11 @@ class TKqpDataExecuter: public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; TShardIdToTableInfoPtr ShardIdToTableInfo; - const bool HtapTx = false; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2792,15 +2806,15 @@ class TKqpDataExecuter: public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, + NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo) +{ + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, + tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, + statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 03d739b4f6c0..182b0aa1619d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -102,14 +102,11 @@ struct TKqpFederatedQuerySetup; IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index c1c617a3a277..89da1bd869d6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -78,23 +78,19 @@ TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) + const TShardIdToTableInfoPtr& shardIdToTableInfo) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx - ); + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo); } TMaybe txsType; @@ -114,24 +110,24 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt case NKqpProto::TKqpPhyTx::TYPE_DATA: return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: return CreateKqpScanExecuter( - std::move(request), database, userToken, counters, aggregation, - executerRetriesConfig, preparedQuery, chanTransportVersion, userRequestContext, + std::move(request), database, userToken, counters, + tableServiceConfig, preparedQuery, userRequestContext, statementResultIndex ); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: return CreateKqpDataExecuter( std::move(request), database, userToken, counters, true, - aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx + tableServiceConfig, std::move(asyncIoFactory), creator, + userRequestContext, statementResultIndex, + federatedQuerySetup, GUCSettings, shardIdToTableInfo ); default: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a3546e919bef..7d235f0ab68c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -123,9 +123,7 @@ class TKqpExecuterBase : public TActorBootstrapped { TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, + const NKikimrConfig::TTableServiceConfig tableServiceConfig, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false) : Request(std::move(request)) @@ -134,8 +132,8 @@ class TKqpExecuterBase : public TActorBootstrapped { , Counters(counters) , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName) , Planner(nullptr) - , ExecuterRetriesConfig(executerRetriesConfig) - , AggregationSettings(aggregation) + , ExecuterRetriesConfig(tableServiceConfig.GetExecuterRetriesConfig()) + , AggregationSettings(tableServiceConfig.GetAggregationConfig()) , HasOlapTable(false) , StreamResult(streamResult) , StatementResultIndex(statementResultIndex) @@ -143,7 +141,7 @@ class TKqpExecuterBase : public TActorBootstrapped { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive(); TasksGraph.GetMeta().Database = Database; - TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion; + TasksGraph.GetMeta().ChannelTransportVersion = tableServiceConfig.GetChannelTransportVersion(); TasksGraph.GetMeta().UserRequestContext = userRequestContext; ResponseEv = std::make_unique(Request.TxAlloc, ExecType); ResponseEv->Orbit = std::move(Request.Orbit); @@ -1993,20 +1991,15 @@ class TKqpExecuterBase : public TActorBootstrapped { IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const TActorId& creator, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, - const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex); } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index a4b7363fe7a2..6c652f602f58 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -45,13 +45,11 @@ class TKqpScanExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, + : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::ScanExecuter, "ScanExecuter", false ) @@ -363,13 +361,11 @@ class TKqpScanExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, - const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, - const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + const NKikimrConfig::TTableServiceConfig& tableServiceConfig, TPreparedQueryHolder::TConstPtr preparedQuery, const TIntrusivePtr& userRequestContext, ui32 statementResultIndex) { - return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, - preparedQuery, chanTransportVersion, userRequestContext, statementResultIndex); + return new TKqpScanExecuter(std::move(request), database, userToken, counters, tableServiceConfig, + preparedQuery, userRequestContext, statementResultIndex); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 715027a5a7e5..787ab042b5bb 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -348,6 +348,11 @@ class TKqpQueryState : public TNonCopyable { return false; } + if (TxCtx->HasOlapTable) { + // HTAP/OLAP transactions always use separate commit. + return false; + } + if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e557230c5f6a..8769dafefcc7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1232,7 +1232,8 @@ class TKqpSessionActor : public TActorBootstrapped { } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); - if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) { + YQL_ENSURE(txCtx.EnableImmediateEffects); request.UseImmediateEffects = true; } } @@ -1291,29 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - bool useEvWrite = ( - (txCtx->HasOlapTable // olap only - && !txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink()) - || (txCtx->HasOltpTable // oltp only - && !txCtx->HasOlapTable - && Settings.TableService.GetEnableOltpSink()) - || (txCtx->HasOlapTable // htap - && txCtx->HasOltpTable - && Settings.TableService.GetEnableOlapSink() - && Settings.TableService.GetEnableHtapTx())) - && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY - || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), - RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), - AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), + RequestCounters, Settings.TableService, + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo, - Settings.TableService.GetEnableHtapTx()); + QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 98a4f407d66a..ae2ebf08cda0 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3037,10 +3037,10 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } - Y_UNIT_TEST(TableSink_Htap) { + Y_UNIT_TEST_TWIN(TableSink_Htap, withOltpSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink); appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); auto settings = TKikimrSettings() .SetAppConfig(appConfig) @@ -3053,7 +3053,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { const TString query = R"( CREATE TABLE `/Root/ColumnShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3062,7 +3062,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CREATE TABLE `/Root/DataShard` ( Col1 Uint64 NOT NULL, - Col2 String, + Col2 String NOT NULL, Col3 Int32 NOT NULL, PRIMARY KEY (Col1) ) @@ -3073,12 +3073,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); + { auto result = client.ExecuteQuery(R"( UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES - (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13); UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES - (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13); INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`; REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`; SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; @@ -3189,13 +3190,12 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { { auto result = client.ExecuteQuery(R"( - SELECT * FROM `/Root/DataShard` ORDER BY Col1; - SELECT * FROM `/Root/ColumnShard` ORDER BY Col1; + SELECT * FROM `/Root/DataShard`; + SELECT * FROM `/Root/ColumnShard`; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])", - FormatResultSetYson(result.GetResultSet(0))); - CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1))); } { @@ -3210,7 +3210,6 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto result = client.ExecuteQuery(R"( SELECT * FROM `/Root/DataShard`; SELECT * FROM `/Root/ColumnShard`; - SELECT * FROM `/Root/DataShard`; )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));