Skip to content

Commit

Permalink
YQ-2918: add processing speed statistics to logs (#2720)
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorkaZ authored Mar 18, 2024
1 parent d937d09 commit c82b9ce
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 18 deletions.
13 changes: 13 additions & 0 deletions ydb/core/fq/libs/compute/common/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,17 @@ Y_UNIT_TEST_SUITE(FormatTimes) {
UNIT_ASSERT_EQUAL(NFq::FormatDurationMs(6'000'000'000), "1666h");
UNIT_ASSERT_EQUAL(NFq::FormatDurationMs(1'000'000'000'000), "277777h");
}

Y_UNIT_TEST(ParseDuration) {
UNIT_ASSERT_EQUAL(NFq::ParseDuration("41us"), TDuration::MicroSeconds(41));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("0us"), TDuration::MicroSeconds(0));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("1ms"), TDuration::MilliSeconds(1));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("33ms"), TDuration::MilliSeconds(33));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("1s"), TDuration::Seconds(1));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("1.00s"), TDuration::Seconds(1));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("0.11s"), TDuration::MilliSeconds(110));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("31.02s"), TDuration::Seconds(31) + TDuration::MilliSeconds(20));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("1h 02m 3.45s"), TDuration::Hours(1) + TDuration::Minutes(2) + TDuration::MilliSeconds(3450));
UNIT_ASSERT_EQUAL(NFq::ParseDuration("3000h"), TDuration::Hours(3000));
}
}
87 changes: 87 additions & 0 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,93 @@ TString FormatDurationUs(ui64 durationUs) {
return FormatDurationMs(durationUs / 1000);
}

namespace detail {

struct TDurationParser {
constexpr static bool IsDigit(char c) noexcept {
return '0' <= c && c <= '9';
}

constexpr std::string_view ConsumeLastFraction() noexcept {
ConsumeWhitespace();
if (Src.empty()) {
return Src;
}

auto it = Src.end() - 1;
while (true) {
// we rely on non-empty number before fraction
if (IsDigit(*it) || it == Src.begin()) {
++it;
break;
}
--it;
}
auto start = it - Src.begin();
auto res = Src.substr(start);
Src = Src.substr(0, start);
return res;
}

constexpr ui32 ConsumeNumberPortion() noexcept {
ui32 dec = 1;
ui32 res = 0;
while (!Src.empty() && IsDigit(Src.back())) {
res += (Src.back() - '0') * dec;
dec *= 10;
Src.remove_suffix(1);
}
return res;
}

constexpr void ConsumeWhitespace() noexcept {
while (!Src.empty() && Src.back() == ' ') {
Src.remove_suffix(1);
}
}

constexpr std::chrono::microseconds ParseDuration() {
auto fraction = ConsumeLastFraction();
if (fraction == "us") {
return std::chrono::microseconds{ConsumeNumberPortion()};
} else if (fraction == "ms") {
return std::chrono::milliseconds{ConsumeNumberPortion()};
}

std::chrono::microseconds result{};
if (fraction == "s") {
auto part = ConsumeNumberPortion();
if (!Src.empty() && Src.back() == '.') {
// parsed milliseconds (cantiseconds actually)
part *= 10;
result += std::chrono::milliseconds(part);

Src.remove_suffix(1);
result += std::chrono::seconds(ConsumeNumberPortion());
} else {
result += std::chrono::seconds{part};
}
fraction = ConsumeLastFraction();
}
if (fraction == "m") {
result += std::chrono::minutes{ConsumeNumberPortion()};
fraction = ConsumeLastFraction();
}

if (fraction == "h") {
result += std::chrono::hours{ConsumeNumberPortion()};
}
return result;
}

std::string_view Src;
};
}

TDuration ParseDuration(TStringBuf str) {
return detail::TDurationParser{.Src = str}.ParseDuration();
}

