Skip to content

Commit

Permalink
Publish full rl api to oss
Browse files Browse the repository at this point in the history
  • Loading branch information
Enjection committed Dec 29, 2023
1 parent cbdd149 commit 56cfe1d
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 0 deletions.
161 changes: 161 additions & 0 deletions ydb/core/grpc_services/rpc_rate_limiter_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,33 @@ class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest>
, TrustedZone(trusted)
{}

static bool ValidateMetric (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
static const TSet<TString> supportedFields{
{"version"},
{"schema"},
{"cloud_id"},
{"folder_id"},
{"resource_id"},
{"source_id"},
};
auto& metricFields = srcMetric.metric_fields().fields();

for (auto& [key, field] : metricFields) {
if (supportedFields.count(key) == 0) {
status = StatusIds::BAD_REQUEST;
issues.AddIssue(TStringBuilder() << "Unsupported key for metric. Key: " << key << ".");
return false;
}
if (!field.has_string_value()) {
status = StatusIds::BAD_REQUEST;
issues.AddIssue(TStringBuilder() << "Unsupported type for metric. Key: " << key << ".");
return false;
}
}

return true;
};

bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
if (!ValidateResourcePath(resource.resource_path(), status, issues)) {
return false;
Expand All @@ -37,6 +64,39 @@ class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest>
return false;
}

if (resource.has_metering_config()) {
auto self = static_cast<TDerived*>(this);
const auto& userTokenStr = self->Request_->GetSerializedToken();
bool allowed = false;
if (userTokenStr) {
NACLib::TUserToken userToken(userTokenStr);
for (auto &sid : AppData()->AdministrationAllowedSIDs) {
if (userToken.IsExist(sid)) {
allowed = true;
break;
}
}
}

if (!allowed) {
status = StatusIds::UNAUTHORIZED;
issues.AddIssue("Setting metering is allowed only for administrators");
return false;
}

const auto& acc = resource.metering_config();

if (acc.has_provisioned() && !ValidateMetric(acc.provisioned(), status, issues)) {
return false;
}
if (acc.has_on_demand() && !ValidateMetric(acc.on_demand(), status, issues)) {
return false;
}
if (acc.has_overshoot() && !ValidateMetric(acc.overshoot(), status, issues)) {
return false;
}
}

return true;
}

Expand Down Expand Up @@ -211,6 +271,64 @@ static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStre
props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient());
props.SetPrefetchCoefficient(srcProps.prefetch_coefficient());
props.SetPrefetchWatermark(srcProps.prefetch_watermark());
if (srcProps.has_immediately_fill_up_to()) {
props.SetImmediatelyFillUpTo(srcProps.immediately_fill_up_to());
}
if (src.has_metering_config()) {
const auto& srcAcc = src.metering_config();
auto& acc = *dst.MutableAccountingConfig();
acc.SetEnabled(srcAcc.enabled());
acc.SetReportPeriodMs(srcAcc.report_period_ms());
acc.SetAccountPeriodMs(srcAcc.meter_period_ms());
acc.SetCollectPeriodSec(srcAcc.collect_period_sec());
acc.SetProvisionedUnitsPerSecond(srcAcc.provisioned_units_per_second());
acc.SetProvisionedCoefficient(srcAcc.provisioned_coefficient());
acc.SetOvershootCoefficient(srcAcc.overshoot_coefficient());
auto copyMetric = [] (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, NKikimrKesus::TAccountingConfig::TMetric& metric) {
metric.SetEnabled(srcMetric.enabled());
metric.SetBillingPeriodSec(srcMetric.billing_period_sec());

/* overwrite if we have new fields */
/* TODO: support arbitrary fields in metering core */
if (srcMetric.has_metric_fields()) {
auto& metricFields = srcMetric.metric_fields().fields();
if (metricFields.contains("version") && metricFields.at("version").has_string_value()) {
metric.SetVersion(metricFields.at("version").string_value());
}
if (metricFields.contains("schema") && metricFields.at("schema").has_string_value()) {
metric.SetSchema(metricFields.at("schema").string_value());
}
if (metricFields.contains("cloud_id") && metricFields.at("cloud_id").has_string_value()) {
metric.SetCloudId(metricFields.at("cloud_id").string_value());
}
if (metricFields.contains("folder_id") && metricFields.at("folder_id").has_string_value()) {
metric.SetFolderId(metricFields.at("folder_id").string_value());
}
if (metricFields.contains("resource_id") && metricFields.at("resource_id").has_string_value()) {
metric.SetResourceId(metricFields.at("resource_id").string_value());
}
if (metricFields.contains("source_id") && metricFields.at("source_id").has_string_value()) {
metric.SetSourceId(metricFields.at("source_id").string_value());
}
}
};
if (srcAcc.has_provisioned()) {
copyMetric(srcAcc.provisioned(), *acc.MutableProvisioned());
}
if (srcAcc.has_on_demand()) {
copyMetric(srcAcc.on_demand(), *acc.MutableOnDemand());
}
if (srcAcc.has_overshoot()) {
copyMetric(srcAcc.overshoot(), *acc.MutableOvershoot());
}
}
if (srcProps.has_replicated_bucket()) {
const auto& srcRepl = srcProps.replicated_bucket();
auto& repl = *props.MutableReplicatedBucket();
if (srcRepl.has_report_interval_ms()) {
repl.SetReportIntervalMs(srcRepl.report_interval_ms());
}
}
}

