Skip to content

Commit

Permalink
Merge 9396e13 into 071fc9a
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Nov 14, 2024
2 parents 071fc9a + 9396e13 commit d798313
Show file tree
Hide file tree
Showing 17 changed files with 217 additions and 16 deletions.
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
cachePolicy,
nullptr, // operationParams
settings,
req->pool_id());
req->pool_id(),
req->Getcollect_full_diagnostics());

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -394,6 +395,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}
response.set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
}

if (hasTrailingMessage) {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
&req->parameters(),
req->collect_stats(),
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);
req->has_operation_params() ? &req->operation_params() : nullptr,
NKqp::NPrivateEvents::TQueryRequestSettings(),
"",
req->Getcollect_full_diagnostics());

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

Expand Down Expand Up @@ -203,6 +206,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
queryMeta.mutable_parameters_types()->insert({queryParameter.GetName(), parameterType});
}
}
queryResult->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");
const TString& poolId = "",
bool collectFullDiagnostics = false);

TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Expand Down Expand Up @@ -395,6 +396,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
TIntrusivePtr<TUserRequestContext> UserRequestContext;
TDuration ProgressStatsPeriod;
std::optional<NResourcePool::TPoolSettings> PoolConfig;
bool CollectFullDiagnostics = false;
};

struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Table::QueryCachePolicy* queryCachePolicy,
const ::Ydb::Operations::OperationParams* operationParams,
const TQueryRequestSettings& querySettings,
const TString& poolId)
const TString& poolId,
bool collectFullDiagnostics)
: RequestCtx(ctx)
, RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
Expand All @@ -35,6 +36,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
, QueryCachePolicy(queryCachePolicy)
, HasOperationParams(operationParams)
, QuerySettings(querySettings)
, CollectFullDiagnostics(collectFullDiagnostics)
{
if (HasOperationParams) {
OperationTimeout = GetDuration(operationParams->operation_timeout());
Expand Down Expand Up @@ -107,6 +109,8 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetIsInternalCall(RequestCtx->IsInternalCall());
Record.MutableRequest()->SetOutputChunkMaxSize(QuerySettings.OutputChunkMaxSize);

Record.MutableRequest()->SetCollectDiagnostics(CollectFullDiagnostics);

RequestCtx.reset();
}
}
Expand Down
66 changes: 66 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,72 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
UNIT_ASSERT_VALUES_EQUAL(counters.RecompileRequestGet()->Val(), 1);
}

