Skip to content

Commit

Permalink
Merge bb1736b into 27e9095
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jan 30, 2024
2 parents 27e9095 + bb1736b commit ded3818
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 86 deletions.
26 changes: 22 additions & 4 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
// Bookkeeping kept per stream producer (one entry of StreamProducers_).
struct TProducerState {
// SeqNo taken from the latest stream data record received for this producer;
// it is echoed back in the ack. Empty until the first data message is handled.
TMaybe<ui64> LastSeqNo;
// Free-space value that was last reported for this producer in an ack.
ui64 AckedFreeSpaceBytes = 0;
// Sender of the latest stream data message; acks for this producer are sent to it.
TActorId ActorId;
};

class TRpcFlowControlState {
Expand Down Expand Up @@ -335,8 +336,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*producer.LastSeqNo);
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetQueryResultIndex(producerId);

ctx.Send(producerId, resp.Release());
ctx.Send(producer.ActorId, resp.Release());

producer.AckedFreeSpaceBytes = freeSpaceBytes;
}
Expand All @@ -347,6 +349,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
Ydb::Query::ExecuteQueryResponsePart response;
response.set_status(Ydb::StatusIds::SUCCESS);
ui64 queryResultIndex = ev->Get()->Record.GetQueryResultIndex();
response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());

Expand All @@ -358,7 +361,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);

auto& producer = StreamProducers_[ev->Sender];
auto& producer = StreamProducers_[queryResultIndex];
producer.ActorId = ev->Sender;
producer.LastSeqNo = ev->Get()->Record.GetSeqNo();
producer.AckedFreeSpaceBytes = freeSpaceBytes;

Expand All @@ -371,6 +375,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetQueryResultIndex(queryResultIndex);

ctx.Send(ev->Sender, resp.Release());
}
Expand All @@ -381,14 +386,27 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
const auto& issueMessage = record.GetResponse().GetQueryIssues();

bool hasTrailingMessage = false;


auto& kqpResponse = record.GetResponse();
if (kqpResponse.GetYdbResults().size() > 1) {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
"Unexpected trailing message with multiple result sets.");
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, issue);
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

auto& kqpResponse = record.GetResponse();

Ydb::Query::ExecuteQueryResponsePart response;

for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
response.set_result_set_index(i);
response.mutable_result_set()->Swap(record.MutableResponse()->MutableYdbResults(i));
}

AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response);

if (kqpResponse.HasTxMeta()) {
Expand Down Expand Up @@ -493,7 +511,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
std::shared_ptr<TEvExecuteQueryRequest> Request_;

TRpcFlowControlState FlowControl_;
TMap<TActorId, TProducerState> StreamProducers_;
TMap<ui64, TProducerState> StreamProducers_;
};

} // namespace
Expand Down
48 changes: 5 additions & 43 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
const bool enableOlapSink)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter", streamResult
)
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
, EnableOlapSink(enableOlapSink)
{
Target = creator;
Expand Down Expand Up @@ -347,7 +346,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare);
hFunc(TEvPrivate::TEvReattachToShard, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandlePrepare); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleExecute); // from CA
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare);
hFunc(TEvKqp::TEvAbortExecution, HandlePrepare);
hFunc(TEvents::TEvUndelivered, HandleUndelivered);
Expand Down Expand Up @@ -935,7 +935,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
Expand Down Expand Up @@ -1286,41 +1287,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

// Consumes one chunk of result data delivered by a compute actor.
//
// The serialized batch (plus its out-of-band payload, if any) is extracted from
// the event, accounted in Stats when stats collection is on, stored into the
// response under the channel's destination input index, and finally acknowledged
// to the sender with a fixed free-space budget.
void HandleExecute(TEvDqCompute::TEvChannelData::TPtr& ev) {
    auto* msg = ev->Get();
    auto& record = msg->Record;
    auto& channelData = record.GetChannelData();

    // Move the data part out of the record; the remaining fields stay readable.
    TDqSerializedBatch resultBatch;
    resultBatch.Proto = std::move(*record.MutableChannelData()->MutableData());
    if (resultBatch.Proto.HasPayloadId()) {
        resultBatch.Payload = msg->GetPayload(resultBatch.Proto.GetPayloadId());
    }

    auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
    // Only channels without a destination task are expected here.
    YQL_ENSURE(channel.DstTask == 0);
    auto sourceShardId = TasksGraph.GetTask(channel.SrcTask).Meta.ShardId;

    if (Stats) {
        Stats->ResultBytes += resultBatch.Size();
        Stats->ResultRows += resultBatch.RowCount();
    }

    LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << sourceShardId
        << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
        << ", finished: " << channelData.GetFinished());

    ResponseEv->TakeResult(channel.DstInputIndex, std::move(resultBatch));

    LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);

    auto ack = MakeHolder<TEvDqCompute::TEvChannelDataAck>();
    auto& ackRecord = ack->Record;
    ackRecord.SetSeqNo(record.GetSeqNo());
    ackRecord.SetChannelId(channel.Id);
    ackRecord.SetFreeSpace(50_MB);
    Send(ev->Sender, ack.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
}