static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) {
Expand All @@ -221,6 +339,49 @@ static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::Ra
props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient());
props.set_prefetch_coefficient(srcProps.GetPrefetchCoefficient());
props.set_prefetch_watermark(srcProps.GetPrefetchWatermark());
if (srcProps.HasImmediatelyFillUpTo()) {
props.set_immediately_fill_up_to(srcProps.GetImmediatelyFillUpTo());
}
if (src.HasAccountingConfig()) {
const auto& srcAcc = src.GetAccountingConfig();
auto& acc = *dst.mutable_metering_config();
acc.set_enabled(srcAcc.GetEnabled());
acc.set_report_period_ms(srcAcc.GetReportPeriodMs());
acc.set_meter_period_ms(srcAcc.GetAccountPeriodMs());
acc.set_collect_period_sec(srcAcc.GetCollectPeriodSec());
acc.set_provisioned_units_per_second(srcAcc.GetProvisionedUnitsPerSecond());
acc.set_provisioned_coefficient(srcAcc.GetProvisionedCoefficient());
acc.set_overshoot_coefficient(srcAcc.GetOvershootCoefficient());
auto copyMetric = [] (const NKikimrKesus::TAccountingConfig::TMetric& srcMetric, Ydb::RateLimiter::MeteringConfig::Metric& metric) {
metric.set_enabled(srcMetric.GetEnabled());
metric.set_billing_period_sec(srcMetric.GetBillingPeriodSec());

/* TODO: support arbitrary fields in metering core */
auto& metricFields = *metric.mutable_metric_fields()->mutable_fields();
metricFields["version"].set_string_value(srcMetric.GetVersion());
metricFields["schema"].set_string_value(srcMetric.GetSchema());
metricFields["cloud_id"].set_string_value(srcMetric.GetCloudId());
metricFields["folder_id"].set_string_value(srcMetric.GetFolderId());
metricFields["resource_id"].set_string_value(srcMetric.GetResourceId());
metricFields["source_id"].set_string_value(srcMetric.GetSourceId());
};
if (srcAcc.HasProvisioned()) {
copyMetric(srcAcc.GetProvisioned(), *acc.mutable_provisioned());
}
if (srcAcc.HasOnDemand()) {
copyMetric(srcAcc.GetOnDemand(), *acc.mutable_on_demand());
}
if (srcAcc.HasOvershoot()) {
copyMetric(srcAcc.GetOvershoot(), *acc.mutable_overshoot());
}
}
if (srcProps.HasReplicatedBucket()) {
const auto& srcRepl = srcProps.GetReplicatedBucket();
auto& repl = *props.mutable_replicated_bucket();
if (srcRepl.HasReportIntervalMs()) {
repl.set_report_interval_ms(srcRepl.GetReportIntervalMs());
}
}
}