Y_UNIT_TEST(ExecuteDataQueryCollectFullDiagnostics) {
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetKqpSettings({setting});

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
UNIT_ASSERT(session.ExecuteSchemeQuery(R"(
CREATE TABLE `/Root/TestTable` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").GetValueSync().IsSuccess());
}

{
const TString query(Q1_(R"(
SELECT * FROM `/Root/TestTable`;
)"));

{
auto settings = TExecDataQuerySettings();
settings.CollectFullDiagnostics(true);

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(!result.GetDiagnostics().empty(), "Query result diagnostics is empty");

TStringStream in;
in << result.GetDiagnostics();
NJson::TJsonValue value;
ReadJsonTree(&in, &value);

UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

{
auto settings = TExecDataQuerySettings();

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(result.GetDiagnostics().empty(), "Query result diagnostics should be empty, but it's not");
}
}
}

Y_UNIT_TEST(QueryCachePermissionsLoss) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,55 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(ExecuteCollectFullDiagnostics) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

{
TExecuteQuerySettings settings;
settings.CollectFullDiagnostics(true);

auto result = db.ExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
)", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_C(!result.GetDiagnostics().empty(), "Query result diagnostics is empty");

TStringStream in;
in << result.GetDiagnostics();
NJson::TJsonValue value;
ReadJsonTree(&in, &value);

UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

{
TExecuteQuerySettings settings;
settings.CollectFullDiagnostics(true);

auto result = db.ExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
)", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(result.GetDiagnostics().empty(), "Query result diagnostics should be empty, but it's not");
}
}

void CheckQueryResult(TExecuteQueryResult result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/api/protos/ydb_query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ message ExecuteQueryRequest {
int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"];

string pool_id = 10; // Workload manager pool id

bool collect_full_diagnostics = 11;
}

message ResultSetMeta {
Expand All @@ -191,6 +193,9 @@ message ExecuteQueryResponsePart {
Ydb.TableStats.QueryStats exec_stats = 5;

TransactionMeta tx_meta = 6;

// Full query diagnostics
string query_full_diagnostics = 7;
}

message ExecuteScriptRequest {
Expand Down
3 changes: 3 additions & 0 deletions ydb/public/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,7 @@ message ExecuteDataQueryRequest {
QueryCachePolicy query_cache_policy = 5;
Ydb.Operations.OperationParams operation_params = 6;
QueryStatsCollection.Mode collect_stats = 7;
bool collect_full_diagnostics = 8;
}

message ExecuteDataQueryResponse {
Expand Down Expand Up @@ -946,6 +947,8 @@ message ExecuteQueryResult {
QueryMeta query_meta = 3;
// Query execution statistics
Ydb.TableStats.QueryStats query_stats = 4;
// Full query diagnostics
string query_full_diagnostics = 5;
}

// Explain data query
Expand Down
24 changes: 24 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ void TCommandExecuteQuery::Config(TConfig& config) {
config.Opts->AddLongOption('q', "query", "Text of query to execute").RequiredArgument("[String]").StoreResult(&Query);
config.Opts->AddLongOption('f', "file", "Path to file with query text to execute")
.RequiredArgument("PATH").StoreResult(&QueryFile);
config.Opts->AddLongOption("collect-diagnostics", "Collects diagnostics and saves it to file")
.StoreTrue(&CollectFullDiagnostics);

AddOutputFormats(config, {
EDataFormat::Pretty,
Expand Down Expand Up @@ -432,6 +434,9 @@ int TCommandExecuteQuery::ExecuteDataQuery(TConfig& config) {
NTable::TExecDataQuerySettings settings;
settings.KeepInQueryCache(true);
settings.CollectQueryStats(ParseQueryStatsModeOrThrow(CollectStatsMode, defaultStatsMode));
if (CollectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}

NTable::TTxSettings txSettings;
if (TxMode) {
Expand Down Expand Up @@ -516,6 +521,11 @@ void TCommandExecuteQuery::PrintDataQueryResponse(NTable::TDataQueryResult& resu
{
Cout << Endl << "Flame graph is available for full or profile stats only" << Endl;
}
if (CollectFullDiagnostics)
{
TFileOutput file(TStringBuilder() << "diagnostics_" << TGUID::Create().AsGuidString() << ".txt");
file << result.GetDiagnostics();
}
}

int TCommandExecuteQuery::ExecuteSchemeQuery(TConfig& config) {
Expand Down Expand Up @@ -558,6 +568,9 @@ namespace {
if (timeout.has_value()) {
settings.ClientTimeout(*timeout);
}
if (CollectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}
return settings;
} else if constexpr (std::is_same_v<TClient, NQuery::TQueryClient>) {
const auto defaultStatsMode = basicStats
Expand All @@ -568,6 +581,9 @@ namespace {
if (timeout.has_value()) {
settings.ClientTimeout(*timeout);
}
if (CollectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}
return settings;
}
Y_UNREACHABLE();
Expand Down Expand Up @@ -753,6 +769,8 @@ bool TCommandExecuteQuery::PrintQueryResponse(TIterator& result) {
fullStats = queryStats.GetPlan();
}
}

if ()
}
} // TResultSetPrinter destructor should be called before printing stats

Expand All @@ -767,6 +785,12 @@ bool TCommandExecuteQuery::PrintQueryResponse(TIterator& result) {
queryPlanPrinter.Print(*fullStats);
}

if (CollectFullDiagnostics)
{
TFileOutput file(TStringBuilder() << "diagnostics_" << TGUID::Create().AsGuidString() << ".txt");
file << result.GetDiagnostics();
}

PrintFlameGraph(fullStats);

if (IsInterrupted()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TCommandExecuteQuery : public TTableCommand, TCommandQueryBase, TCommandWi
TString TxMode;
TString QueryType;
bool BasicStats = false;
bool CollectFullDiagnostics = false;
};

class TCommandExplain : public TTableCommand, public TCommandWithOutput, TCommandQueryBase, TInterruptibleCommand {
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_query/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,4 +723,9 @@ TResultSetParser TExecuteQueryResult::GetResultSetParser(size_t resultIndex) con
return TResultSetParser(GetResultSet(resultIndex));
}

const TString& TExecuteQueryResult::GetDiagnostics() const {
CheckStatusOk("TExecuteQueryResult::GetDiagnostics");
return Diagnostics_;
}

} // namespace NYdb::NQuery
Loading

0 comments on commit d798313

Please sign in to comment.