Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2918: add processing speed statistics to logs #2720

Merged
merged 3 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,101 @@ TString FormatDurationUs(ui64 durationUs) {
return FormatDurationMs(durationUs / 1000);
}

namespace detail {

struct TDurationParser {
constexpr static bool IsDigit(char c) noexcept {
return '0' <= c && c <= '9';
}

constexpr std::string_view ConsumeLastFraction() noexcept {
ConsumeWhitespace();
if (Src.empty()) {
return Src;
}

auto it = Src.end() - 1;
while (true) {
// we rely on non-empty number before fraction
if (IsDigit(*it) || it == Src.begin()) {
++it;
break;
}
--it;
}
auto start = it - Src.begin();
auto res = Src.substr(start);
Src = Src.substr(0, start);
return res;
}

constexpr ui32 ConsumeNumberPortion() noexcept {
ui32 dec = 1;
ui32 res = 0;
while (!Src.empty() && IsDigit(Src.back())) {
res += (Src.back() - '0') * dec;
dec *= 10;
Src.remove_suffix(1);
}
return res;
}

constexpr void ConsumeWhitespace() noexcept {
while (!Src.empty() && Src.back() == ' ') {
Src.remove_suffix(1);
}
}

constexpr std::chrono::microseconds ParseDuration() {
auto fraction = ConsumeLastFraction();
if (fraction == "us") {
return std::chrono::microseconds{ConsumeNumberPortion()};
} else if (fraction == "ms") {
return std::chrono::milliseconds{ConsumeNumberPortion()};
}

std::chrono::microseconds result{};
if (fraction == "s") {
auto part = ConsumeNumberPortion();
if (!Src.empty() && Src.back() == '.') {
// parsed milliseconds (cantiseconds actually)
part *= 10;
result += std::chrono::milliseconds(part);

Src.remove_suffix(1);
result += std::chrono::seconds(ConsumeNumberPortion());
} else {
result += std::chrono::seconds{part};
}
fraction = ConsumeLastFraction();
}
if (fraction == "m") {
result += std::chrono::minutes{ConsumeNumberPortion()};
fraction = ConsumeLastFraction();
}

if (fraction == "h") {
result += std::chrono::hours{ConsumeNumberPortion()};
}
return result;
}

std::string_view Src;
};

static_assert(TDurationParser("41us").ParseDuration() == TDuration::MicroSeconds(41));
EgorkaZ marked this conversation as resolved.
Show resolved Hide resolved
static_assert(TDurationParser("0us").ParseDuration() == TDuration::MicroSeconds(0));
static_assert(TDurationParser("1ms").ParseDuration() == TDuration::MilliSeconds(1));
static_assert(TDurationParser("33ms").ParseDuration() == TDuration::MilliSeconds(33));
static_assert(TDurationParser("1s").ParseDuration() == TDuration::Seconds(1));
static_assert(TDurationParser("31.02s").ParseDuration() == TDuration::Seconds(31) + TDuration::MilliSeconds(20));
static_assert(TDurationParser("1h 02m 3.45s").ParseDuration() == TDuration::Hours(1) + TDuration::Minutes(2) + TDuration::MilliSeconds(3450));
}

TDuration ParseDuration(TStringBuf str) {
return detail::TDurationParser{.Src = str}.ParseDuration();
}