TString FormatInstant(TInstant instant) {
TStringBuilder builder;
builder << instant.FormatLocalTime("%H:%M:%S.");
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ TString GetV1StatFromV2PlanV2(const TString& plan);
TString FormatDurationMs(ui64 durationMs);
TString FormatDurationUs(ui64 durationUs);
TString FormatInstant(TInstant instant);
TDuration ParseDuration(TStringBuf str);

struct TPublicStat {
std::optional<int> MemoryUsageBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ TPingTaskParams ConstructHardPingTask(
if (request.statistics()) {
TString statistics = request.statistics();
internal.clear_statistics();
PackStatisticsToProtobuf(*internal.mutable_statistics(), statistics);
PackStatisticsToProtobuf(*internal.mutable_statistics(), statistics, TInstant::Now() - NProtoInterop::CastFromProto(job.meta().created_at()));

// global dumpRawStatistics will be removed with YQv1
if (!dumpRawStatistics && !request.dump_raw_statistics()) {
Expand Down
22 changes: 17 additions & 5 deletions ydb/core/fq/libs/control_plane_storage/internal/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace {
void ValidateStats(std::string_view statisticsStr, const std::unordered_map<std::string_view, i64>& expected) {
FederatedQuery::Internal::QueryInternal internal;
auto statisticsPtr = internal.mutable_statistics();
PackStatisticsToProtobuf(*statisticsPtr, statisticsStr);
PackStatisticsToProtobuf(*statisticsPtr, statisticsStr, TDuration::MicroSeconds(expected.at("ExecutionTimeUs")));

for (const auto& statsElement : *statisticsPtr) {
const auto& name = statsElement.name();
Expand All @@ -32,19 +32,27 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"IngressBytes", 53},
{"InputBytes", 30},
{"OutputBytes", 60},
{"S3Source", 53}};
{"S3Source", 53},
{"ExecutionTimeUs", 1234}};

expectedS3Source["CpuTimeUs"] = (TDuration::Seconds(2) + TDuration::MilliSeconds(410)).MicroSeconds();
ValidateStats(v1S3Source, expectedS3Source);
expectedS3Source["CpuTimeUs"] = TDuration::MilliSeconds(7).MicroSeconds();
ValidateStats(v2S3Source, expectedS3Source);
}

Y_UNIT_TEST(ParseJustOutput) {
auto v1Output = NResource::Find("v1_output.json");
auto v2Output = NResource::Find("v2_output.json");

std::unordered_map<std::string_view, i64> expectedOutput{{"OutputBytes", 3}};
std::unordered_map<std::string_view, i64> expectedOutput{
{"OutputBytes", 3},
{"CpuTimeUs", TDuration::MilliSeconds(47).MicroSeconds()},
{"ExecutionTimeUs", 4321}};

expectedOutput["CpuTimeUs"] = TDuration::MilliSeconds(47).MicroSeconds();
ValidateStats(v1Output, expectedOutput);
expectedOutput["CpuTimeUs"] = 534;
ValidateStats(v2Output, expectedOutput);
}

Expand All @@ -54,7 +62,9 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"OutputBytes", 129},
{"InputBytes", 76},
{"IngressBytes", 106},
{"S3Source", 106}
{"S3Source", 106},
{"CpuTimeUs", (TDuration::Seconds(2) + TDuration::MilliSeconds(570)).MicroSeconds()},
{"ExecutionTimeUs", 0}
};
ValidateStats(v1TwoResults, expectedOutput);
}
Expand All @@ -65,7 +75,9 @@ Y_UNIT_TEST_SUITE(ParseStats) {
{"OutputBytes", 106},
{"InputBytes", 53},
{"IngressBytes", 106},
{"S3Source", 106}
{"S3Source", 106},
{"CpuTimeUs", TDuration::MilliSeconds(2).MicroSeconds()},
{"ExecutionTimeUs", 42}
};
ValidateStats(v2TwoResults, expectedOutput);
}
Expand Down
56 changes: 45 additions & 11 deletions ydb/core/fq/libs/control_plane_storage/internal/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <contrib/libs/fmt/include/fmt/format.h>
#include <library/cpp/json/yson/json2yson.h>

#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/metering/bill_record.h>
#include <ydb/core/metering/metering.h>
#include <ydb/public/lib/fq/scope.h>
Expand Down Expand Up @@ -336,27 +337,31 @@ void AggregateStatisticsBySources(const NJson::TJsonValue& root, std::unordered_
}
if (auto valuePtr = partStats.GetValueByPath(ingressPath)) {
TString valueKey{partKey, matchedPrefix.size(), partKey.size() - matchedPrefix.size()};
i64 value = valuePtr->GetIntegerSafe();
aggregatedStats[valueKey] += value;
aggregatedStats[valueKey] += valuePtr->GetIntegerSafe();
break;
}
}
}
}