private:
bool IsReadOnlyTx() const {
if (Request.TopicOperations.HasOperations()) {
Expand Down Expand Up @@ -1770,10 +1736,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
TVector<ui64> computeTasks;

if (StreamResult) {
InitializeChannelProxies();
}

for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
Expand Down
141 changes: 130 additions & 11 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>

#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
Expand All @@ -34,6 +35,7 @@

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/dq/common/dq_serialized_batch.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
Expand All @@ -45,6 +47,7 @@
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>


#include <util/generic/size_literals.h>


Expand Down Expand Up @@ -119,7 +122,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase")
ui64 spanVerbosity = 0, TString spanName = "KqpExecuterBase", bool streamResult = false)
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
Expand All @@ -130,6 +133,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
, MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
, AggregationSettings(aggregation)
, HasOlapTable(false)
, StreamResult(streamResult)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
Expand Down Expand Up @@ -234,6 +238,128 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
return true;
}

// A compute channel data record together with its out-of-band payload.
struct TEvComputeChannelDataOOB {
    NYql::NDqProto::TEvComputeChannelData Proto;
    TRope Payload;

    // Size of the inline raw data plus the size of the out-of-band payload.
    size_t Size() const {
        const auto& data = Proto.GetChannelData().GetData();
        return data.GetRaw().size() + Payload.size();
    }

    // Row count as reported by the data part of the record.
    ui32 RowCount() const {
        const auto& data = Proto.GetChannelData().GetData();
        return data.GetRows();
    }
};

// Handles a chunk of result data sent by a compute actor over a result channel.
//
// The sender and channel id are remembered in ResultChannelToComputeActor (keyed by
// channel.DstInputIndex) so that HandleStreamAck can route an ack back to the compute
// actor. The same index must always be served by the same sender.
//
// Two modes:
//  * streaming (StreamResult is set, the tx result is a stream and has a query result
//    index): the batch is converted to a YDB result set and either forwarded to Target
//    as TEvStreamData, or - for a finished single-result query that allows trailing
//    results - staged in txResult.TrailingResult and acked immediately;
//  * otherwise: the batch is accumulated in ResponseEv and acked immediately.
//
// NOTE(review): the map is keyed by channel.DstInputIndex, while the stream message
// carries *txResult.QueryResultIndex and HandleStreamAck looks the entry up by the
// QueryResultIndex echoed in the ack. This looks correct only if both indices always
// coincide - confirm.
// NOTE(review): Stats->ResultBytes / ResultRows are not updated on the streaming
// path - confirm that this is intended.
void HandleChannelData(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
    auto& record = ev->Get()->Record;
    auto& channelData = record.GetChannelData();
    auto& channel = TasksGraph.GetChannel(channelData.GetChannelId());
    auto& task = TasksGraph.GetTask(channel.SrcTask);
    const TActorId channelComputeActorId = ev->Sender;

    auto [it, _] = ResultChannelToComputeActor.emplace(channel.DstInputIndex, std::make_pair(ev->Sender, channel.Id));
    YQL_ENSURE(it->second.first == channelComputeActorId);

    auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];

    // Extract the serialized batch once; both modes need it. Only the data part is
    // moved out, so seqNo / finished / channel id remain readable from the record.
    NYql::NDq::TDqSerializedBatch batch;
    batch.Proto = std::move(*record.MutableChannelData()->MutableData());
    if (batch.Proto.HasPayloadId()) {
        batch.Payload = ev->Get()->GetPayload(batch.Proto.GetPayloadId());
    }

    if (StreamResult && txResult.IsStream && txResult.QueryResultIndex.Defined()) {
        const auto seqNo = record.GetSeqNo();

        const bool trailingResults = (
            channelData.GetFinished() &&
            (ResponseEv->TxResults.size() == 1) &&
            Request.IsTrailingResultsAllowed());

        TVector<NYql::NDq::TDqSerializedBatch> batches;
        batches.emplace_back(std::move(batch));

        TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
        auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder);

        if (!trailingResults) {
            auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
            streamEv->Record.SetSeqNo(seqNo);
            streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex);
            streamEv->Record.MutableResultSet()->Swap(&resultSet);

            // Routine per-chunk message: trace level, same as the non-streaming path.
            LOG_T("Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo()
                << ", nRows: " << streamEv->Record.GetResultSet().rows().size());

            // The compute actor is acked later, when Target answers with TEvStreamDataAck.
            this->Send(Target, streamEv.Release());
        } else {
            // The final chunk is kept to be returned with the response, so nobody
            // downstream will ack it - ack the compute actor right away.
            auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
            ackEv->Record.SetSeqNo(seqNo);
            ackEv->Record.SetChannelId(channel.Id);
            ackEv->Record.SetFreeSpace(50_MB);
            this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);

            txResult.TrailingResult.Swap(&resultSet);
            LOG_T("staging TEvStreamData to " << Target << ", seqNo: " << seqNo
                << ", nRows: " << txResult.TrailingResult.rows().size());
        }

        return;
    }

    // Non-streaming results are only expected on channels without a destination task.
    YQL_ENSURE(channel.DstTask == 0);

    if (Stats) {
        Stats->ResultBytes += batch.Size();
        Stats->ResultRows += batch.RowCount();
    }

    LOG_T("Got result, channelId: " << channel.Id << ", shardId: " << task.Meta.ShardId
        << ", inputIndex: " << channel.DstInputIndex << ", from: " << ev->Sender
        << ", finished: " << channelData.GetFinished());

    ResponseEv->TakeResult(channel.DstInputIndex, std::move(batch));
    LOG_T("Send ack to channelId: " << channel.Id << ", seqNo: " << record.GetSeqNo() << ", to: " << ev->Sender);

    auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
    ackEv->Record.SetSeqNo(record.GetSeqNo());
    ackEv->Record.SetChannelId(channel.Id);
    ackEv->Record.SetFreeSpace(50_MB);
    this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channel.Id);
}

