Skip to content

Commit

Permalink
Added unit test for queries
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Feb 7, 2024
1 parent ddd909d commit 68037dc
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 28 deletions.
38 changes: 17 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,18 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());
Ydb::Query::ExecuteQueryResponsePart response;

auto& kqpResponse = record.GetResponse();
if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

Ydb::Query::ExecuteQueryResponsePart response;
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
Expand All @@ -415,25 +421,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}

if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

if (hasTrailingMessage) {
response.set_status(Ydb::StatusIds::SUCCESS);
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
}
}

if (!hasTrailingMessage) {
if (hasTrailingMessage) {
response.set_status(record.GetYdbStatus());
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,29 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
}

Y_UNIT_TEST(ExecStatsAst) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

auto settings = TExecuteQuerySettings()
.StatsMode(EStatsMode::Full);

std::vector<std::pair<TString, EStatus>> cases = {
{ "SELECT 42 AS test_ast_column", EStatus::SUCCESS },
{ "SELECT test_ast_column FROM TwoShard", EStatus::GENERIC_ERROR },
{ "SELECT UNWRAP(42 / 0) AS test_ast_column", EStatus::PRECONDITION_FAILED },
};

for (const auto& [sql, status] : cases) {
auto result = db.ExecuteQuery(sql, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());

UNIT_ASSERT(result.GetStats().Defined());
UNIT_ASSERT(result.GetStats()->GetAst().Defined());
UNIT_ASSERT_STRING_CONTAINS(*result.GetStats()->GetAst(), "test_ast_column");
}
}

Y_UNIT_TEST(Ddl) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
15 changes: 8 additions & 7 deletions ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable {
auto part = partFuture.ExtractValue();

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (!part.IsSuccess()) {
TMaybe<TExecStats> stats;
std::swap(self->Stats_, stats);

if (part.EOS()) {
TVector<NYql::TIssue> issues;
TVector<Ydb::ResultSet> resultProtos;
TMaybe<TExecStats> stats;
TMaybe<TTransaction> tx;

std::swap(self->Issues_, issues);
std::swap(self->ResultSets_, resultProtos);
std::swap(self->Stats_, stats);
std::swap(self->Tx_, tx);

TVector<TResultSet> resultSets;
Expand All @@ -160,7 +165,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
std::move(tx)
));
} else {
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {}));
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, std::move(stats), {}));
}

return;
Expand All @@ -185,10 +190,6 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
}

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (const auto& tx = part.GetTransaction()) {
self->Tx_ = tx;
}
Expand Down
10 changes: 10 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_query/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ TMaybe<TString> TExecStats::GetPlan() const {
return proto.query_plan();
}

TMaybe<TString> TExecStats::GetAst() const {
auto proto = Impl_->Proto;

if (proto.query_ast().empty()) {
return {};
}

return proto.query_ast();
}

TDuration TExecStats::GetTotalDuration() const {
return TDuration::MicroSeconds(Impl_->Proto.total_duration_us());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/ydb_query/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TExecStats {
TString ToString(bool withPlan = false) const;

TMaybe<TString> GetPlan() const;
TMaybe<TString> GetAst() const;

TDuration GetTotalDuration() const;
TDuration GetTotalCpuTime() const;
Expand Down

0 comments on commit 68037dc

Please sign in to comment.