From 56cfe1d9c3fba653f1307c2417007a09489a6cd5 Mon Sep 17 00:00:00 2001 From: Innokentii Mokin Date: Fri, 29 Dec 2023 13:28:18 +0000 Subject: [PATCH] Publish full rl api to oss --- .../grpc_services/rpc_rate_limiter_api.cpp | 161 ++++++++++++++++++ ydb/public/api/protos/ydb_rate_limiter.proto | 85 +++++++++ 2 files changed, 246 insertions(+) diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index 274f137b0926..c7ab9a8d3121 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -26,6 +26,33 @@ class TRateLimiterRequest : public TRpcOperationRequestActor , TrustedZone(trusted) {} + static bool ValidateMetric (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { + static const TSet supportedFields{ + {"version"}, + {"schema"}, + {"cloud_id"}, + {"folder_id"}, + {"resource_id"}, + {"source_id"}, + }; + auto& metricFields = srcMetric.metric_fields().fields(); + + for (auto& [key, field] : metricFields) { + if (supportedFields.count(key) == 0) { + status = StatusIds::BAD_REQUEST; + issues.AddIssue(TStringBuilder() << "Unsupported key for metric. Key: " << key << "."); + return false; + } + if (!field.has_string_value()) { + status = StatusIds::BAD_REQUEST; + issues.AddIssue(TStringBuilder() << "Unsupported type for metric. Key: " << key << "."); + return false; + } + } + + return true; + }; + bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) { if (!ValidateResourcePath(resource.resource_path(), status, issues)) { return false; @@ -37,6 +64,39 @@ class TRateLimiterRequest : public TRpcOperationRequestActor return false; } + if (resource.has_metering_config()) { + auto self = static_cast(this); + const auto& userTokenStr = self->Request_->GetSerializedToken(); + bool allowed = false; + if (userTokenStr) { + NACLib::TUserToken userToken(userTokenStr); + for (auto &sid : AppData()->AdministrationAllowedSIDs) { + if (userToken.IsExist(sid)) { + allowed = true; + break; + } + } + } + + if (!allowed) { + status = StatusIds::UNAUTHORIZED; + issues.AddIssue("Setting metering is allowed only for administrators"); + return false; + } + + const auto& acc = resource.metering_config(); + + if (acc.has_provisioned() && !ValidateMetric(acc.provisioned(), status, issues)) { + return false; + } + if (acc.has_on_demand() && !ValidateMetric(acc.on_demand(), status, issues)) { + return false; + } + if (acc.has_overshoot() && !ValidateMetric(acc.overshoot(), status, issues)) { + return false; + } + } + return true; } @@ -211,6 +271,64 @@ static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStre props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient()); props.SetPrefetchCoefficient(srcProps.prefetch_coefficient()); props.SetPrefetchWatermark(srcProps.prefetch_watermark()); + if (srcProps.has_immediately_fill_up_to()) { + props.SetImmediatelyFillUpTo(srcProps.immediately_fill_up_to()); + } + if (src.has_metering_config()) { + const auto& srcAcc = src.metering_config(); + auto& acc = *dst.MutableAccountingConfig(); + acc.SetEnabled(srcAcc.enabled()); + acc.SetReportPeriodMs(srcAcc.report_period_ms()); + acc.SetAccountPeriodMs(srcAcc.meter_period_ms()); + acc.SetCollectPeriodSec(srcAcc.collect_period_sec()); + acc.SetProvisionedUnitsPerSecond(srcAcc.provisioned_units_per_second()); + acc.SetProvisionedCoefficient(srcAcc.provisioned_coefficient()); + acc.SetOvershootCoefficient(srcAcc.overshoot_coefficient()); + auto copyMetric = [] (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, NKikimrKesus::TAccountingConfig::TMetric& metric) { + metric.SetEnabled(srcMetric.enabled()); + metric.SetBillingPeriodSec(srcMetric.billing_period_sec()); + + /* overwrite if we have new fields */ + /* TODO: support arbitrary fields in metering core */ + if (srcMetric.has_metric_fields()) { + auto& metricFields = srcMetric.metric_fields().fields(); + if (metricFields.contains("version") && metricFields.at("version").has_string_value()) { + metric.SetVersion(metricFields.at("version").string_value()); + } + if (metricFields.contains("schema") && metricFields.at("schema").has_string_value()) { + metric.SetSchema(metricFields.at("schema").string_value()); + } + if (metricFields.contains("cloud_id") && metricFields.at("cloud_id").has_string_value()) { + metric.SetCloudId(metricFields.at("cloud_id").string_value()); + } + if (metricFields.contains("folder_id") && metricFields.at("folder_id").has_string_value()) { + metric.SetFolderId(metricFields.at("folder_id").string_value()); + } + if (metricFields.contains("resource_id") && metricFields.at("resource_id").has_string_value()) { + metric.SetResourceId(metricFields.at("resource_id").string_value()); + } + if (metricFields.contains("source_id") && metricFields.at("source_id").has_string_value()) { + metric.SetSourceId(metricFields.at("source_id").string_value()); + } + } + }; + if (srcAcc.has_provisioned()) { + copyMetric(srcAcc.provisioned(), *acc.MutableProvisioned()); + } + if (srcAcc.has_on_demand()) { + copyMetric(srcAcc.on_demand(), *acc.MutableOnDemand()); + } + if (srcAcc.has_overshoot()) { + copyMetric(srcAcc.overshoot(), *acc.MutableOvershoot()); + } + } + if (srcProps.has_replicated_bucket()) { + const auto& srcRepl = srcProps.replicated_bucket(); + auto& repl = *props.MutableReplicatedBucket(); + if (srcRepl.has_report_interval_ms()) { + repl.SetReportIntervalMs(srcRepl.report_interval_ms()); + } + } } static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) { @@ -221,6 +339,49 @@ static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::Ra props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient()); props.set_prefetch_coefficient(srcProps.GetPrefetchCoefficient()); props.set_prefetch_watermark(srcProps.GetPrefetchWatermark()); + if (srcProps.HasImmediatelyFillUpTo()) { + props.set_immediately_fill_up_to(srcProps.GetImmediatelyFillUpTo()); + } + if (src.HasAccountingConfig()) { + const auto& srcAcc = src.GetAccountingConfig(); + auto& acc = *dst.mutable_metering_config(); + acc.set_enabled(srcAcc.GetEnabled()); + acc.set_report_period_ms(srcAcc.GetReportPeriodMs()); + acc.set_meter_period_ms(srcAcc.GetAccountPeriodMs()); + acc.set_collect_period_sec(srcAcc.GetCollectPeriodSec()); + acc.set_provisioned_units_per_second(srcAcc.GetProvisionedUnitsPerSecond()); + acc.set_provisioned_coefficient(srcAcc.GetProvisionedCoefficient()); + acc.set_overshoot_coefficient(srcAcc.GetOvershootCoefficient()); + auto copyMetric = [] (const NKikimrKesus::TAccountingConfig::TMetric& srcMetric, Ydb::RateLimiter::MeteringConfig::Metric& metric) { + metric.set_enabled(srcMetric.GetEnabled()); + metric.set_billing_period_sec(srcMetric.GetBillingPeriodSec()); + + /* TODO: support arbitrary fields in metering core */ + auto& metricFields = *metric.mutable_metric_fields()->mutable_fields(); + metricFields["version"].set_string_value(srcMetric.GetVersion()); + metricFields["schema"].set_string_value(srcMetric.GetSchema()); + metricFields["cloud_id"].set_string_value(srcMetric.GetCloudId()); + metricFields["folder_id"].set_string_value(srcMetric.GetFolderId()); + metricFields["resource_id"].set_string_value(srcMetric.GetResourceId()); + metricFields["source_id"].set_string_value(srcMetric.GetSourceId()); + }; + if (srcAcc.HasProvisioned()) { + copyMetric(srcAcc.GetProvisioned(), *acc.mutable_provisioned()); + } + if (srcAcc.HasOnDemand()) { + copyMetric(srcAcc.GetOnDemand(), *acc.mutable_on_demand()); + } + if (srcAcc.HasOvershoot()) { + copyMetric(srcAcc.GetOvershoot(), *acc.mutable_overshoot()); + } + } + if (srcProps.HasReplicatedBucket()) { + const auto& srcRepl = srcProps.GetReplicatedBucket(); + auto& repl = *props.mutable_replicated_bucket(); + if (srcRepl.HasReportIntervalMs()) { + repl.set_report_interval_ms(srcRepl.GetReportIntervalMs()); + } + } } class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest { diff --git a/ydb/public/api/protos/ydb_rate_limiter.proto b/ydb/public/api/protos/ydb_rate_limiter.proto index c18430c32b69..a3aa37ae4420 100644 --- a/ydb/public/api/protos/ydb_rate_limiter.proto +++ b/ydb/public/api/protos/ydb_rate_limiter.proto @@ -7,8 +7,11 @@ option java_package = "com.yandex.ydb.rate_limiter"; option java_outer_classname = "RateLimiterProtos"; option java_multiple_files = true; +import "ydb/public/api/protos/annotations/validation.proto"; import "ydb/public/api/protos/ydb_operation.proto"; +import "google/protobuf/struct.proto"; + // // Rate Limiter control API. // @@ -17,6 +20,71 @@ import "ydb/public/api/protos/ydb_operation.proto"; // Resource properties. // +message MeteringConfig { + // Meter consumed resources and send billing metrics. + // Default value is false (not inherited). + bool enabled = 1; + + // Period to report consumption history from clients to kesus + // Default value is inherited from parent or equals 5000 ms for root. + uint64 report_period_ms = 2; + + // Consumption history period that is sent in one message to metering actor. + // Default value is inherited from parent or equals 1000 ms for root. + uint64 meter_period_ms = 3; + + // Time window to collect data from every client. + // Any client metering message that is `collect_period` late is discarded (not metered or billed). + // Default value is inherited from parent or equals 30 seconds for root. + uint64 collect_period_sec = 4; + + // Provisioned consumption limit in units per second. + // Effective value is limited by corresponding `max_units_per_second`. + // Default value is 0 (not inherited). + double provisioned_units_per_second = 5; + + // Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units. + // Effective value is limited by corresponding PrefetchCoefficient. + // Default value is inherited from parent or equals 60 for root. + double provisioned_coefficient = 6; + + // On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units. + // Should be greater or equal to 1.0 + // Default value is inherited from parent or equals 1.1 for root + double overshoot_coefficient = 7; + + // Billing metric description. + message Metric { + // Send this metric to billing. + // Default value is false (not inherited). + bool enabled = 1; + + // Billing metric period (aligned to hour boundary). + // Default value is inherited from parent or equals 60 seconds for root. + uint64 billing_period_sec = 2; + + // Billing metric JSON fields (inherited from parent if not set) + google.protobuf.Struct metric_fields = 10; + } + + // Consumption within provisioned limit. + // Informative metric that should be sent to billing (not billed). + Metric provisioned = 8; + + // Consumption that exceeds provisioned limit is billed as on-demand. + Metric on_demand = 9; + + // Consumption that exceeds even on-demand limit. + // Normally it is free and should not be billed. + Metric overshoot = 10; +} + +message ReplicatedBucketSettings { + // Interval between syncs from kesus and between consumption reports in Ms + // Default value equals 5000 ms and not inherited + optional uint64 report_interval_ms = 1 [(Ydb.value) = "> 0"]; +} + // Settings for hierarchical deficit round robin (HDRR) algorithm. message HierarchicalDrrSettings { // Resource consumption speed limit. @@ -43,6 +111,21 @@ message HierarchicalDrrSettings { // Default value is inherited from parent or 0.75 for root. // Must be nonnegative and less than or equal to 1. double prefetch_watermark = 4; + + // NOTE: This API is experimental + // Prevents bucket from going too deep in negative values. If somebody reports value that will exceed + // this limit the final amount in bucket will be equal to this limit + // Should be negative value + // Unset means no limit + optional double immediately_fill_up_to = 5 [(Ydb.value) = "< 0"]; + + // NOTE: This API is experimental + // Behavior of leafs in tree + // Not inherited + oneof leaf_behavior { + // Make leafs behave as single bucket replicated from kesus + ReplicatedBucketSettings replicated_bucket = 6; + } } // Rate limiter resource description. @@ -57,6 +140,8 @@ message Resource { // Settings for Hierarchical DRR algorithm. HierarchicalDrrSettings hierarchical_drr = 2; } + + MeteringConfig metering_config = 3; } //