TString FormatInstant(TInstant instant) {
TStringBuilder builder;
builder << instant.FormatLocalTime("%H:%M:%S.");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ TString GetV1StatFromV2PlanV2(const TString& plan);
TString FormatDurationMs(ui64 durationMs);
TString FormatDurationUs(ui64 durationUs);
TString FormatInstant(TInstant instant);
TDuration ParseDuration(TStringBuf str);

struct TPublicStat {
std::optional<int> MemoryUsageBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ TPingTaskParams ConstructHardPingTask(
if (request.statistics()) {
TString statistics = request.statistics();
internal.clear_statistics();
PackStatisticsToProtobuf(*internal.mutable_statistics(), statistics);
PackStatisticsToProtobuf(*internal.mutable_statistics(), statistics, TInstant::Now() - NProtoInterop::CastFromProto(job.meta().created_at()));

if (!dumpRawStatistics) {
try {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/fq/libs/control_plane_storage/internal/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace {
void ValidateStats(std::string_view statisticsStr, const std::unordered_map<std::string_view, i64>& expected) {
FederatedQuery::Internal::QueryInternal internal;
auto statisticsPtr = internal.mutable_statistics();
PackStatisticsToProtobuf(*statisticsPtr, statisticsStr);
PackStatisticsToProtobuf(*statisticsPtr, statisticsStr, TDuration::MicroSeconds(expected.at("ExecutionTimeUs")));

for (const auto& statsElement : *statisticsPtr) {
const auto& name = statsElement.name();
Expand All @@ -32,19 +32,27 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"IngressBytes", 53},
{"InputBytes", 30},
{"OutputBytes", 60},
{"S3Source", 53}};
{"S3Source", 53},
{"ExecutionTimeUs", 1234}};

expectedS3Source["CpuTimeUs"] = (TDuration::Seconds(2) + TDuration::MilliSeconds(410)).MicroSeconds();
ValidateStats(v1S3Source, expectedS3Source);
expectedS3Source["CpuTimeUs"] = TDuration::MilliSeconds(7).MicroSeconds();
ValidateStats(v2S3Source, expectedS3Source);
}

Y_UNIT_TEST(ParseJustOutput) {
auto v1Output = NResource::Find("v1_output.json");
auto v2Output = NResource::Find("v2_output.json");

std::unordered_map<std::string_view, i64> expectedOutput{{"OutputBytes", 3}};
std::unordered_map<std::string_view, i64> expectedOutput{
{"OutputBytes", 3},
{"CpuTimeUs", TDuration::MilliSeconds(47).MicroSeconds()},
{"ExecutionTimeUs", 4321}};

expectedOutput["CpuTimeUs"] = TDuration::MilliSeconds(47).MicroSeconds();
ValidateStats(v1Output, expectedOutput);
expectedOutput["CpuTimeUs"] = 534;
ValidateStats(v2Output, expectedOutput);
}

Expand All @@ -54,7 +62,9 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"OutputBytes", 129},
{"InputBytes", 76},
{"IngressBytes", 106},
{"S3Source", 106}
{"S3Source", 106},
{"CpuTimeUs", (TDuration::Seconds(2) + TDuration::MilliSeconds(570)).MicroSeconds()},
{"ExecutionTimeUs", 0}
};
ValidateStats(v1TwoResults, expectedOutput);
}
Expand All @@ -65,7 +75,9 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"OutputBytes", 106},
{"InputBytes", 53},
{"IngressBytes", 106},
{"S3Source", 106}
{"S3Source", 106},
{"CpuTimeUs", TDuration::MilliSeconds(2).MicroSeconds()},
{"ExecutionTimeUs", 42}
};
ValidateStats(v2TwoResults, expectedOutput);
}
Expand Down
56 changes: 45 additions & 11 deletions ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <contrib/libs/fmt/include/fmt/format.h>
#include <library/cpp/json/yson/json2yson.h>

#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/metering/bill_record.h>
#include <ydb/core/metering/metering.h>
#include <ydb/public/lib/fq/scope.h>
Expand Down Expand Up @@ -336,27 +337,31 @@ void AggregateStatisticsBySources(const NJson::TJsonValue& root, std::unordered_
}
if (auto valuePtr = partStats.GetValueByPath(ingressPath)) {
TString valueKey{partKey, matchedPrefix.size(), partKey.size() - matchedPrefix.size()};
i64 value = valuePtr->GetIntegerSafe();
aggregatedStats[valueKey] += value;
aggregatedStats[valueKey] += valuePtr->GetIntegerSafe();
break;
}
}
}
}

