diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 69ac30badc98..31feafbf424d 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -833,7 +833,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasQueryService) { server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters, - grpcRequestProxies[0], hasDataStreams.IsRlAllowed())); + grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasLogStore) { diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 61f4e02f6889..d09a196b7af5 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -466,7 +466,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { } } - Y_UNIT_TEST(AlterTableAddNotNullWithDefaultIndexed) { + Y_UNIT_TEST(IndexedTableAndNotNullColumn) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableSequences(false); @@ -583,6 +583,98 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { } + Y_UNIT_TEST(IndexedTableAndNotNullColumnAddNotNullColumn) { + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(false); + appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true); + appConfig.MutableFeatureFlags()->SetEnableAddColumsWithDefaults(true); + + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()).SetAppConfig(appConfig)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/AlterTableAddNotNullColumn` ( + Key Uint32, + Value String, + Value2 Int32 NOT NULL DEFAULT 1, + PRIMARY KEY (Key), + INDEX ByValue GLOBAL ON (Value) COVER (Value2) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + } + + auto fQuery = [&](TString query) -> TString { + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto result = + session + .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), + execSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + if (result.GetResultSets().size() > 0) + return NYdb::FormatResultSetYson(result.GetResultSet(0)); + return ""; + }; + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); + )"); + + auto fCompareTable = [&](TString expected) { + TString query = R"( + SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; + )"; + CompareYson(expected, fQuery(query)); + }; + + fCompareTable(R"( + [ + [[1u];["Old"];1] + ] + )"); + + fQuery(R"( + INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];1];[[2u];["New"];1] + ] + )"); + + { + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7; + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + } + + fCompareTable(R"( + [ + [[1u];["Old"];1;7];[[2u];["New"];1;7] + ] + )"); + + } + Y_UNIT_TEST(AlterTableAddNotNullWithDefault) { NKikimrConfig::TAppConfig appConfig; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 6ef5ad171ce5..f16b65755776 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -386,7 +386,7 @@ namespace Tests { GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 0765fe1a29fc..bccb959d30c6 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -371,21 +371,6 @@ class TUploadRowsBase : public TActorBootstrappedResultSet.front(); - for (const auto& index : entry.Indexes) { - switch (index.GetType()) { - case NKikimrSchemeOp::EIndexTypeGlobalAsync: - if (AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) { - continue; - } else { - errorMessage = "Bulk upsert is not supported for tables with indexes"; - return false; - } - default: - errorMessage = "Only async-indexed tables are supported by BulkUpsert"; - return false; - } - } - TVector keyColumnIds; THashMap columnByName; THashSet keyColumnsLeft; @@ -517,6 +502,40 @@ class TUploadRowsBase : public TActorBootstrapped UpdatingValueColumns; + if (UpsertIfExists) { + for(const auto& name: ValueColumnNames) { + UpdatingValueColumns.emplace(name); + } + } + + for (const auto& index : entry.Indexes) { + if (index.GetType() == NKikimrSchemeOp::EIndexTypeGlobalAsync && + AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) { + continue; + } + + bool allowUpdate = UpsertIfExists; + for(auto& column : index.GetKeyColumnNames()) { + allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end()); + if (!allowUpdate) { + break; + } + } + + for(auto& column : index.GetDataColumnNames()) { + allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end()); + if (!allowUpdate) { + break; + } + } + + if (!allowUpdate) { + errorMessage = "Only async-indexed tables are supported by BulkUpsert"; + return false; + } + } + if (makeYqbSchema) { Id2Position.clear(); YdbSchema.resize(KeyColumnTypes.size() + ValueColumnTypes.size()); @@ -545,6 +564,12 @@ class TUploadRowsBase : public TActorBootstrapped counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + void TGRpcYdbQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { using namespace Ydb::Query; using namespace NQuery; auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive>(this, &Service_, CQ_, \ - [this](NYdbGrpc::IRequestContextBase* ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("query", #NAME))->Run(); - - ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoDeleteSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoAttachSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoBeginTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoCommitTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoRollbackTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) +#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive>(this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \ + }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("query", #NAME))->Run(); \ + ++proxyCounter; \ + } \ + } + + ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, DoExecuteQuery, Auditable); + ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, DoExecuteScript, Auditable); + ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, DoFetchScriptResults); + ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, DoCreateSession); + ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, DoDeleteSession); + ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, DoAttachSession); + ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, DoBeginTransaction); + ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, DoCommitTransaction); + ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, DoRollbackTransaction); #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h index 8dec5cc4fa7f..04e8ab915ee7 100644 --- a/ydb/services/ydb/ydb_query.h +++ b/ydb/services/ydb/ydb_query.h @@ -11,8 +11,25 @@ class TGRpcYdbQueryService public: using TGrpcServiceBase::TGrpcServiceBase; + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; }; } // namespace NKikimr::NGRpcService