From f2427c93da55a7687e9745d65aa1839be6a0482d Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 2 Apr 2024 14:51:39 +0000 Subject: [PATCH] Option to limit batch size (bytes) for query service --- .../grpc_services/query/rpc_execute_query.cpp | 3 +- ydb/core/kqp/common/events/query.h | 10 ++ .../kqp/executer_actor/kqp_data_executer.cpp | 3 +- ydb/core/kqp/executer_actor/kqp_planner.cpp | 6 + ydb/core/kqp/executer_actor/kqp_planner.h | 2 + .../kqp/executer_actor/kqp_scan_executer.cpp | 3 +- ydb/core/kqp/gateway/kqp_gateway.h | 2 + .../kqp/node_service/kqp_node_service.cpp | 2 + ydb/core/kqp/session_actor/kqp_query_state.h | 4 + .../kqp/session_actor/kqp_session_actor.cpp | 1 + ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 113 ++++++++++++++++++ ydb/core/protos/kqp.proto | 1 + .../yql/dq/actors/compute/dq_compute_actor.h | 1 + .../compute/dq_sync_compute_actor_base.h | 6 +- ydb/public/api/protos/ydb_query.proto | 3 + .../cpp/client/ydb_query/impl/exec_query.cpp | 4 + ydb/public/sdk/cpp/client/ydb_query/query.h | 1 + 17 files changed, 161 insertions(+), 4 deletions(-) diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index ef52c40ceef4..4a42e702804f 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -277,7 +277,8 @@ class TExecuteQueryRPC : public TActorBootstrapped { .SetKeepSession(false) .SetUseCancelAfter(false) .SetSyntax(syntax) - .SetSupportStreamTrailingResult(true); + .SetSupportStreamTrailingResult(true) + .SetOutputChunkMaxSize(req->response_part_limit_bytes()); auto ev = MakeHolder( QueryAction, diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 782f7797b9b4..8576a9e23208 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -39,6 +39,12 @@ struct TQueryRequestSettings { return *this; } + TQueryRequestSettings& SetOutputChunkMaxSize(ui64 size) { + OutputChunkMaxSize = size; + return *this; + } + + ui64 OutputChunkMaxSize = 0; bool KeepSession = false; bool UseCancelAfter = true; ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED; @@ -315,6 +321,10 @@ struct TEvQueryRequest: public NActors::TEventLocalPlanExecution(); diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index c5e159ba7f6e..cee268c66698 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -78,6 +78,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args) , AllowSinglePartitionOpt(args.AllowSinglePartitionOpt) , UserRequestContext(args.UserRequestContext) , FederatedQuerySetup(args.FederatedQuerySetup) + , OutputChunkMaxSize(args.OutputChunkMaxSize) { if (!Database) { // a piece of magic for tests @@ -198,6 +199,10 @@ std::unique_ptr TKqpPlanner::SerializeReque request.MutableSnapshot()->SetStep(Snapshot.Step); } + if (OutputChunkMaxSize) { + request.SetOutputChunkMaxSize(OutputChunkMaxSize); + } + return result; } @@ -332,6 +337,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op NYql::NDq::TComputeMemoryLimits limits; limits.ChannelBufferSize = 32_MB; // Depends on NYql::NDq::TDqOutputChannelSettings::ChunkSizeLimit (now 48 MB) with a ratio of 1.5 + limits.OutputChunkMaxSize = OutputChunkMaxSize; limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB; limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 74f2eb04a0bf..b9cb5b6a1c10 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -61,6 +61,7 @@ class TKqpPlanner { const bool AllowSinglePartitionOpt; const TIntrusivePtr& UserRequestContext; const std::optional& FederatedQuerySetup; + const ui64 OutputChunkMaxSize = 0; }; TKqpPlanner(TKqpPlanner::TArgs&& args); @@ -122,6 +123,7 @@ class TKqpPlanner { TIntrusivePtr UserRequestContext; const std::optional FederatedQuerySetup; + const ui64 OutputChunkMaxSize; public: static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 48066618418a..98ce4d188c42 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -337,7 +337,8 @@ class TKqpScanExecuter : public TKqpExecuterBase { ui64 txId = msg.GetTxId(); bool isScan = msg.HasSnapshot(); + const ui64 outputChunkMaxSize = msg.GetOutputChunkMaxSize(); YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start @@ -437,6 +438,7 @@ class TKqpNodeService : public TActorBootstrapped { inputChannelsCount += i.ChannelsSize(); } memoryLimits.ChannelBufferSize = std::max(taskCtx.ChannelSize / std::max(1, inputChannelsCount), Config.GetMinChannelBufferSize()); + memoryLimits.OutputChunkMaxSize = outputChunkMaxSize; AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info") ("ch_size", taskCtx.ChannelSize)("ch_count", taskCtx.Channels)("ch_limit", memoryLimits.ChannelBufferSize) ("inputs", dqTask.InputsSize())("input_channels_count", inputChannelsCount); diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 18a1e82a6e0f..d47aa9c11fd7 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -176,6 +176,10 @@ class TKqpQueryState : public TNonCopyable { return RequestEv->GetUsePublicResponseDataFormat(); } + ui64 GetOutputChunkMaxSize() const { + return RequestEv->GetOutputChunkMaxSize(); + } + void UpdateTempTablesState(const TKqpTempTablesState& tempTablesState) { TempTablesState = std::make_shared(tempTablesState); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f08338526737..1a10b5102e84 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -856,6 +856,7 @@ class TKqpSessionActor : public TActorBootstrapped { request.StatsMode = queryState->GetStatsMode(); request.ProgressStatsPeriod = queryState->GetProgressStatsPeriod(); request.QueryType = queryState->GetType(); + request.OutputChunkMaxSize = queryState->GetOutputChunkMaxSize(); if (Y_LIKELY(queryState->PreparedQuery)) { ui64 resultSetsCount = queryState->PreparedQuery->GetPhysicalQuery().ResultBindingsSize(); request.AllowTrailingResults = (resultSetsCount == 1 && queryState->Statements.size() <= 1); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index ab5971ac69df..8912fb1e8c17 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -235,6 +235,119 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + std::pair CalcRowsAndBatches(TExecuteQueryIterator& it) { + ui32 totalRows = 0; + ui32 totalBatches = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto result = streamPart.ExtractResultSet(); + UNIT_ASSERT(!result.Truncated()); + totalRows += result.RowsCount(); + totalBatches++; + } + } + return {totalRows, totalBatches}; + } + + Y_UNIT_TEST(FlowControllOnHugeLiteralAsTable) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))"; + +//TODO: it looks like this check triggers grpc request proxy request leak +/* + { + // Check range for chunk size settings + auto settings = TExecuteQuerySettings().OutputChunkMaxSize(48_MB); + auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + auto streamPart = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::BAD_REQUEST, streamPart.GetIssues().ToString()); + } +*/ + auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000); + auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + + auto [totalRows, totalBatches] = CalcRowsAndBatches(it); + + UNIT_ASSERT_VALUES_EQUAL(totalRows, 100000); + // 100000 rows * 9 (?) byte per row / 10000 chunk size limit -> expect 90 batches + UNIT_ASSERT(totalBatches >= 90); // but got 91 in our case + UNIT_ASSERT(totalBatches < 100); + } + + TString GetQueryToFillTable(bool longRow) { + TString s = "12345678"; + int rows = 100000; + if (longRow) { + rows /= 1000; + s.resize(1000, 'x'); + } + return Sprintf("UPSERT INTO test SELECT * FROM AS_TABLE (ListMap(ListEnumerate(ListReplicate(\"%s\", %d)), " + "($x) -> {RETURN AsStruct($x.0 AS Key, $x.1 as Value)}))", + s.c_str(), rows); + } + + void DoFlowControllOnHugeRealTable(bool longRow) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + { + const TString q = "CREATE TABLE test (Key Uint64, Value String, PRIMARY KEY (Key))"; + auto r = db.ExecuteQuery(q, TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(r.GetStatus(), EStatus::SUCCESS, r.GetIssues().ToString()); + } + + { + auto q = GetQueryToFillTable(longRow); + auto r = db.ExecuteQuery(q, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(r.GetStatus(), EStatus::SUCCESS, r.GetIssues().ToString()); + } + + const TString query = "SELECT * FROM test"; + if (longRow) { + // Check the case of limit less than one row size - expect one batch for each row + auto settings = TExecuteQuerySettings().OutputChunkMaxSize(100); + auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + + auto [totalRows, totalBatches] = CalcRowsAndBatches(it); + + UNIT_ASSERT_VALUES_EQUAL(totalRows, 100); + UNIT_ASSERT_VALUES_EQUAL(totalBatches, 100); + } + + auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000); + auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + + auto [totalRows, totalBatches] = CalcRowsAndBatches(it); + + if (longRow) { + UNIT_ASSERT_VALUES_EQUAL(totalRows, 100); + // 100 rows * 1000 byte per row / 10000 chunk size limit -> expect 10 batches + UNIT_ASSERT(10 <= totalBatches); + UNIT_ASSERT(totalBatches < 12); + } else { + UNIT_ASSERT_VALUES_EQUAL(totalRows, 100000); + // 100000 rows * 12 byte per row / 10000 chunk size limit -> expect 120 batches + UNIT_ASSERT(120 <= totalBatches); + UNIT_ASSERT(totalBatches < 122); + } + } + + Y_UNIT_TEST_TWIN(FlowControllOnHugeRealTable, LongRow) { + DoFlowControllOnHugeRealTable(LongRow); + } + Y_UNIT_TEST(ExecuteQueryExplicitTxTLI) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 5fbcf123c839..cefbcd6d5e6c 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -562,6 +562,7 @@ message TEvStartKqpTasksRequest { optional NActorsProto.TActorId ExecuterActorId = 4; optional TKqpSnapshot Snapshot = 5; optional bool StartAllOrFail = 6 [default = true]; + optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value } message TEvStartKqpTasksResponse { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 88c5d8e458e9..c85bd509e35f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -360,6 +360,7 @@ struct TComputeMemoryLimits { ui64 MinMemAllocSize = 30_MB; ui64 MinMemFreeSize = 30_MB; + ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; IMemoryQuotaManager::TPtr MemoryQuotaManager; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index e4b7671cfa03..deb5ede71f5c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -223,7 +223,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseMemoryLimits.ChannelBufferSize; - limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize; + + if (!limits.OutputChunkMaxSize) { + limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize; + } TaskRunner->Prepare(this->Task, limits, execCtx); diff --git a/ydb/public/api/protos/ydb_query.proto b/ydb/public/api/protos/ydb_query.proto index ec0bbe861f44..10d7750995ea 100644 --- a/ydb/public/api/protos/ydb_query.proto +++ b/ydb/public/api/protos/ydb_query.proto @@ -167,6 +167,9 @@ message ExecuteQueryRequest { // For queries with multiple result sets, some of them may be computed concurrently. // If true, parts of different results sets may be interleaved in response stream. bool concurrent_result_sets = 8; + + // Allows to set size limitation (in bytes) for one result part + int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"]; } message ResultSetMeta { diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index f78de4ec95c3..a6de4f0c7890 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -219,6 +219,10 @@ TFuture> StreamExecuteQueryIm request.set_concurrent_result_sets(*settings.ConcurrentResultSets_); } + if (settings.OutputChunkMaxSize_) { + request.set_response_part_limit_bytes(*settings.OutputChunkMaxSize_); + } + if (txControl.HasTx()) { auto requestTxControl = request.mutable_tx_control(); requestTxControl->set_commit_tx(txControl.CommitTx_); diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.h b/ydb/public/sdk/cpp/client/ydb_query/query.h index 2b2be8bd9866..07e0c75237a7 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/ydb_query/query.h @@ -72,6 +72,7 @@ class TExecuteQueryIterator : public TStatus { using TAsyncExecuteQueryIterator = NThreading::TFuture; struct TExecuteQuerySettings : public TRequestSettings { + FLUENT_SETTING_OPTIONAL(ui32, OutputChunkMaxSize); FLUENT_SETTING_DEFAULT(ESyntax, Syntax, ESyntax::YqlV1); FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute); FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None);