void CollectTotalStatistics(const NJson::TJsonValue& stats, std::unordered_map<TString, i64>& aggregatedStatistics) {
using namespace std::string_view_literals;
auto fieldToPath = {
std::make_pair(TString{"IngressBytes"}, "IngressBytes.sum"sv),
constexpr std::initializer_list<std::pair<std::string_view, std::string_view>> FieldToPath = {
std::pair("IngressBytes"sv, "IngressBytes.sum"sv),
{"EgressBytes", "EgressBytes.sum"},
{"InputBytes", "InputBytes.sum"},
{"OutputBytes", "OutputBytes.sum"}};
{"OutputBytes", "OutputBytes.sum"},
{"CpuTimeUs", "CpuTimeUs.sum"},
{"ExecutionTimeUs", "ExecutionTimeUs.sum"}};

void CollectTotalStatistics(const NJson::TJsonValue& stats, std::unordered_map<TString, i64>& aggregatedStatistics) {
for (const auto& [rootKey, graph] : stats.GetMap()) {
bool isV1 = rootKey.find('=') != TString::npos;
for (auto [field, path] : fieldToPath) {
for (auto [field, path] : FieldToPath) {
if (auto jsonField = graph.GetValueByPath(fmt::format("{}{}", (isV1 ? "TaskRunner.Stage=Total." : ""), path))) {
aggregatedStatistics[field] += jsonField->GetIntegerSafe();
if (jsonField->IsInteger()) {
aggregatedStatistics[TString{field}] += jsonField->GetInteger();
} else {
aggregatedStatistics[TString{field}] += ParseDuration(jsonField->GetStringSafe()).MicroSeconds();
}
}
}
}
Expand All @@ -367,9 +372,34 @@ void CollectDetalizationStatistics(const NJson::TJsonValue& stats, std::unordere
AggregateStatisticsBySources(graph, aggregatedStatistics);
}
}

bool IsIngressStat(TStringBuf statName) {
return std::none_of(FieldToPath.begin() + 1, FieldToPath.end(), [&](const auto& field_to_path) { return field_to_path.first == statName; });
}

void PrintSpeeds(TStringBuilder& builder, const StatsValuesList& stats, std::string_view postfix, TDuration execTime) {
for (const auto& [statName, value] : stats) {
if (!IsIngressStat(statName)) {
continue;
}
// getting bytes/second = 1'000'000 * bytes/microsecond
auto speed = (value * 1000000.) / std::max(execTime.MicroSeconds(), ui64{1});
builder << ", \"" << statName << postfix << "\": " << speed;
}
}

void PrintSpeeds(TStringBuilder& builder, const StatsValuesList& stats) {
for (const auto& [statName, stat] : stats) {
if (statName == "ExecutionTimeUs") {
PrintSpeeds(builder, stats, "PerSecond", TDuration::MicroSeconds(stat));
} else if (statName == "CpuTimeUs") {
PrintSpeeds(builder, stats, "PerCpuPerSecond", TDuration::MicroSeconds(stat));
}
}
}
}

void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr) {
void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr, TDuration executionTime) {
NJson::TJsonValue statsJson;
if (!NJson::ReadJsonFastTree(statsStr, &statsJson)) {
return;
Expand All @@ -382,6 +412,7 @@ void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery:
std::unordered_map<TString, i64> aggregatedStatistics;
CollectTotalStatistics(statsJson, aggregatedStatistics);
CollectDetalizationStatistics(statsJson, aggregatedStatistics);
aggregatedStatistics["ExecutionTimeUs"] = executionTime.MicroSeconds();

for (auto [field, stat] : aggregatedStatistics) {
auto newStat = dest.Add();
Expand All @@ -401,13 +432,16 @@ StatsValuesList ExtractStatisticsFromProtobuf(const google::protobuf::RepeatedPt

TStringBuilder& operator<<(TStringBuilder& builder, const Statistics& statistics) {
bool first = true;
builder << '{';
for (const auto& [field, value] : statistics.Stats) {
if (!first) {
builder << ", ";
}
builder << field << ": [" << value << "]";
builder << '"' << field << "\": " << value;
first = false;
}
PrintSpeeds(builder, statistics.Stats);
builder << '}';
return builder;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/control_plane_storage/internal/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ NYql::TIssues ValidateCreateOrDeleteRateLimiterResource(const TString& queryId,
std::vector<TString> GetMeteringRecords(const TString& statistics, bool billable, const TString& jobId, const TString& scope, const TString& sourceId);
TString GetPrettyStatistics(const TString& statistics);

void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr);
void PackStatisticsToProtobuf(google::protobuf::RepeatedPtrField<FederatedQuery::Internal::StatisticsNamedValue>& dest, std::string_view statsStr, TDuration executionTime);

using StatsValuesList = std::vector<std::pair<TString, ui64>>;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/control_plane_storage/internal/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PEERDIR(
ydb/core/base
ydb/core/metering
ydb/core/fq/libs/common
ydb/core/fq/libs/compute/common
ydb/core/fq/libs/config
ydb/core/fq/libs/control_plane_storage/proto
ydb/core/fq/libs/exceptions
Expand Down

0 comments on commit c82b9ce

Please sign in to comment.