void CollectTotalStatistics(const NJson::TJsonValue& stats, std::unordered_map<TString, i64>& aggregatedStatistics) {
using namespace std::string_view_literals;
auto fieldToPath = {
std::make_pair(TString{"IngressBytes"}, "IngressBytes.sum"sv),
constexpr std::initializer_list<std::pair<std::string_view, std::string_view>> FieldToPath = {
std::pair("IngressBytes"sv, "IngressBytes.sum"sv),
{"EgressBytes", "EgressBytes.sum"},
{"InputBytes", "InputBytes.sum"},
{"OutputBytes", "OutputBytes.sum"}};
{"OutputBytes", "OutputBytes.sum"},
{"CpuTimeUs", "CpuTimeUs.sum"},
{"ExecutionTimeUs", "ExecutionTimeUs.sum"}};

void CollectTotalStatistics(const NJson::TJsonValue& stats, std::unordered_map<TString, i64>& aggregatedStatistics) {
for (const auto& [rootKey, graph] : stats.GetMap()) {
bool isV1 = rootKey.find('=') != TString::npos;
for (auto [field, path] : fieldToPath) {
for (auto [field, path] : FieldToPath) {
if (auto jsonField = graph.GetValueByPath(fmt::format("{}{}", (isV1 ? "TaskRunner.Stage=Total." : ""), path))) {
aggregatedStatistics[field] += jsonField->GetIntegerSafe();
if (jsonField->IsInteger()) {
aggregatedStatistics[TString{field}] += jsonField->GetInteger();
} else {
aggregatedStatistics[TString{field}] += ParseDuration(jsonField->GetStringSafe()).MicroSeconds();
}
}
}
}
Expand All @@ -367,9 +372,34 @@ void CollectDetalizationStatistics(const NJson::TJsonValue& stats, std::unordere
AggregateStatisticsBySources(graph, aggregatedStatistics);
}
}

bool IsIngressStat(TStringBuf statName) {
return std::none_of(FieldToPath.begin() + 1, FieldToPath.end(), [&](const auto& field_to_path) { return field_to_path.first == statName; });
}

void PrintSpeeds(TStringBuilder& builder, const StatsValuesList& stats, std::string_view postfix, TDuration execTime) {
for (const auto& [statName, value] : stats) {
if (!IsIngressStat(statName)) {
continue;
}
// getting bytes/second = 1'000'000 * bytes/microsecond
auto speed = (value * 1000000.) / std::max(execTime.MicroSeconds(), ui64{1});
builder << ", \"" << statName << postfix << "\": " << speed;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

размерность бы к скорости дописать bytes/sec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

У нас и так названия полей выглядят как IngressBytesPerSecond / IngressBytesPerCpuPerSecond, плюс тут печаетется жсон, проще жить, когда его не надо дополнительно парсить

}
}

void PrintSpeeds(TStringBuilder& builder, const StatsValuesList& stats) {
for (const auto& [statName, stat] : stats) {
if (statName == "ExecutionTimeUs") {
PrintSpeeds(builder, stats, "PerSecond", TDuration::MicroSeconds(stat));
} else if (statName == "CpuTimeUs") {
PrintSpeeds(builder, stats, "PerCpuPerSecond", TDuration::MicroSeconds(stat));
}
}
}
}

void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr) {
void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr, TDuration executionTime) {
NJson::TJsonValue statsJson;
if (!NJson::ReadJsonFastTree(statsStr, &statsJson)) {
return;
Expand All @@ -382,6 +412,7 @@ void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery:
std::unordered_map<TString, i64> aggregatedStatistics;
CollectTotalStatistics(statsJson, aggregatedStatistics);
CollectDetalizationStatistics(statsJson, aggregatedStatistics);
aggregatedStatistics["ExecutionTimeUs"] = executionTime.MicroSeconds();

for (auto [field, stat] : aggregatedStatistics) {
auto newStat = dest.Add();
Expand All @@ -401,13 +432,16 @@ StatsValuesList ExtractStatisticsFromProtobuf(const google::protobuf::RepeatedPt

TStringBuilder& operator<<(TStringBuilder& builder, const Statistics& statistics) {
bool first = true;
builder << '{';
for (const auto& [field, value] : statistics.Stats) {
if (!first) {
builder << ", ";
}
builder << field << ": [" << value << "]";
builder << '"' << field << "\": " << value;
first = false;
}
PrintSpeeds(builder, statistics.Stats);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нужно две чиселки печатать. Одна скорость по статистике, а другая скорость на все выполнение query. Полное время выполнения query можно из query.meta достать

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Времени выполнения по статистике нет в v2, сейчас вычисляется из времени начала в job.meta(). Вторая чиселка из CpuTimeUs в статистике: считаем скорость per cpu

builder << '}';
return builder;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/control_plane_storage/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId,
std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId);
TString GetPrettyStatistics(const TString& statistics);

void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr);
void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr, TDuration executionTime);

using StatsValuesList = std::vector<std::pair<TString, ui64>>;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/control_plane_storage/internal/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PEERDIR(
ydb/core/base
ydb/core/metering
ydb/core/fq/libs/common
ydb/core/fq/libs/compute/common
ydb/core/fq/libs/config
ydb/core/fq/libs/control_plane_storage/proto
ydb/core/fq/libs/exceptions
Expand Down
Loading