Skip to content

Commit

Permalink
YQ-3342 fix script results writing (ydb-platform#5851)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 24, 2024
1 parent df3736b commit 193ba44
Showing 1 changed file with 97 additions and 61 deletions.
158 changes: 97 additions & 61 deletions ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ namespace NKikimr::NKqp {
namespace {

constexpr ui32 LEASE_UPDATE_FREQUENCY = 2;
constexpr ui32 MAX_SAVE_RESULT_IN_FLIGHT = 1;

constexpr ui64 MIN_SAVE_RESULT_BATCH_SIZE = 5_MB;
constexpr i32 MIN_SAVE_RESULT_BATCH_ROWS = 5000;
constexpr ui64 RUN_SCRIPT_ACTOR_BUFFER_SIZE = 40_MB;

class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
enum class ERunState {
Expand All @@ -48,14 +51,20 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
UpdateLeaseEvent,
};

struct TPendingSaveResult {
ui32 ResultSetIndex;
ui64 FirstRow;
ui64 AccumulatedSize;
Ydb::ResultSet ResultSet;
// Per-result-set bookkeeping: totals for the rows accepted so far plus the
// batch of rows that has been received but not yet handed to a save actor.
struct TResultSetInfo {
    // Set when a row-count or byte-size limit stops further rows from being accepted.
    bool Truncated = false;
    // Number of rows accepted for this result set so far.
    ui64 RowCount = 0;
    // Sum of ByteSizeLong() over all accepted rows.
    ui64 ByteCount = 0;
    // Non-owning pointer to this result set's meta JSON value.
    // Explicitly null by default: a null value is what marks the result set as
    // "new", so it must never be left indeterminate on default-initialization.
    NJson::TJsonValue* Meta = nullptr;

    // Row index at which the current pending batch starts.
    ui64 FirstRowId = 0;
    // Value of ByteCount at the moment the previous batch was handed off for saving.
    ui64 AccumulatedSize = 0;
    // Rows accumulated since the previous save.
    Ydb::ResultSet PendingResult;
};

struct TPendingAck {
TActorId ReplyActorId;
THolder<TEvKqpExecuter::TEvStreamDataAck> SaveResultResponse;
THolder<TEvKqpExecuter::TEvStreamDataAck> AckEvent;
};

public:
Expand Down Expand Up @@ -255,29 +264,55 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
PassAway();
}

void SendStreamDataResponse(TActorId replyActorId, THolder<TEvKqpExecuter::TEvStreamDataAck> saveResultResponse) const {
LOG_D("Send stream data ack"
<< ", seqNo: " << saveResultResponse->Record.GetSeqNo()
<< ", to: " << replyActorId);
void SendStreamDataResponse() {
if (PendingAcks.empty()) {
return;
}

if (PendingResultSetsSize > RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
// Try to save any pending result
SaveResult();
}

Send(replyActorId, saveResultResponse.Release());
if (PendingResultSetsSize <= RUN_SCRIPT_ACTOR_BUFFER_SIZE) {
while (!PendingAcks.empty()) {
auto response = std::move(PendingAcks.front());
PendingAcks.pop();

LOG_D("Send stream data ack"
<< ", seqNo: " << response.AckEvent->Record.GetSeqNo()
<< ", to: " << response.ReplyActorId);

Send(response.ReplyActorId, response.AckEvent.Release());
}
}
}

void SaveResult() {
if (SaveResultInflight >= MAX_SAVE_RESULT_IN_FLIGHT || PendingSaveResults.empty()) {
void SaveResult(size_t resultSetId) {
if (SaveResultInflight) {
return;
}

if (!ExpireAt && ResultsTtl > TDuration::Zero()) {
ExpireAt = TInstant::Now() + ResultsTtl;
}

TPendingSaveResult& result = PendingSaveResults.back();
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, result.ResultSetIndex, ExpireAt, result.FirstRow, result.AccumulatedSize, std::move(result.ResultSet)));
SendStreamDataResponse(result.ReplyActorId, std::move(result.SaveResultResponse));

PendingSaveResults.pop_back();
auto& resultSetInfo = ResultSetInfos[resultSetId];
Register(CreateSaveScriptExecutionResultActor(SelfId(), Database, ExecutionId, resultSetId, ExpireAt, resultSetInfo.FirstRowId, resultSetInfo.AccumulatedSize, std::move(resultSetInfo.PendingResult)));
SaveResultInflight++;
PendingResultSetsSize -= resultSetInfo.ByteCount - resultSetInfo.AccumulatedSize;
resultSetInfo.FirstRowId = resultSetInfo.RowCount;
resultSetInfo.AccumulatedSize = resultSetInfo.ByteCount;
resultSetInfo.PendingResult = Ydb::ResultSet();
}

void SaveResult() {
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
if (ResultSetInfos[resultSetId].PendingResult.rows_size()) {
SaveResult(resultSetId);
break;
}
}
}

void Handle(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
Expand All @@ -299,57 +334,52 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();

if (resultSetIndex >= ResultSetMetaArray.size()) {
if (resultSetIndex >= ResultSetInfos.size()) {
// we don't know result set count, so just accept all of them
// it's possible to have several result sets per script
// they can arrive in any order and may be missed for some indices
ResultSetRowCount.resize(resultSetIndex + 1);
ResultSetByteCount.resize(resultSetIndex + 1);
Truncated.resize(resultSetIndex + 1);
ResultSetMetaArray.resize(resultSetIndex + 1, nullptr);
ResultSetInfos.resize(resultSetIndex + 1);
}

bool saveResultRequired = false;
if (IsExecuting() && !Truncated[resultSetIndex]) {
auto& rowCount = ResultSetRowCount[resultSetIndex];
auto& byteCount = ResultSetByteCount[resultSetIndex];
auto firstRow = rowCount;
auto accumulatedSize = byteCount;
auto& resultSetInfo = ResultSetInfos[resultSetIndex];
if (IsExecuting() && !resultSetInfo.Truncated) {
auto& rowCount = resultSetInfo.RowCount;
auto& byteCount = resultSetInfo.ByteCount;

Ydb::ResultSet resultSet;
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
if (QueryServiceConfig.GetScriptResultRowsLimit() && rowCount + 1 > QueryServiceConfig.GetScriptResultRowsLimit()) {
Truncated[resultSetIndex] = true;
resultSetInfo.Truncated = true;
break;
}

auto serializedSize = row.ByteSizeLong();
if (QueryServiceConfig.GetScriptResultSizeLimit() && byteCount + serializedSize > QueryServiceConfig.GetScriptResultSizeLimit()) {
Truncated[resultSetIndex] = true;
resultSetInfo.Truncated = true;
break;
}

rowCount++;
byteCount += serializedSize;
*resultSet.add_rows() = std::move(row);
PendingResultSetsSize += serializedSize;
*resultSetInfo.PendingResult.add_rows() = std::move(row);
}

bool newResultSet = ResultSetMetaArray[resultSetIndex] == nullptr;
if (newResultSet || Truncated[resultSetIndex]) {
bool newResultSet = resultSetInfo.Meta == nullptr;
if (newResultSet || resultSetInfo.Truncated) {
Ydb::Query::Internal::ResultSetMeta meta;
if (newResultSet) {
*meta.mutable_columns() = ev->Get()->Record.GetResultSet().columns();
}
if (Truncated[resultSetIndex]) {
if (resultSetInfo.Truncated) {
meta.set_truncated(true);
}

NJson::TJsonValue* value;
if (newResultSet) {
value = &ResultSetMetas[resultSetIndex];
ResultSetMetaArray[resultSetIndex] = value;
resultSetInfo.Meta = value;
} else {
value = ResultSetMetaArray[resultSetIndex];
value = resultSetInfo.Meta;
}
NProtobufJson::Proto2Json(meta, *value, NProtobufJson::TProto2JsonConfig());

Expand All @@ -362,24 +392,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
}
}

if (resultSet.rows_size() > 0) {
saveResultRequired = true;
PendingSaveResults.push_back({
resultSetIndex,
firstRow,
accumulatedSize,
std::move(resultSet),
ev->Sender,
std::move(resp)
});
if (ShouldSaveResult(resultSetInfo)) {
SaveResult(resultSetIndex);
}
}

if (saveResultRequired) {
SaveResult();
} else {
SendStreamDataResponse(ev->Sender, std::move(resp));
}
PendingAcks.push({.ReplyActorId = ev->Sender, .AckEvent = std::move(resp)});
SendStreamDataResponse();
}

void SaveResultMeta() {
Expand Down Expand Up @@ -504,9 +523,15 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Status = ev->Get()->Status;
Issues.AddIssues(ev->Get()->Issues);
} else {
SaveResult();
for (size_t resultSetId = 0; resultSetId < ResultSetInfos.size(); ++resultSetId) {
if (ShouldSaveResult(ResultSetInfos[resultSetId])) {
SaveResult(resultSetId);
break;
}
}
}
}
SendStreamDataResponse();
CheckInflight();
}

Expand All @@ -533,6 +558,12 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
return;
}

if (PendingResultSetsSize) {
// Complete results saving
SaveResult();
return;
}

if (!LeaseUpdateQueryRunning) {
RunScriptExecutionFinisher();
} else {
Expand All @@ -545,7 +576,7 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Status = status;

// if query has no results, save empty json array
if (ResultSetMetaArray.empty()) {
if (ResultSetInfos.empty()) {
ResultSetMetas.SetType(NJson::JSON_ARRAY);
SaveResultMeta();
SaveResultMetaInflight++;
Expand All @@ -566,6 +597,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
&& RunState != ERunState::Cancelling;
}

// Tells whether the rows buffered in resultInfo should be flushed now.
// Returns false when nothing is buffered; otherwise true if the result set is
// truncated, or the batch reached MIN_SAVE_RESULT_BATCH_ROWS rows, or the
// bytes accepted since the last save reached MIN_SAVE_RESULT_BATCH_SIZE.
static bool ShouldSaveResult(TResultSetInfo& resultInfo) {
    const auto pendingRows = resultInfo.PendingResult.rows_size();
    if (pendingRows == 0) {
        return false;
    }
    if (resultInfo.Truncated) {
        return true;
    }
    if (pendingRows >= MIN_SAVE_RESULT_BATCH_ROWS) {
        return true;
    }
    const ui64 unsavedBytes = resultInfo.ByteCount - resultInfo.AccumulatedSize;
    return unsavedBytes >= MIN_SAVE_RESULT_BATCH_SIZE;
}

private:
const TString ExecutionId;
NKikimrKqp::TEvQueryRequest Request;
Expand All @@ -589,16 +627,14 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;

// Result
std::vector<TPendingSaveResult> PendingSaveResults;
std::vector<ui64> ResultSetRowCount;
std::vector<ui64> ResultSetByteCount;
std::vector<bool> Truncated;
std::vector<NJson::TJsonValue*> ResultSetMetaArray;
std::vector<TResultSetInfo> ResultSetInfos;
std::queue<TPendingAck> PendingAcks;
TMaybe<TInstant> ExpireAt;
NJson::TJsonValue ResultSetMetas;
ui32 SaveResultInflight = 0;
ui32 SaveResultMetaInflight = 0;
bool PendingResultMeta = false;
ui64 PendingResultSetsSize = 0;
std::optional<TString> QueryPlan;
std::optional<TString> QueryAst;
std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
Expand Down

0 comments on commit 193ba44

Please sign in to comment.