Skip to content

Commit

Permalink
Merge f0c060a into a2be43b
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Oct 7, 2024
2 parents a2be43b + f0c060a commit 5233dea
Show file tree
Hide file tree
Showing 35 changed files with 28,239 additions and 231 deletions.
29 changes: 18 additions & 11 deletions ydb/core/client/server/msgbus_server_pq_metacache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
req->Record.MutableRequest()->SetKeepSession(false);
req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
Expand Down Expand Up @@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));

ui64 newVersion = 0;
if (parser.RowsCount() != 0) {
parser.TryNextRow();
newVersion = *parser.ColumnParser(0).GetOptionalInt64();
}

LastVersionUpdate = ctx.Now();
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
Expand All @@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
TString path, dc;
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
for (const auto& row : rr.GetList()) {

path = row.GetStruct(0).GetOptional().GetText();
dc = row.GetStruct(1).GetOptional().GetText();
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
const ui32 rowCount = parser.RowsCount();
while (parser.TryNextRow()) {
path = *parser.ColumnParser(0).GetOptionalUtf8();
dc = *parser.ColumnParser(1).GetOptionalUtf8();

NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc});
}
if (rr.ListSize() > 0) {

if (rowCount > 0) {
LastTopicKey = {path, dc};
return RunQuery(EQueryType::EGetTopics, ctx);
} else {
Expand Down Expand Up @@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
if (DynamicNodesMapping == nullptr) {
Y_ABORT_UNLESS(!status);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
}
while(!NodesMappingWaiters.empty()) {
ctx.Send(NodesMappingWaiters.front(),
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actualy will be copy in case pf remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}

ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);

try {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
const auto& results = kqpResponse.GetYdbResults();
for (const auto& result : results) {
queryResult->add_result_sets()->CopyFrom(result);
}

} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -218,7 +218,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
11 changes: 1 addition & 10 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@

namespace NKikimr::NKqp {

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);

TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
}
}

class TKqpRequestInfo {
public:
Expand Down Expand Up @@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -285,20 +285,18 @@ class TKqpStreamRequestHandler : public TRequestHandlerBase<
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
if (record.MutableResponse()->GetResults().size()) {
if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());

for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
evStreamPart->Record.AddResults();
}

evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());

// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
Expand Down Expand Up @@ -404,7 +402,7 @@ class TKqpForwardStreamRequestHandler : public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

Ydb::ResultSet resultSet;
NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
Expand Down Expand Up @@ -510,7 +508,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
auto& response = *ev->Get()->Record.GetRef().MutableResponse();

for (auto& resultSet : ResultSets) {
ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
response.AddYdbResults()->Swap(&resultSet.ResultSet);
}

TBase::HandleResponse(ev, ctx);
Expand Down Expand Up @@ -671,8 +669,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}

for (auto& result : queryResponse.GetResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
for (auto& result : queryResponse.GetYdbResults()) {
auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());

arenaResult->CopyFrom(result);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
NKikimrMiniKQL::TResult* result = queryResult.Results.back();
google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
Ydb::ResultSet* result = queryResult.Results.back();

if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
Expand Down
10 changes: 0 additions & 10 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,7 @@ class TPrepareQueryAsyncResult : public TKqpAsyncResultBase<IKikimrQueryExecutor
}

void FillResult(TResult& queryResult) const override {
TVector<NKikimrMiniKQL::TResult*> results;
for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
queryResult.ProtobufArenaPtr.get());

result->CopyFrom(phyResult);
results.push_back(result);
}

queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
queryResult.Results = std::move(results);
}

private:
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/host/kqp_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;

TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;

NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;

void Reset() {
ReplyTarget = {};
MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,16 +980,12 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TVector<TString> columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
if (IsRawKikimrResult(resultValue)) {
protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
}

YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ class IKikimrGateway : public TThrRefBase {

struct TQueryResult : public TGenericResult {
TString SessionId;
TVector<NKikimrMiniKQL::TResult*> Results;
TVector<Ydb::ResultSet*> Results;
TMaybe<NKikimrKqp::TQueryProfile> Profile; // TODO: Deprecate.
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
Expand Down
30 changes: 0 additions & 30 deletions ydb/core/kqp/session_actor/kqp_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,36 +53,6 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) {

} // namespace

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
const auto& type = from.GetType();
TStackVec<NKikimrMiniKQL::TType> columnTypes;
Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
for (const auto& member : type.GetStruct().GetMember()) {
if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
auto columnMeta = to->add_columns();
columnMeta->set_name(column.GetName());
columnTypes.push_back(column.GetType());
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
}
}
}
for (const auto& responseStruct : from.GetValue().GetStruct()) {
for (const auto& row : responseStruct.GetList()) {
auto newRow = to->add_rows();
ui32 columnCount = static_cast<ui32>(row.StructSize());
Y_ENSURE(columnCount == columnTypes.size());
for (ui32 i = 0; i < columnCount; i++) {
const auto& column = row.GetStruct(i);
ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
}
}
if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
to->set_truncated(responseStruct.GetBool());
}
}
}

TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
return Ydb::StatusIds::INTERNAL_ERROR;
Expand Down
27 changes: 5 additions & 22 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
// Result for scan query is sent directly to target actor.
Y_ABORT_UNLESS(response->GetArena());
if (QueryState->PreparedQuery) {
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
size_t trailingResultsCount = 0;
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
Expand All @@ -1788,28 +1787,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
continue;
}

if (useYdbResponseFormat) {
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
response->AddYdbResults()->Swap(ydbResult);
} else {
auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
<< phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize());
const auto& result = QueryState->PreparedQuery->GetResults(i);
if (result.GetRowsLimit()) {
fillSettings = FillSettings;
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
}
}
auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
response->AddResults()->Swap(finalResult);
TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
response->AddYdbResults()->Swap(ydbResult);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
// If we have result it must be allocated on protobuf arena
Y_ASSERT(result->GetArena());
Y_ASSERT(resp->GetArena() == result->GetArena());
resp->AddResults()->Swap(result);
resp->AddYdbResults()->Swap(result);
}
} else {
auto resp = ev.MutableResponse();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
<< 'b';

ui64 resultsSize = 0;
for (auto& result : record->GetResponse().GetResults()) {
for (auto& result : record->GetResponse().GetYdbResults()) {
resultsSize += result.ByteSize();
}

Expand Down
Loading

0 comments on commit 5233dea

Please sign in to comment.