class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> {
Expand Down
85 changes: 85 additions & 0 deletions ydb/public/api/protos/ydb_rate_limiter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ option java_package = "com.yandex.ydb.rate_limiter";
option java_outer_classname = "RateLimiterProtos";
option java_multiple_files = true;

import "ydb/public/api/protos/annotations/validation.proto";
import "ydb/public/api/protos/ydb_operation.proto";

import "google/protobuf/struct.proto";

//
// Rate Limiter control API.
//
Expand All @@ -17,6 +20,71 @@ import "ydb/public/api/protos/ydb_operation.proto";
// Resource properties.
//

message MeteringConfig {
// Meter consumed resources and send billing metrics.
// Default value is false (not inherited).
bool enabled = 1;

// Period to report consumption history from clients to kesus
// Default value is inherited from parent or equals 5000 ms for root.
uint64 report_period_ms = 2;

// Consumption history period that is sent in one message to metering actor.
// Default value is inherited from parent or equals 1000 ms for root.
uint64 meter_period_ms = 3;

// Time window to collect data from every client.
// Any client metering message that is `collect_period` late is discarded (not metered or billed).
// Default value is inherited from parent or equals 30 seconds for root.
uint64 collect_period_sec = 4;

// Provisioned consumption limit in units per second.
// Effective value is limited by corresponding `max_units_per_second`.
// Default value is 0 (not inherited).
double provisioned_units_per_second = 5;

// Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units.
// Effective value is limited by corresponding PrefetchCoefficient.
// Default value is inherited from parent or equals 60 for root.
double provisioned_coefficient = 6;

// On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units.
// Should be greater or equal to 1.0
// Default value is inherited from parent or equals 1.1 for root
double overshoot_coefficient = 7;

// Billing metric description.
message Metric {
// Send this metric to billing.
// Default value is false (not inherited).
bool enabled = 1;

// Billing metric period (aligned to hour boundary).
// Default value is inherited from parent or equals 60 seconds for root.
uint64 billing_period_sec = 2;

// Billing metric JSON fields (inherited from parent if not set)
google.protobuf.Struct metric_fields = 10;
}

// Consumption within provisioned limit.
// Informative metric that should be sent to billing (not billed).
Metric provisioned = 8;

// Consumption that exceeds provisioned limit is billed as on-demand.
Metric on_demand = 9;

// Consumption that exceeds even on-demand limit.
// Normally it is free and should not be billed.
Metric overshoot = 10;
}

message ReplicatedBucketSettings {
// Interval between syncs from kesus and between consumption reports in Ms
// Default value equals 5000 ms and not inherited
optional uint64 report_interval_ms = 1 [(Ydb.value) = "> 0"];
}

// Settings for hierarchical deficit round robin (HDRR) algorithm.
message HierarchicalDrrSettings {
// Resource consumption speed limit.
Expand All @@ -43,6 +111,21 @@ message HierarchicalDrrSettings {
// Default value is inherited from parent or 0.75 for root.
// Must be nonnegative and less than or equal to 1.
double prefetch_watermark = 4;

// NOTE: This API is experimental
// Prevents bucket from going too deep in negative values. If somebody reports value that will exceed
// this limit the final amount in bucket will be equal to this limit
// Should be negative value
// Unset means no limit
optional double immediately_fill_up_to = 5 [(Ydb.value) = "< 0"];

// NOTE: This API is experimental
// Behavior of leafs in tree
// Not inherited
oneof leaf_behavior {
// Make leafs behave as single bucket replicated from kesus
ReplicatedBucketSettings replicated_bucket = 6;
}
}

// Rate limiter resource description.
Expand All @@ -57,6 +140,8 @@ message Resource {
// Settings for Hierarchical DRR algorithm.
HierarchicalDrrSettings hierarchical_drr = 2;
}

MeteringConfig metering_config = 3;
}

//
Expand Down

0 comments on commit 56cfe1d

Please sign in to comment.