Skip to content

Commit

Permalink
Session actor perf (ydb-platform#724)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 27, 2023
1 parent 74f9996 commit a196e8b
Show file tree
Hide file tree
Showing 17 changed files with 458 additions and 275 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::Ev
}

TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile Stats;
TKqpStatsCompile Stats;
std::optional<TString> ReplayMessage;
std::optional<TString> ReplayMessageUserView;

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/kqp/common/compilation/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,10 @@ struct TKqpCompileResult {

std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
};

struct TKqpStatsCompile {
bool FromCache = false;
ui64 DurationUs = 0;
ui64 CpuTimeUs = 0;
};
} // namespace NKikimr::NKqp
72 changes: 17 additions & 55 deletions ydb/core/kqp/common/kqp_ru_calc.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "kqp_ru_calc.h"

#include <ydb/core/protos/kqp_stats.pb.h>

#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>

#include <ydb/core/kqp/common/kqp.h>

#include <util/generic/size_literals.h>
Expand All @@ -14,8 +10,6 @@ namespace NKikimr {
namespace NKqp {
namespace NRuCalc {

namespace {

ui64 CalcReadIORu(const TTableStat& stat) {
constexpr ui64 bytesPerUnit = 4_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
Expand All @@ -24,65 +18,33 @@ ui64 CalcReadIORu(const TTableStat& stat) {
return std::max(bytes, stat.Rows);
}

class TIoReadStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetReadRows();
Bytes += tableAggr.GetReadBytes();
}

ui64 CalcRu() const {
return CalcReadIORu(*this);
}
};
void TIoReadStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetReadRows();
Bytes += tableAggr.GetReadBytes();
}

class TIoWriteStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetWriteRows();
Rows += tableAggr.GetEraseRows();
Bytes += tableAggr.GetWriteBytes();
}
ui64 TIoReadStat::CalcRu() const {
return CalcReadIORu(*this);
}

ui64 CalcRu() const {
constexpr ui64 bytesPerUnit = 1_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;
void TIoWriteStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) {
Rows += tableAggr.GetWriteRows();
Rows += tableAggr.GetEraseRows();
Bytes += tableAggr.GetWriteBytes();
}

auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
return 2 * std::max(bytes, Rows);
}
};
ui64 TIoWriteStat::CalcRu() const {
constexpr ui64 bytesPerUnit = 1_KB;
constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1;

auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit;
return 2 * std::max(bytes, Rows);
}

ui64 CpuTimeToUnit(TDuration cpuTime) {
return std::floor(cpuTime.MicroSeconds() / 1500.0);
}

ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) {
TDuration totalCpuTime;
TIoReadStat totalReadStat;
TIoWriteStat totalWriteStat;

for (const auto& exec : stats.GetExecutions()) {
totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs());

for (auto& table : exec.GetTables()) {
totalReadStat.Add(table);
}
}

if (stats.HasCompilation()) {
totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs());
}

totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs());

auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu();

return std::max(std::max(CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1);
}

ui64 CalcRequestUnit(const TProgressStatEntry& stats) {
auto ioRu = CalcReadIORu(stats.ReadIOStat);

Expand Down
18 changes: 17 additions & 1 deletion ydb/core/kqp/common/kqp_ru_calc.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include <util/system/types.h>
#include <util/datetime/base.h>

#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/kqp/executer_actor/kqp_executer_stats.h>

namespace NKqpProto {
class TKqpStatsQuery;
}
Expand All @@ -14,8 +17,21 @@ struct TProgressStatEntry;

namespace NRuCalc {

ui64 CalcReadIORu(const TTableStat& stat);

class TIoReadStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
ui64 CalcRu() const;
};

class TIoWriteStat: public TTableStat {
public:
void Add(const NYql::NDqProto::TDqTableStats& tableAggr);
ui64 CalcRu() const;
};

ui64 CpuTimeToUnit(TDuration cpuTimeUs);
ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats);
ui64 CalcRequestUnit(const TProgressStatEntry& stats);

} // namespace NRuCalc
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
ReplayMessage = std::nullopt;
ReplayMessageUserView = std::nullopt;
auto& stats = responseEv->Stats;
stats.SetFromCache(false);
stats.SetDurationUs((TInstant::Now() - StartTime).MicroSeconds());
stats.SetCpuTimeUs(CompileCpuTime.MicroSeconds());
stats.FromCache = false;
stats.DurationUs = (TInstant::Now() - StartTime).MicroSeconds();
stats.CpuTimeUs = CompileCpuTime.MicroSeconds();
Send(Owner, responseEv.Release());

Counters->ReportCompileFinish(DbCounters);
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
{
const auto& query = compileResult->Query;
Expand All @@ -953,7 +953,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", status:" << compileResult->Status);

auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
responseEv->Stats.CopyFrom(compileStats);
responseEv->Stats = compileStats;

if (span) {
span.End();
Expand All @@ -965,8 +965,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
NKqpProto::TKqpStatsCompile stats;
stats.SetFromCache(true);
TKqpStatsCompile stats;
stats.FromCache = true;

LWTRACK(KqpCompileServiceReplyFromCache, orbit);
Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span));
Expand All @@ -976,7 +976,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
LWTRACK(KqpCompileServiceReplyError, orbit);
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span));
Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), TKqpStatsCompile{}, ctx, cookie, std::move(orbit), std::move(span));
}

