Skip to content

Commit

Permalink
generic query for load actor (ydb-platform#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored and adameat committed Dec 29, 2023
1 parent 7159348 commit a0ac3be
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
1 change: 1 addition & 0 deletions ydb/core/load_test/config_examples.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ TVector<TConfigTemplate> BuildExamples() {
UniformPartitionsCount: 1000
DeleteTableOnFinish: 1
WorkloadType: 0
QueryType: "data"
Kv: {
InitRowCount: 1000
PartitionsByLoad: true
Expand Down
17 changes: 13 additions & 4 deletions ydb/core/load_test/kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct MonitoringData {
ui64 Errors = 0;
};

void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const TString& session, const TString& workingDir) {
void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, const NKikimrKqp::EQueryType queryType, const TString& session, const TString& workingDir) {
TString query_text = TString(q.Query);
auto request = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();

Expand All @@ -55,7 +55,7 @@ void SendQueryRequest(const TActorContext& ctx, NYdbWorkload::TQueryInfo& q, con
request->Record.MutableRequest()->SetDatabase(workingDir);

request->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
request->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
request->Record.MutableRequest()->SetType(queryType);
request->Record.MutableRequest()->SetQuery(query_text);

request->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
Expand Down Expand Up @@ -89,6 +89,7 @@ class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
TString working_dir,
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> workload_query_gen,
ui64 workload_type,
NKikimrKqp::EQueryType queryType,
ui64 parentTag,
ui64 workerTag,
TInstant endTimestamp,
Expand All @@ -101,6 +102,7 @@ class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
, ParentTag(parentTag)
, WorkerTag(workerTag)
, EndTimestamp(endTimestamp)
, QueryType(queryType)
, LatencyHist(60000, 2)
, Transactions(transactions)
, TransactionsBytesWritten(transactionsBytesWritten)
Expand Down Expand Up @@ -180,7 +182,7 @@ class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {

LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Worker Tag# " << ParentTag << "." << WorkerTag << " using session: " << WorkerSession);

SendQueryRequest(ctx, q, WorkerSession, WorkingDir);
SendQueryRequest(ctx, q, QueryType, WorkerSession, WorkingDir);
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
Expand Down Expand Up @@ -214,6 +216,7 @@ class TKqpLoadWorker : public TActorBootstrapped<TKqpLoadWorker> {
TInstant EndTimestamp;
NYdbWorkload::TQueryInfoList Queries;
TString WorkerSession = "wrong sessionId";
NKikimrKqp::EQueryType QueryType;

// monitoring
NHdr::THistogram LatencyHist;
Expand Down Expand Up @@ -242,6 +245,10 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
DeleteTableOnFinish = cmd.GetDeleteTableOnFinish();
WorkingDir = cmd.GetWorkingDir();
WorkloadType = cmd.GetWorkloadType();
Y_ABORT_UNLESS(cmd.GetQueryType() == "generic" || cmd.GetQueryType() == "data");
QueryType = cmd.GetQueryType() == "generic"
? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY
: NKikimrKqp::QUERY_TYPE_SQL_DML;
DurationSeconds = cmd.GetDurationSeconds();
NumOfSessions = cmd.GetNumOfSessions();
IncreaseSessions = cmd.GetIncreaseSessions();
Expand Down Expand Up @@ -530,7 +537,7 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag
<< " Creating request for init query, need to exec: " << InitData.size() + 1 << " session: " << TableSession);

SendQueryRequest(ctx, q, TableSession, WorkingDir);
SendQueryRequest(ctx, q, QueryType, TableSession, WorkingDir);
}

void HandleDataQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
Expand Down Expand Up @@ -627,6 +634,7 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
WorkingDir,
WorkloadQueryGen,
WorkloadType,
QueryType,
Tag,
Workers.size(),
TestStartTime + TDuration::Seconds(DurationSeconds),
Expand Down Expand Up @@ -659,6 +667,7 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
bool IncreaseSessions = false;
size_t ResultsReceived = 0;
NYdbWorkload::EWorkload WorkloadClass;
NKikimrKqp::EQueryType QueryType;

NYdbWorkload::TQueryInfoList InitData;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/load_test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ message TEvLoadTestRequest {
optional bool DeleteTableOnFinish = 6;
optional uint32 UniformPartitionsCount = 7;
optional uint32 WorkloadType = 8;
optional string QueryType = 12;
oneof Workload {
TStockWorkload Stock = 9;
TKvWorkload Kv = 10;
Expand Down

0 comments on commit a0ac3be

Please sign in to comment.