Option to limit batch size (bytes) for query service
dcherednik committed Apr 4, 2024
1 parent 45af022 commit f2427c9
Showing 17 changed files with 161 additions and 4 deletions.
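Before the per-file diffs, here is a minimal client-side sketch of the new option, pieced together from the SDK and test changes in this commit. The include path, driver setup, and table name are illustrative assumptions, not part of the commit:

#include <ydb/public/sdk/cpp/client/ydb_query/client.h>

using namespace NYdb;
using namespace NYdb::NQuery;

void StreamWithChunkLimit(TDriver& driver) {
    TQueryClient db(driver);

    // Ask the server to keep every streamed result part below ~10 KB.
    auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000);
    auto it = db.StreamExecuteQuery("SELECT * FROM test",
                                    TTxControl::BeginTx().CommitTx(),
                                    settings).ExtractValueSync();

    for (;;) {
        auto part = it.ReadNext().GetValueSync();
        if (!part.IsSuccess()) {
            break; // EOS or error
        }
        if (part.HasResultSet()) {
            auto resultSet = part.ExtractResultSet(); // each part now respects the limit
        }
    }
}

On the wire, OutputChunkMaxSize maps to the new response_part_limit_bytes request field (see ydb_query.proto below) and is propagated through the session actor, executer and planner down to the per-task output channel limits.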
3 changes: 2 additions & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -277,7 +277,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
.SetKeepSession(false)
.SetUseCancelAfter(false)
.SetSyntax(syntax)
.SetSupportStreamTrailingResult(true);
.SetSupportStreamTrailingResult(true)
.SetOutputChunkMaxSize(req->response_part_limit_bytes());

auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
QueryAction,
10 changes: 10 additions & 0 deletions ydb/core/kqp/common/events/query.h
@@ -39,6 +39,12 @@ struct TQueryRequestSettings {
return *this;
}

TQueryRequestSettings& SetOutputChunkMaxSize(ui64 size) {
OutputChunkMaxSize = size;
return *this;
}

ui64 OutputChunkMaxSize = 0;
bool KeepSession = false;
bool UseCancelAfter = true;
::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
@@ -315,6 +321,10 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return QuerySettings.SupportsStreamTrailingResult;
}

ui64 GetOutputChunkMaxSize() const {
return QuerySettings.OutputChunkMaxSize;
}

TDuration GetProgressStatsPeriod() const {
return ProgressStatsPeriod;
}
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2418,7 +2418,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.AsyncIoFactory = AsyncIoFactory,
.AllowSinglePartitionOpt = singlePartitionOptAllowed,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = FederatedQuerySetup
.FederatedQuerySetup = FederatedQuerySetup,
.OutputChunkMaxSize = Request.OutputChunkMaxSize
});

auto err = Planner->PlanExecution();
6 changes: 6 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -78,6 +78,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, AllowSinglePartitionOpt(args.AllowSinglePartitionOpt)
, UserRequestContext(args.UserRequestContext)
, FederatedQuerySetup(args.FederatedQuerySetup)
, OutputChunkMaxSize(args.OutputChunkMaxSize)
{
if (!Database) {
// a piece of magic for tests
@@ -198,6 +199,10 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.MutableSnapshot()->SetStep(Snapshot.Step);
}

if (OutputChunkMaxSize) {
request.SetOutputChunkMaxSize(OutputChunkMaxSize);
}

return result;
}

@@ -332,6 +337,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op

NYql::NDq::TComputeMemoryLimits limits;
limits.ChannelBufferSize = 32_MB; // Depends on NYql::NDq::TDqOutputChannelSettings::ChunkSizeLimit (now 48 MB) with a ratio of 1.5
limits.OutputChunkMaxSize = OutputChunkMaxSize;
limits.MkqlLightProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(500_MB, MkqlMemoryLimit) : 500_MB;
limits.MkqlHeavyProgramMemoryLimit = MkqlMemoryLimit > 0 ? std::min(2_GB, MkqlMemoryLimit) : 2_GB;

2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
@@ -61,6 +61,7 @@ class TKqpPlanner {
const bool AllowSinglePartitionOpt;
const TIntrusivePtr<TUserRequestContext>& UserRequestContext;
const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup;
const ui64 OutputChunkMaxSize = 0;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
@@ -122,6 +123,7 @@ class TKqpPlanner {

TIntrusivePtr<TUserRequestContext> UserRequestContext;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const ui64 OutputChunkMaxSize;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -337,7 +337,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
.AsyncIoFactory = nullptr,
.AllowSinglePartitionOpt = false,
.UserRequestContext = GetUserRequestContext(),
.FederatedQuerySetup = std::nullopt
.FederatedQuerySetup = std::nullopt,
.OutputChunkMaxSize = Request.OutputChunkMaxSize
});

LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
2 changes: 2 additions & 0 deletions ydb/core/kqp/gateway/kqp_gateway.h
@@ -155,6 +155,8 @@ class IKqpGateway : public NYql::IKikimrGateway {

NTopic::TTopicOperations TopicOperations;

ui64 OutputChunkMaxSize = 0;

bool IsTrailingResultsAllowed() const {
return AllowTrailingResults && (
QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY ||
2 changes: 2 additions & 0 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -285,6 +285,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

ui64 txId = msg.GetTxId();
bool isScan = msg.HasSnapshot();
const ui64 outputChunkMaxSize = msg.GetOutputChunkMaxSize();

YQL_ENSURE(msg.GetStartAllOrFail()); // todo: support partial start

@@ -437,6 +438,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
inputChannelsCount += i.ChannelsSize();
}
memoryLimits.ChannelBufferSize = std::max<ui32>(taskCtx.ChannelSize / std::max<ui32>(1, inputChannelsCount), Config.GetMinChannelBufferSize());
memoryLimits.OutputChunkMaxSize = outputChunkMaxSize;
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
("ch_size", taskCtx.ChannelSize)("ch_count", taskCtx.Channels)("ch_limit", memoryLimits.ChannelBufferSize)
("inputs", dqTask.InputsSize())("input_channels_count", inputChannelsCount);
4 changes: 4 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
@@ -176,6 +176,10 @@ class TKqpQueryState : public TNonCopyable {
return RequestEv->GetUsePublicResponseDataFormat();
}

ui64 GetOutputChunkMaxSize() const {
return RequestEv->GetOutputChunkMaxSize();
}

void UpdateTempTablesState(const TKqpTempTablesState& tempTablesState) {
TempTablesState = std::make_shared<const TKqpTempTablesState>(tempTablesState);
}
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -856,6 +856,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
request.StatsMode = queryState->GetStatsMode();
request.ProgressStatsPeriod = queryState->GetProgressStatsPeriod();
request.QueryType = queryState->GetType();
request.OutputChunkMaxSize = queryState->GetOutputChunkMaxSize();
if (Y_LIKELY(queryState->PreparedQuery)) {
ui64 resultSetsCount = queryState->PreparedQuery->GetPhysicalQuery().ResultBindingsSize();
request.AllowTrailingResults = (resultSetsCount == 1 && queryState->Statements.size() <= 1);
113 changes: 113 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -235,6 +235,119 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

std::pair<ui32, ui32> CalcRowsAndBatches(TExecuteQueryIterator& it) {
ui32 totalRows = 0;
ui32 totalBatches = 0;
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
if (!streamPart.IsSuccess()) {
UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
break;
}

if (streamPart.HasResultSet()) {
auto result = streamPart.ExtractResultSet();
UNIT_ASSERT(!result.Truncated());
totalRows += result.RowsCount();
totalBatches++;
}
}
return {totalRows, totalBatches};
}

Y_UNIT_TEST(FlowControllOnHugeLiteralAsTable) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))";

//TODO: it looks like this check triggers grpc request proxy request leak
/*
{
// Check range for chunk size settings
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(48_MB);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
auto streamPart = it.ReadNext().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), EStatus::BAD_REQUEST, streamPart.GetIssues().ToString());
}
*/
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

auto [totalRows, totalBatches] = CalcRowsAndBatches(it);

UNIT_ASSERT_VALUES_EQUAL(totalRows, 100000);
// 100000 rows * 9 (?) byte per row / 10000 chunk size limit -> expect 90 batches
UNIT_ASSERT(totalBatches >= 90); // but got 91 in our case
UNIT_ASSERT(totalBatches < 100);
}

TString GetQueryToFillTable(bool longRow) {
TString s = "12345678";
int rows = 100000;
if (longRow) {
rows /= 1000;
s.resize(1000, 'x');
}
return Sprintf("UPSERT INTO test SELECT * FROM AS_TABLE (ListMap(ListEnumerate(ListReplicate(\"%s\", %d)), "
"($x) -> {RETURN AsStruct($x.0 AS Key, $x.1 as Value)}))",
s.c_str(), rows);
}

void DoFlowControllOnHugeRealTable(bool longRow) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

{
const TString q = "CREATE TABLE test (Key Uint64, Value String, PRIMARY KEY (Key))";
auto r = db.ExecuteQuery(q, TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(r.GetStatus(), EStatus::SUCCESS, r.GetIssues().ToString());
}

{
auto q = GetQueryToFillTable(longRow);
auto r = db.ExecuteQuery(q, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(r.GetStatus(), EStatus::SUCCESS, r.GetIssues().ToString());
}

const TString query = "SELECT * FROM test";
if (longRow) {
// Check the case of limit less than one row size - expect one batch for each row
auto settings = TExecuteQuerySettings().OutputChunkMaxSize(100);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

auto [totalRows, totalBatches] = CalcRowsAndBatches(it);

UNIT_ASSERT_VALUES_EQUAL(totalRows, 100);
UNIT_ASSERT_VALUES_EQUAL(totalBatches, 100);
}

auto settings = TExecuteQuerySettings().OutputChunkMaxSize(10000);
auto it = db.StreamExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

auto [totalRows, totalBatches] = CalcRowsAndBatches(it);

if (longRow) {
UNIT_ASSERT_VALUES_EQUAL(totalRows, 100);
// 100 rows * 1000 byte per row / 10000 chunk size limit -> expect 10 batches
UNIT_ASSERT(10 <= totalBatches);
UNIT_ASSERT(totalBatches < 12);
} else {
UNIT_ASSERT_VALUES_EQUAL(totalRows, 100000);
// 100000 rows * 12 byte per row / 10000 chunk size limit -> expect 120 batches
UNIT_ASSERT(120 <= totalBatches);
UNIT_ASSERT(totalBatches < 122);
}
}

Y_UNIT_TEST_TWIN(FlowControllOnHugeRealTable, LongRow) {
DoFlowControllOnHugeRealTable(LongRow);
}

Y_UNIT_TEST(ExecuteQueryExplicitTxTLI) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
@@ -562,6 +562,7 @@ message TEvStartKqpTasksRequest {
optional NActorsProto.TActorId ExecuterActorId = 4;
optional TKqpSnapshot Snapshot = 5;
optional bool StartAllOrFail = 6 [default = true];
optional uint64 OutputChunkMaxSize = 7 [default = 0]; // 0 - use some default value
}

message TEvStartKqpTasksResponse {
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -360,6 +360,7 @@ struct TComputeMemoryLimits {

ui64 MinMemAllocSize = 30_MB;
ui64 MinMemFreeSize = 30_MB;
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;

IMemoryQuotaManager::TPtr MemoryQuotaManager;
};
@@ -223,7 +223,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo

TDqTaskRunnerMemoryLimits limits;
limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;

if (!limits.OutputChunkMaxSize) {
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
}

TaskRunner->Prepare(this->Task, limits, execCtx);

3 changes: 3 additions & 0 deletions ydb/public/api/protos/ydb_query.proto
@@ -167,6 +167,9 @@ message ExecuteQueryRequest {
// For queries with multiple result sets, some of them may be computed concurrently.
// If true, parts of different results sets may be interleaved in response stream.
bool concurrent_result_sets = 8;

// Allows limiting the size (in bytes) of a single result part
int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"];
}

message ResultSetMeta {
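For clients that build the request protobuf directly instead of going through the SDK, the same cap is set via the new field; a hypothetical fragment (request construction and transport omitted):

// Hypothetical raw-protobuf usage of the field added above.
Ydb::Query::ExecuteQueryRequest request;
request.set_response_part_limit_bytes(10000); // keep each streamed result part under ~10 KB
// Values are validated against the [0; 33554432] annotation, i.e. the limit may not exceed 32 MiB.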
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
@@ -219,6 +219,10 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
request.set_concurrent_result_sets(*settings.ConcurrentResultSets_);
}

if (settings.OutputChunkMaxSize_) {
request.set_response_part_limit_bytes(*settings.OutputChunkMaxSize_);
}

if (txControl.HasTx()) {
auto requestTxControl = request.mutable_tx_control();
requestTxControl->set_commit_tx(txControl.CommitTx_);
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/ydb_query/query.h
@@ -72,6 +72,7 @@ class TExecuteQueryIterator : public TStatus {
using TAsyncExecuteQueryIterator = NThreading::TFuture<TExecuteQueryIterator>;

struct TExecuteQuerySettings : public TRequestSettings<TExecuteQuerySettings> {
FLUENT_SETTING_OPTIONAL(ui32, OutputChunkMaxSize);
FLUENT_SETTING_DEFAULT(ESyntax, Syntax, ESyntax::YqlV1);
FLUENT_SETTING_DEFAULT(EExecMode, ExecMode, EExecMode::Execute);
FLUENT_SETTING_DEFAULT(EStatsMode, StatsMode, EStatsMode::None);