// Translates a TEvStreamDataAck received for a streamed result into a
// TEvChannelDataAck for the compute actor that produces this result.
//
// The compute actor and channel id are taken from ResultChannelToComputeActor,
// looked up by the query result index carried in the ack; an ack for an unknown
// index fails the YQL_ENSURE. SeqNo and free space are forwarded as is, and the
// "enough" flag of the stream ack becomes the "finish" flag of the channel ack.
//
// NOTE(review): entries are registered by HandleChannelData under
// channel.DstInputIndex; this lookup assumes that value equals the query result
// index echoed in the ack - confirm.
void HandleStreamAck(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
    const auto& ackRecord = ev->Get()->Record;

    ui64 queryResultIndex = ackRecord.GetQueryResultIndex();
    auto it = ResultChannelToComputeActor.find(queryResultIndex);
    YQL_ENSURE(it != ResultChannelToComputeActor.end());
    const auto [channelComputeActorId, channelId] = it->second;

    ui64 seqNo = ackRecord.GetSeqNo();
    i64 freeSpace = ackRecord.GetFreeSpace();

    // Routine per-chunk message: logged at trace level, not as an error.
    LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
        << ", send ack to channelId: " << channelId
        << ", seqNo: " << seqNo
        << ", enough: " << ackRecord.GetEnough()
        << ", freeSpace: " << freeSpace
        << ", to: " << channelComputeActorId);

    auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
    ackEv->Record.SetSeqNo(seqNo);
    ackEv->Record.SetChannelId(channelId);
    ackEv->Record.SetFreeSpace(freeSpace);
    ackEv->Record.SetFinish(ackRecord.GetEnough());
    this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
}

void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
Expand Down Expand Up @@ -1594,16 +1720,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
return true;
}

// Creates a channel proxy for every channel of the tasks graph that has no
// destination task; channels with a destination task are left untouched.
void InitializeChannelProxies() {
    for (const auto& graphChannel : TasksGraph.GetChannels()) {
        if (!graphChannel.DstTask) {
            CreateChannelProxy(graphChannel);
        }
    }
}

// Returns the snapshot stored in the tasks graph meta.
const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
    const auto& graphMeta = TasksGraph.GetMeta();
    return graphMeta.Snapshot;
}
Expand Down Expand Up @@ -1753,8 +1869,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
bool HasOlapTable = false;
bool StreamResult = false;
bool HasDatashardSourceScan = false;
bool UnknownAffectedShardCount = false;

THashMap<ui64, std::pair<TActorId, ui64>> ResultChannelToComputeActor;
THashMap<NYql::NDq::TStageId, THashMap<ui64, TShardInfo>> SourceScanStageIdToParititions;

private:
Expand Down
Loading

0 comments on commit ded3818

Please sign in to comment.