diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 9691bc7f542f..f75eb3c765e9 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -832,7 +832,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasQueryService) { server.AddService(new NGRpcService::TGRpcYdbQueryService(ActorSystem.Get(), Counters, - grpcRequestProxies[0], hasDataStreams.IsRlAllowed())); + grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } if (hasLogStore) { diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 738c5ea52a41..3e642b39167c 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall LastSeqNo; ui64 AckedFreeSpaceBytes = 0; + TActorId ActorId; }; class TRpcFlowControlState { @@ -244,8 +245,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { const auto traceId = Request_->GetTraceId(); NYql::TIssues issues; - NKikimrKqp::EQueryAction queryAction; - if (!ParseQueryAction(*req, queryAction, issues)) { + if (!ParseQueryAction(*req, QueryAction, issues)) { return ReplyFinishStream(Ydb::StatusIds::BAD_REQUEST, std::move(issues)); } @@ -274,7 +274,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { cachePolicy->set_keep_in_cache(true); auto ev = MakeHolder( - queryAction, + QueryAction, queryType, SelfId(), Request_, @@ -288,7 +288,8 @@ class TExecuteQueryRPC : public TActorBootstrapped { nullptr, // operationParams false, // keepSession false, // useCancelAfter - syntax); + syntax, + true); if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) { NYql::TIssues issues; @@ -322,23 +323,24 @@ class TExecuteQueryRPC : public TActorBootstrapped { ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); - for (auto& pair : StreamProducers_) { - const auto& producerId = pair.first; - auto& producer = pair.second; + for (auto& pair : StreamChannels_) { + const auto& channelId = pair.first; + auto& channel = pair.second; - if (freeSpaceBytes > 0 && producer.LastSeqNo && producer.AckedFreeSpaceBytes == 0) { + if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " - << ", producer: " << producerId - << ", seqNo: " << producer.LastSeqNo + << ", channel: " << channelId + << ", seqNo: " << channel.LastSeqNo << ", freeSpace: " << freeSpaceBytes); auto resp = MakeHolder(); - resp->Record.SetSeqNo(*producer.LastSeqNo); + resp->Record.SetSeqNo(*channel.LastSeqNo); resp->Record.SetFreeSpace(freeSpaceBytes); + resp->Record.SetChannelId(channelId); - ctx.Send(producerId, resp.Release()); + ctx.Send(channel.ActorId, resp.Release()); - producer.AckedFreeSpaceBytes = freeSpaceBytes; + channel.AckedFreeSpaceBytes = freeSpaceBytes; } } @@ -358,9 +360,10 @@ class TExecuteQueryRPC : public TActorBootstrapped { Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS); - auto& producer = StreamProducers_[ev->Sender]; - producer.LastSeqNo = ev->Get()->Record.GetSeqNo(); - producer.AckedFreeSpaceBytes = freeSpaceBytes; + auto& channel = StreamChannels_[ev->Get()->Record.GetChannelId()]; + channel.ActorId = ev->Sender; + channel.LastSeqNo = ev->Get()->Record.GetSeqNo(); + channel.AckedFreeSpaceBytes = freeSpaceBytes; LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() @@ -371,8 +374,9 @@ class TExecuteQueryRPC : public TActorBootstrapped { auto resp = MakeHolder(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(freeSpaceBytes); + resp->Record.SetChannelId(ev->Get()->Record.GetChannelId()); - ctx.Send(ev->Sender, resp.Release()); + ctx.Send(channel.ActorId, resp.Release()); } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { @@ -381,7 +385,15 @@ class TExecuteQueryRPC : public TActorBootstrapped { const auto& issueMessage = record.GetResponse().GetQueryIssues(); bool hasTrailingMessage = false; - + + auto& kqpResponse = record.GetResponse(); + if (kqpResponse.GetYdbResults().size() > 1) { + auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, + "Unexpected trailing message with multiple result sets."); + ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue); + return; + } + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { Request_->SetRuHeader(record.GetConsumedRu()); @@ -389,6 +401,14 @@ class TExecuteQueryRPC : public TActorBootstrapped { Ydb::Query::ExecuteQueryResponsePart response; + if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) { + for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) { + hasTrailingMessage = true; + response.set_result_set_index(i); + response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i)); + } + } + AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response); if (kqpResponse.HasTxMeta()) { @@ -492,8 +512,9 @@ class TExecuteQueryRPC : public TActorBootstrapped { private: std::shared_ptr Request_; + NKikimrKqp::EQueryAction QueryAction; TRpcFlowControlState FlowControl_; - TMap StreamProducers_; + TMap StreamChannels_; }; } // namespace diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 38fa0f63d934..dac9241e55e7 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -34,7 +34,8 @@ struct TEvQueryRequest: public NActors::TEventLocal UserRequestContext; TDuration ProgressStatsPeriod; + bool SupportsStreamTrailingResult = false; }; struct TEvDataQueryStreamPart: public TEventPBGetDatabaseName().GetOrElse(""))) @@ -36,6 +37,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( , HasOperationParams(operationParams) , KeepSession(keepSession) , Syntax(syntax) + , SupportsStreamTrailingResult(supportsStreamTrailingResult) { if (HasOperationParams) { OperationTimeout = GetDuration(operationParams->operation_timeout()); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 219396b5cdcc..a6fdcac9a80d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, const bool enableOlapSink) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, - maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter" + maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult ) , AsyncIoFactory(std::move(asyncIoFactory)) - , StreamResult(streamResult) , EnableOlapSink(enableOlapSink) { Target = creator; @@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseGet()->Record; - auto& channelData = record.GetChannelData(); - - TDqSerializedBatch batch; - batch.Proto = std::move(*record.MutableChannelData()->MutableData()); - if (batch.Proto.HasPayloadId()) { - batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId()); - } - - auto& channel = TasksGraph.GetChannel(channelData.GetChannelId()); - YQL_ENSURE(channel.DstTask == 0); - auto shardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId; - - if (Stats) { - Stats->ResultBytes += batch.Size(); - Stats->ResultRows += batch.RowCount(); - } - - LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << shardId - << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender - << ", finished: " << channelData.GetFinished()); - - ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch)); - { - LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender); - - auto ackEv = MakeHolder(); - ackEv->Record.SetSeqNo(record.GetSeqNo()); - ackEv->Record.SetChannelId(channel.Id); - ackEv->Record.SetFreeSpace(50_MB); - Send(ev->Sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id); - } - } - private: bool IsReadOnlyTx() const { if (Request.TopicOperations.HasOperations()) { @@ -2417,7 +2383,6 @@ class TKqpDataExecuter : public TKqpExecuterBase #include +#include #include #include @@ -34,6 +35,7 @@ #include #include +#include #include #include #include @@ -45,6 +47,7 @@ #include #include + #include @@ -119,7 +122,7 @@ class TKqpExecuterBase : public TActorBootstrapped { const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr& userRequestContext, - ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase") + ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false) : Request(std::move(request)) , Database(database) , UserToken(userToken) @@ -130,6 +133,7 @@ class TKqpExecuterBase : public TActorBootstrapped { , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) , AggregationSettings(aggregation) , HasOlapTable(false) + , StreamResult(streamResult) { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive(); @@ -234,6 +238,135 @@ class TKqpExecuterBase : public TActorBootstrapped { return true; } + struct TEvComputeChannelDataOOB { + NYql::NDqProto::TEvComputeChannelData Proto; + TRope Payload; + + size_t Size() const { + return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size(); + } + + ui32 RowCount() const { + return Proto.GetChannelData().GetData().GetRows(); + } + }; + + void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) { + auto& record = ev->Get()->Record; + auto& channelData = record.GetChannelData(); + auto& channel = TasksGraph.GetChannel(channelData.GetChannelId()); + auto& task = TasksGraph.GetTask(channel.SrcTask); + const TActorId channelComputeActorId = ev->Sender; + + auto& txResult = ResponseEv->TxResults[channel.DstInputIndex]; + auto [it, _] = ResultChannelToComputeActor.emplace(channel.Id, ev->Sender); + YQL_ENSURE(it->second == channelComputeActorId); + + if (StreamResult && txResult.IsStream && txResult.QueryResultIndex.Defined()) { + + TEvComputeChannelDataOOB computeData; + computeData.Proto = std::move(ev->Get()->Record); + if (computeData.Proto.GetChannelData().GetData().HasPayloadId()) { + computeData.Payload = ev->Get()->GetPayload(computeData.Proto.GetChannelData().GetData().GetPayloadId()); + } + + const bool trailingResults = ( + computeData.Proto.GetChannelData().GetFinished() && + Request.IsTrailingResultsAllowed()); + + TVector batches(1); + auto& batch = batches.front(); + + batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData()); + batch.Payload = std::move(computeData.Payload); + + TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry}; + auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder); + + if (!trailingResults) { + auto streamEv = MakeHolder(); + streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); + streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex); + streamEv->Record.SetChannelId(channel.Id); + streamEv->Record.MutableResultSet()->Swap(&resultSet); + + LOG_D("Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo() + << ", nRows: " << streamEv->Record.GetResultSet().rows().size()); + + this->Send(Target, streamEv.Release()); + + } else { + auto ackEv = MakeHolder(); + ackEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); + ackEv->Record.SetChannelId(channel.Id); + ackEv->Record.SetFreeSpace(50_MB); + this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id); + txResult.TrailingResult.Swap(&resultSet); + txResult.HasTrailingResult = true; + LOG_D("staging TEvStreamData to " << Target << ", seqNo: " << computeData.Proto.GetSeqNo() + << ", nRows: " << txResult.TrailingResult.rows().size()); + } + + return; + } + + NYql::NDq::TDqSerializedBatch batch; + batch.Proto = std::move(*record.MutableChannelData()->MutableData()); + if (batch.Proto.HasPayloadId()) { + batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId()); + } + + YQL_ENSURE(channel.DstTask == 0); + + if (Stats) { + Stats->ResultBytes += batch.Size(); + Stats->ResultRows += batch.RowCount(); + } + + LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId + << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender + << ", finished: " << channelData.GetFinished()); + + ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch)); + LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender); + + auto ackEv = MakeHolder(); + ackEv->Record.SetSeqNo(record.GetSeqNo()); + ackEv->Record.SetChannelId(channel.Id); + ackEv->Record.SetFreeSpace(50_MB); + this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id); + } + + void HandleStreamAck(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) { + ui64 channelId; + if (ResponseEv->TxResults.size() == 1) { + channelId = ResultChannelToComputeActor.begin()->first; + } else { + channelId = ev->Get()->Record.GetChannelId(); + } + + auto it = ResultChannelToComputeActor.find(channelId); + YQL_ENSURE(it != ResultChannelToComputeActor.end()); + const auto channelComputeActorId = it->second; + + ui64 seqNo = ev->Get()->Record.GetSeqNo(); + i64 freeSpace = ev->Get()->Record.GetFreeSpace(); + + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId + << ", send ack to channelId: " << channelId + << ", seqNo: " << seqNo + << ", enough: " << ev->Get()->Record.GetEnough() + << ", freeSpace: " << freeSpace + << ", to: " << channelComputeActorId); + + auto ackEv = MakeHolder(); + ackEv->Record.SetSeqNo(seqNo); + ackEv->Record.SetChannelId(channelId); + ackEv->Record.SetFreeSpace(freeSpace); + ackEv->Record.SetFinish(ev->Get()->Record.GetEnough()); + this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); + } + void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; @@ -1595,6 +1728,12 @@ class TKqpExecuterBase : public TActorBootstrapped { } void InitializeChannelProxies() { + // notice: forward all respones to executer if + // trailing results are allowed. + // temporary, will be removed in the next pr. + if (Request.IsTrailingResultsAllowed()) + return; + for(const auto& channel: TasksGraph.GetChannels()) { if (channel.DstTask) { continue; @@ -1753,8 +1892,11 @@ class TKqpExecuterBase : public TActorBootstrapped { const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; TVector ResourcesSnapshot; bool HasOlapTable = false; + bool StreamResult = false; bool HasDatashardSourceScan = false; bool UnknownAffectedShardCount = false; + + THashMap ResultChannelToComputeActor; THashMap> SourceScanStageIdToParititions; private: diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 047bc9e007eb..d6f6f1e9813b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -51,7 +51,8 @@ class TKqpScanExecuter : public TKqpExecuterBase& userRequestContext) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, - maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter" + maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::ScanExecuter, "ScanExecuter", + false ) , PreparedQuery(preparedQuery) { @@ -64,13 +65,13 @@ class TKqpScanExecuter : public TKqpExecuterBaseResultsSize(); YQL_ENSURE(resultsSize != 0); - bool streamResult = Request.Transactions[0].Body->GetResults(0).GetIsStream(); + StreamResult = Request.Transactions[0].Body->GetResults(0).GetIsStream(); - if (streamResult) { + if (StreamResult) { YQL_ENSURE(resultsSize == 1); } else { for (size_t i = 1; i < resultsSize; ++i) { - YQL_ENSURE(Request.Transactions[0].Body->GetResults(i).GetIsStream() == streamResult); + YQL_ENSURE(Request.Transactions[0].Body->GetResults(i).GetIsStream() == StreamResult); } } } @@ -109,7 +110,8 @@ class TKqpScanExecuter : public TKqpExecuterBaseGetTypeRewrite()) { hFunc(TEvDqCompute::TEvState, HandleComputeStats); - hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); + hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA + hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(TEvents::TEvUndelivered, HandleUndelivered); hFunc(TEvPrivate::TEvRetry, HandleRetry); @@ -126,25 +128,6 @@ class TKqpScanExecuter : public TKqpExecuterBaseGet()->Record.GetSeqNo() - << ", freeSpace: " << ev->Get()->Record.GetFreeSpace() - << ", enough: " << ev->Get()->Record.GetEnough() - << ", from: " << ev->Sender); - - auto& resultChannelProxies = GetResultChannelProxies(); - if (resultChannelProxies.empty()) { - return; - } - - // Forward only for stream results, data results acks event theirselves. - YQL_ENSURE(!ResponseEv->TxResults.empty() && ResponseEv->TxResults[0].IsStream); - - auto channelIt = resultChannelProxies.begin(); - auto handle = ev->Forward(channelIt->second->SelfId()); - channelIt->second->Receive(handle); - } - private: void HandleResolve(TEvKqpExecuter::TEvTableResolveStatus::TPtr& ev) { if (!TBase::HandleResolve(ev)) return; @@ -247,8 +230,6 @@ class TKqpScanExecuter : public TKqpExecuterBase computeTasks; - InitializeChannelProxies(); - // calc stats for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 1a68edf17d16..bf27aec9dbcb 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -165,6 +165,7 @@ struct TTaskMeta { TActorId ExecuterId; ui32 Type = Unknown; + TActorId ResultChannelActorId; THashMap TaskParams; // Params for sources/sinks TVector ReadRanges; // Partitioning for sources THashMap SecureParams; diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 21e16c3a710d..9171d2dbb71a 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -124,6 +124,8 @@ class IKqpGateway : public NYql::IKikimrGateway { : TxAlloc(txAlloc) {} + bool AllowTrailingResults = false; + NKikimrKqp::EQueryType QueryType = NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED; NKikimr::TControlWrapper PerRequestDataSizeLimit; NKikimr::TControlWrapper MaxShardCount; TVector Transactions; @@ -150,6 +152,13 @@ class IKqpGateway : public NYql::IKikimrGateway { NWilson::TTraceId TraceId; NTopic::TTopicOperations TopicOperations; + + bool IsTrailingResultsAllowed() const { + return AllowTrailingResults && ( + QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY || + QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY + ); + } }; struct TExecPhysicalResult : public TGenericResult { diff --git a/ydb/core/kqp/query_data/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index c673182b5111..c0bf8e338ed0 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp @@ -77,6 +77,19 @@ Ydb::ResultSet* TKqpExecuterTxResult::GetYdb(google::protobuf::Arena* arena, TMa return ydbResult; } +Ydb::ResultSet* TKqpExecuterTxResult::GetTrailingYdb(google::protobuf::Arena* arena) { + if (!HasTrailingResult) + return nullptr; + + Ydb::ResultSet* ydbResult = google::protobuf::Arena::CreateMessage(arena); + if (TrailingResult.rows().size() > 0) { + ydbResult->Swap(&TrailingResult); + } + + return ydbResult; +} + + void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe rowsLimitPerWrite) { YQL_ENSURE(ydbResult); YQL_ENSURE(!Rows.IsWide()); @@ -89,7 +102,7 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe rowsL column->set_name(TString(mkqlSrcRowStructType->GetMemberName(memberIndex))); ExportTypeToProto(mkqlSrcRowStructType->GetMemberType(memberIndex), *column->mutable_type()); } - + Rows.ForEachRow([&](const NUdf::TUnboxedValue& value) -> bool { if (rowsLimitPerWrite) { if (*rowsLimitPerWrite == 0) { @@ -224,6 +237,13 @@ NKikimrMiniKQL::TResult* TQueryData::GetMkqlTxResult(const NKqpProto::TKqpPhyRes return TxResults[txIndex][resultIndex].GetMkql(arena); } +Ydb::ResultSet* TQueryData::GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena) { + auto txIndex = rb.GetTxResultBinding().GetTxIndex(); + auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); + return TxResults[txIndex][resultIndex].GetTrailingYdb(arena); +} + + Ydb::ResultSet* TQueryData::GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe rowsLimitPerWrite) { auto txIndex = rb.GetTxResultBinding().GetTxIndex(); auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); diff --git a/ydb/core/kqp/query_data/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index 181e13bc3b8a..ca8e772d5bda 100644 --- a/ydb/core/kqp/query_data/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -81,6 +81,8 @@ struct TKqpExecuterTxResult { const TVector* ColumnOrder = nullptr; TMaybe QueryResultIndex = 0; NKikimr::NMiniKQL::TUnboxedValueBatch Rows; + Ydb::ResultSet TrailingResult; + bool HasTrailingResult = false; explicit TKqpExecuterTxResult( bool isStream, @@ -98,6 +100,7 @@ struct TKqpExecuterTxResult { NKikimrMiniKQL::TResult* GetMkql(google::protobuf::Arena* arena); NKikimrMiniKQL::TResult GetMkql(); Ydb::ResultSet* GetYdb(google::protobuf::Arena* arena, TMaybe rowsLimitPerWrite); + Ydb::ResultSet* GetTrailingYdb(google::protobuf::Arena* arena); void FillMkql(NKikimrMiniKQL::TResult* mkqlResult); void FillYdb(Ydb::ResultSet* ydbResult, TMaybe rowsLimitPerWrite); @@ -250,6 +253,7 @@ class TQueryData : NMiniKQL::ITerminator { TTypedUnboxedValue GetTxResult(ui32 txIndex, ui32 resultIndex); NKikimrMiniKQL::TResult* GetMkqlTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); Ydb::ResultSet* GetYdbTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena, TMaybe rowsLimitPerWrite); + Ydb::ResultSet* GetTrailingTxResult(const NKqpProto::TKqpPhyResultBinding& rb, google::protobuf::Arena* arena); std::pair GetInternalBindingValue(const NKqpProto::TKqpPhyParamBinding& paramBinding); TTypedUnboxedValue& GetParameterUnboxedValue(const TString& name); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 1977aad58837..00bb19061486 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -764,6 +764,12 @@ class TKqpSessionActor : public TActorBootstrapped { request.StatsMode = queryState->GetStatsMode(); request.ProgressStatsPeriod = queryState->GetProgressStatsPeriod(); + request.QueryType = queryState->GetType(); + if (Y_LIKELY(queryState->PreparedQuery)) { + ui64 resultSetsCount = queryState->PreparedQuery->GetPhysicalQuery().ResultBindingsSize(); + request.AllowTrailingResults = (resultSetsCount == 1); + request.AllowTrailingResults &= (QueryState->RequestEv->GetSupportsStreamTrailingResult()); + } } const auto& limits = GetQueryLimits(Settings); @@ -1537,10 +1543,24 @@ class TKqpSessionActor : public TActorBootstrapped { // Result for scan query is sent directly to target actor. Y_ABORT_UNLESS(response->GetArena()); - if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) { + if (QueryState->PreparedQuery) { bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + size_t trailingResultsCount = 0; for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { + if (QueryState->IsStreamResult()) { + auto ydbResult = QueryState->QueryData->GetTrailingTxResult( + phyQuery.GetResultBindings(i), response->GetArena()); + + if (ydbResult) { + ++trailingResultsCount; + YQL_ENSURE(trailingResultsCount <= 1); + response->AddYdbResults()->Swap(ydbResult); + } + + continue; + } + if (useYdbResponseFormat) { TMaybe effectiveRowsLimit = FillSettings.RowsLimitPerWrite; if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 56497ea35034..bcfc0f9f499f 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -420,12 +420,16 @@ message TEvExecuterStreamData { optional Ydb.ResultSet ResultSet = 1; optional uint64 SeqNo = 2; optional uint32 QueryResultIndex = 3; + optional uint32 ChannelId = 4; + optional NActorsProto.TActorId ChannelActorId = 5; }; message TEvExecuterStreamDataAck { optional uint64 SeqNo = 1; optional int64 FreeSpace = 2; optional bool Enough = 3 [default = false]; + optional uint32 ChannelId = 4; + optional NActorsProto.TActorId ChannelActorId = 5; }; message TEvExecuterStreamProfile { @@ -434,7 +438,7 @@ message TEvExecuterStreamProfile { }; // 1. Executer fills progress stats from it's own execution -// 2. Session adds stats from early finished executions and builds complete plan +// 2. Session adds stats from early finished executions and builds complete plan message TEvExecuterProgress { optional NActorsProto.TActorId ExecuterActorId = 1; optional string QueryPlan = 2; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 58b5e29299b8..b4c35f7375ac 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -387,7 +387,7 @@ namespace Tests { GRpcServer->AddService(new NQuoter::TRateLimiterGRpcService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcDataStreamsService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); - GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies[0], true)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp index 03f031430f1e..760599e68ccc 100644 --- a/ydb/services/ydb/ydb_query.cpp +++ b/ydb/services/ydb/ydb_query.cpp @@ -7,76 +7,60 @@ namespace NKikimr::NGRpcService { +TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +TGRpcYdbQueryService::TGRpcYdbQueryService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + void TGRpcYdbQueryService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { using namespace Ydb::Query; using namespace NQuery; auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive>(this, &Service_, CQ_, \ - [this](NYdbGrpc::IRequestContextBase* ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("query", #NAME))->Run(); - - ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); - }) - - ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoFetchScriptResults, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoCreateSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoDeleteSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoAttachSession, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoBeginTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoCommitTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) - - ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, - new TGrpcRequestNoOperationCall - (ctx, &DoRollbackTransaction, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); - }) +#define ADD_REQUEST(NAME, IN, OUT, CB, ...) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive>(this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr __VA_OPT__(, TAuditMode::__VA_ARGS__)})); \ + }, &Ydb::Query::V1::QueryService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("query", #NAME))->Run(); \ + ++proxyCounter; \ + } \ + } + + ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, DoExecuteQuery, Auditable); + ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, DoExecuteScript, Auditable); + ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, DoFetchScriptResults); + ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, DoCreateSession); + ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, DoDeleteSession); + ADD_REQUEST(AttachSession, AttachSessionRequest, SessionState, DoAttachSession); + ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, DoBeginTransaction); + ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, DoCommitTransaction); + ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, DoRollbackTransaction); #undef ADD_REQUEST } diff --git a/ydb/services/ydb/ydb_query.h b/ydb/services/ydb/ydb_query.h index 8dec5cc4fa7f..04e8ab915ee7 100644 --- a/ydb/services/ydb/ydb_query.h +++ b/ydb/services/ydb/ydb_query.h @@ -11,8 +11,25 @@ class TGRpcYdbQueryService public: using TGrpcServiceBase::TGrpcServiceBase; + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + + TGRpcYdbQueryService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; }; } // namespace NKikimr::NGRpcService