void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message,
Expand Down
53 changes: 35 additions & 18 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,41 +308,52 @@ class TKikimrTransactionContextBase : public TThrRefBase {

bool hasScheme = false;
bool hasData = false;
for (auto& pair : TableOperations) {
hasScheme = hasScheme || (pair.second & KikimrSchemeOps());
hasData = hasData || (pair.second & KikimrDataOps());
for (auto& [_, operation] : TableOperations) {
hasScheme = hasScheme || (operation & KikimrSchemeOps());
hasData = hasData || (operation & KikimrDataOps());
}

THashMap<TStringBuf, const NKqpProto::TKqpTableInfo*> tableInfoMap;
tableInfoMap.reserve(tableInfos.size());
if (TableByIdMap.empty()) {
TableByIdMap.reserve(tableInfos.size());
}
if (TableOperations.empty()) {
TableOperations.reserve(operations.size());
}

THashMap<TString, NKqpProto::TKqpTableInfo> tableInfoMap;
for (const auto& info : tableInfos) {
tableInfoMap.insert(std::make_pair(info.GetTableName(), info));
tableInfoMap.emplace(info.GetTableName(), &info);

TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId());
TableByIdMap.insert(std::make_pair(pathId, info.GetTableName()));
TableByIdMap.emplace(pathId, info.GetTableName());
}

for (const auto& op : operations) {
auto table = op.GetTable();
const auto& table = [&]() -> const TString& {
const auto tempTable = TempTables.FindPtr(op.GetTable());
if (tempTable) {
return *tempTable;
} else {
return op.GetTable();
}
}();

auto newOp = TYdbOperation(op.GetOperation());
TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());

auto tempTable = TempTables.FindPtr(table);
if (tempTable) {
table = *tempTable;
}
const auto newOp = TYdbOperation(op.GetOperation());

const auto info = tableInfoMap.FindPtr(table);
if (!info) {
TString message = TStringBuilder()
<< "Unable to find table info for table '" << table << "'";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message));
return {false, issues};
}

if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in data query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -351,6 +362,7 @@ class TKikimrTransactionContextBase : public TThrRefBase {
if (EffectiveIsolationLevel) {
TString message = TStringBuilder() << "Scheme operations can't be performed inside transaction, "
<< "operation: " << newOp;
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -359,13 +371,15 @@ class TKikimrTransactionContextBase : public TThrRefBase {
if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scheme query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}

if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in scan query";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -379,26 +393,28 @@ class TKikimrTransactionContextBase : public TThrRefBase {
message = TStringBuilder() << message
<< " Use COMMIT statement to indicate end of transaction between scheme and data operations.";
}

const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX, message));
return {false, issues};
}

if (Readonly && (newOp & KikimrModifyOps())) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed in read only transaction";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}

auto& currentOps = TableOperations[table];
bool currentModify = currentOps & KikimrModifyOps();
const bool currentModify = currentOps & KikimrModifyOps();
if (currentModify) {
if (KikimrReadOps() & newOp) {
if (!EnableImmediateEffects) {
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
<< "' in current transaction won't be seen by operation: '"
<< newOp << "'";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message));
issues.AddIssue(newIssue);
return {false, issues};
Expand All @@ -407,10 +423,11 @@ class TKikimrTransactionContextBase : public TThrRefBase {
HasUncommittedChangesRead = true;
}

if (info->GetHasIndexTables()) {
if ((*info)->GetHasIndexTables()) {
if (!EnableImmediateEffects) {
TString message = TStringBuilder()
<< "Multiple modification of table with secondary indexes is not supported yet";
const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow());
issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
return {false, issues};
}
Expand All @@ -428,9 +445,9 @@ class TKikimrTransactionContextBase : public TThrRefBase {
virtual ~TKikimrTransactionContextBase() = default;

public:
THashMap<TString, TYdbOperations> TableOperations;
bool HasUncommittedChangesRead = false;
const bool EnableImmediateEffects;
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
THashMap<TString, TString> TempTables;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
"Unexpected prepared query version: " << compiledVersion);

CompileStats.Swap(&ev->Stats);
CompileStats = ev->Stats;
PreparedQuery = CompileResult->PreparedQuery;
if (ev->ReplayMessage) {
ReplayMessage = *ev->ReplayMessage;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "kqp_query_stats.h"
#include "kqp_worker_common.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -85,7 +86,7 @@ class TKqpQueryState : public TNonCopyable {
ui64 ParametersSize = 0;
TPreparedQueryHolder::TConstPtr PreparedQuery;
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile CompileStats;
TKqpStatsCompile CompileStats;
TIntrusivePtr<TKqpTransactionContext> TxCtx;
TQueryData::TPtr QueryData;

Expand All @@ -97,8 +98,7 @@ class TKqpQueryState : public TNonCopyable {

TInstant StartTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;

NKqpProto::TKqpStatsQuery Stats;
TKqpQueryStats QueryStats;
bool KeepSession = false;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
NActors::TMonotonic StartedAt;
Expand Down Expand Up @@ -235,7 +235,7 @@ class TKqpQueryState : public TNonCopyable {
}

bool NeedCheckTableVersions() const {
return CompileStats.GetFromCache();
return CompileStats.FromCache;
}

TString ExtractQueryText() const {
Expand Down
Loading

0 comments on commit a196e8b

Please sign in to comment.