Skip to content

Commit

Permalink
Support part size limit in case of request via kqp proxy service
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Apr 9, 2024
1 parent 61d76b4 commit 1bb2e52
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
}

ui64 GetOutputChunkMaxSize() const {
return QuerySettings.OutputChunkMaxSize;
return RequestCtx ? QuerySettings.OutputChunkMaxSize : Record.GetRequest().GetOutputChunkMaxSize();
}

TDuration GetProgressStatsPeriod() const {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetTimeoutMs(OperationTimeout.MilliSeconds());
}
Record.MutableRequest()->SetIsInternalCall(RequestCtx->IsInternalCall());
Record.MutableRequest()->SetOutputChunkMaxSize(QuerySettings.OutputChunkMaxSize);

RequestCtx.reset();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message TQueryRequest {
optional string ClientSdkBuildInfo = 31;
optional string ApplicationName = 32;
optional string UserSID = 33;
optional uint64 OutputChunkMaxSize = 34;
}

message TKqpPathIdProto {
Expand Down
137 changes: 137 additions & 0 deletions ydb/tests/functional/kqp/kqp_query_svc/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#include <util/system/env.h>
#include <util/generic/size_literals.h>
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/library/grpc/client/grpc_common.h>
#include <ydb/library/grpc/client/grpc_client_low.h>
#include <ydb/public/api/grpc/ydb_query_v1.grpc.pb.h>
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/public/sdk/cpp/client/ydb_query/query.h>
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_discovery/discovery.h>

using namespace NYdbGrpc;
using namespace NYdb;
using namespace NYdb::NQuery;
using namespace NYdb::NDiscovery;
using namespace NTestHelpers;

static TString CreateHostWithPort(const TEndpointInfo& info) {
return info.Address + ":" + ToString(info.Port);
}

using TProcessor = NYdbGrpc::IStreamRequestReadProcessor<Ydb::Query::ExecuteQueryResponsePart>::TPtr;
using TProcessorPromise = NThreading::TPromise<TProcessor>;

struct TStats {
ui32 TotalRows = 0;
ui32 TotalBatches = 0;
};

void DoRead(TProcessor processor, Ydb::Query::ExecuteQueryResponsePart* part, Ydb::StatusIds::StatusCode expected,
bool& allDoneOk, TStats& stats, TProcessorPromise promise)
{
processor->Read(part, [&allDoneOk, part, promise, processor, expected, &stats](TGrpcStatus grpcStatus) mutable {
if (grpcStatus.GRpcStatusCode == grpc::StatusCode::OK) {
allDoneOk &= (part->status() == expected);
if (!allDoneOk) {
promise.SetValue(processor);
} else {
stats.TotalBatches++;
stats.TotalRows += part->result_set().rows_size();

DoRead(processor, part, expected, allDoneOk, stats, promise);
}
} else {
promise.SetValue(processor);
}
});
}

// rows, batches
static TStats ExecuteQuery(NYdbGrpc::TGRpcClientLow& clientLow, const TGRpcClientConfig& clientConfig,
const TString& id, const TString& query, int batchLimit, int code, bool& allDoneOk)
{
const Ydb::StatusIds::StatusCode expected = static_cast<Ydb::StatusIds::StatusCode>(code);
auto connection = clientLow.CreateGRpcServiceConnection<Ydb::Query::V1::QueryService>(clientConfig);

Ydb::Query::ExecuteQueryRequest request;
request.set_session_id(id);
request.set_exec_mode(Ydb::Query::EXEC_MODE_EXECUTE);
request.set_response_part_limit_bytes(batchLimit);
request.mutable_query_content()->set_text(query);

auto promise = NThreading::NewPromise<TProcessor>();
TStats stats;

Ydb::Query::ExecuteQueryResponsePart part;
auto cb = [&allDoneOk, promise, expected, &stats, &part](TGrpcStatus grpcStatus, TProcessor processor) mutable {
UNIT_ASSERT(grpcStatus.GRpcStatusCode == grpc::StatusCode::OK);
DoRead(processor, &part, expected, allDoneOk, stats, promise);
};

connection->DoStreamRequest<Ydb::Query::ExecuteQueryRequest, Ydb::Query::ExecuteQueryResponsePart>(
request,
cb,
&Ydb::Query::V1::QueryService::Stub::AsyncExecuteQuery);

promise.GetFuture().GetValueSync()->Cancel();

return stats;
}

Y_UNIT_TEST_SUITE(KqpQueryService)
{
Y_UNIT_TEST(ReplyPartLimitProxyNode)
{
TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
auto config = TDriverConfig(connectionString);
auto driver = TDriver(config);
auto db = TQueryClient(driver);

auto client = TDiscoveryClient(driver);
auto res = TDiscoveryClient(driver).ListEndpoints().GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
UNIT_ASSERT(res.GetEndpointsInfo().size() > 2);

// Create and attach to session on the node #0
auto sessionId = CreateQuerySession(TGRpcClientConfig(CreateHostWithPort(res.GetEndpointsInfo()[0])));

bool allDoneOk = true;

NYdbGrpc::TGRpcClientLow clientLow;

auto p = CheckAttach(clientLow, TGRpcClientConfig(CreateHostWithPort(res.GetEndpointsInfo()[0])), sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);

Y_DEFER {
p->Cancel();
};

UNIT_ASSERT(allDoneOk);

const TString query = "SELECT * FROM AS_TABLE(ListReplicate(AsStruct(\"12345678\" AS Key), 100000))";

{
// Check range for chunk size settings
ExecuteQuery(clientLow, TGRpcClientConfig(CreateHostWithPort(res.GetEndpointsInfo()[1])),
sessionId, query, 48_MB, Ydb::StatusIds::BAD_REQUEST, allDoneOk);

UNIT_ASSERT(allDoneOk);
}

{
// Check range for chunk size settings
auto stats = ExecuteQuery(clientLow, TGRpcClientConfig(CreateHostWithPort(res.GetEndpointsInfo()[1])),
sessionId, query, 10000, Ydb::StatusIds::SUCCESS, allDoneOk);

UNIT_ASSERT(allDoneOk);
UNIT_ASSERT_VALUES_EQUAL(stats.TotalRows, 100000);
// 100000 rows * 9 (?) byte per row / 10000 chunk size limit -> expect 90 batches
UNIT_ASSERT(stats.TotalBatches >= 90); // but got 91 in our case
UNIT_ASSERT(stats.TotalBatches < 100);
}

p->Cancel();
}
}
26 changes: 26 additions & 0 deletions ydb/tests/functional/kqp/kqp_query_svc/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
UNITTEST()

ENV(YDB_USE_IN_MEMORY_PDISKS=true)

ENV(YDB_ERASURE=block_4-2)

PEERDIR(
library/cpp/threading/local_executor
ydb/public/lib/ut_helpers
ydb/public/sdk/cpp/client/ydb_discovery
ydb/public/sdk/cpp/client/ydb_query
ydb/public/sdk/cpp/client/draft
)

SRCS(
main.cpp
)

INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)

SIZE(MEDIUM)
TIMEOUT(30)

REQUIREMENTS(ram:16)

END()
1 change: 1 addition & 0 deletions ydb/tests/functional/kqp/ya.make
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
RECURSE(
kqp_indexes
kqp_query_session
kqp_query_svc
)

0 comments on commit 1bb2e52

Please sign in to comment.