Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish full rl api to oss #808

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions ydb/core/grpc_services/rpc_rate_limiter_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,33 @@ class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest>
, TrustedZone(trusted)
{}

static bool ValidateMetric (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
static const TSet<TString> supportedFields{
{"version"},
{"schema"},
{"cloud_id"},
{"folder_id"},
{"resource_id"},
{"source_id"},
};
auto& metricFields = srcMetric.metric_fields().fields();

for (auto& [key, field] : metricFields) {
if (supportedFields.count(key) == 0) {
status = StatusIds::BAD_REQUEST;
issues.AddIssue(TStringBuilder() << "Unsupported key for metric. Key: " << key << ".");
return false;
}
if (!field.has_string_value()) {
status = StatusIds::BAD_REQUEST;
issues.AddIssue(TStringBuilder() << "Unsupported type for metric. Key: " << key << ".");
return false;
}
}

return true;
};

bool ValidateResource(const Ydb::RateLimiter::Resource& resource, Ydb::StatusIds::StatusCode& status, NYql::TIssues& issues) {
if (!ValidateResourcePath(resource.resource_path(), status, issues)) {
return false;
Expand All @@ -37,6 +64,39 @@ class TRateLimiterRequest : public TRpcOperationRequestActor<TDerived, TRequest>
return false;
}

if (resource.has_metering_config()) {
auto self = static_cast<TDerived*>(this);
const auto& userTokenStr = self->Request_->GetSerializedToken();
bool allowed = false;
if (userTokenStr) {
NACLib::TUserToken userToken(userTokenStr);
for (auto &sid : AppData()->AdministrationAllowedSIDs) {
if (userToken.IsExist(sid)) {
allowed = true;
break;
}
}
}

if (!allowed) {
status = StatusIds::UNAUTHORIZED;
issues.AddIssue("Setting metering is allowed only for administrators");
return false;
}

const auto& acc = resource.metering_config();

if (acc.has_provisioned() && !ValidateMetric(acc.provisioned(), status, issues)) {
return false;
}
if (acc.has_on_demand() && !ValidateMetric(acc.on_demand(), status, issues)) {
return false;
}
if (acc.has_overshoot() && !ValidateMetric(acc.overshoot(), status, issues)) {
return false;
}
}

return true;
}

Expand Down Expand Up @@ -211,6 +271,64 @@ static void CopyProps(const Ydb::RateLimiter::Resource& src, NKikimrKesus::TStre
props.SetMaxBurstSizeCoefficient(srcProps.max_burst_size_coefficient());
props.SetPrefetchCoefficient(srcProps.prefetch_coefficient());
props.SetPrefetchWatermark(srcProps.prefetch_watermark());
if (srcProps.has_immediately_fill_up_to()) {
props.SetImmediatelyFillUpTo(srcProps.immediately_fill_up_to());
}
if (src.has_metering_config()) {
const auto& srcAcc = src.metering_config();
auto& acc = *dst.MutableAccountingConfig();
acc.SetEnabled(srcAcc.enabled());
acc.SetReportPeriodMs(srcAcc.report_period_ms());
acc.SetAccountPeriodMs(srcAcc.meter_period_ms());
acc.SetCollectPeriodSec(srcAcc.collect_period_sec());
acc.SetProvisionedUnitsPerSecond(srcAcc.provisioned_units_per_second());
acc.SetProvisionedCoefficient(srcAcc.provisioned_coefficient());
acc.SetOvershootCoefficient(srcAcc.overshoot_coefficient());
auto copyMetric = [] (const Ydb::RateLimiter::MeteringConfig::Metric& srcMetric, NKikimrKesus::TAccountingConfig::TMetric& metric) {
metric.SetEnabled(srcMetric.enabled());
metric.SetBillingPeriodSec(srcMetric.billing_period_sec());

/* overwrite if we have new fields */
/* TODO: support arbitrary fields in metering core */
if (srcMetric.has_metric_fields()) {
auto& metricFields = srcMetric.metric_fields().fields();
if (metricFields.contains("version") && metricFields.at("version").has_string_value()) {
metric.SetVersion(metricFields.at("version").string_value());
}
if (metricFields.contains("schema") && metricFields.at("schema").has_string_value()) {
metric.SetSchema(metricFields.at("schema").string_value());
}
if (metricFields.contains("cloud_id") && metricFields.at("cloud_id").has_string_value()) {
metric.SetCloudId(metricFields.at("cloud_id").string_value());
}
if (metricFields.contains("folder_id") && metricFields.at("folder_id").has_string_value()) {
metric.SetFolderId(metricFields.at("folder_id").string_value());
}
if (metricFields.contains("resource_id") && metricFields.at("resource_id").has_string_value()) {
metric.SetResourceId(metricFields.at("resource_id").string_value());
}
if (metricFields.contains("source_id") && metricFields.at("source_id").has_string_value()) {
metric.SetSourceId(metricFields.at("source_id").string_value());
}
}
};
if (srcAcc.has_provisioned()) {
copyMetric(srcAcc.provisioned(), *acc.MutableProvisioned());
}
if (srcAcc.has_on_demand()) {
copyMetric(srcAcc.on_demand(), *acc.MutableOnDemand());
}
if (srcAcc.has_overshoot()) {
copyMetric(srcAcc.overshoot(), *acc.MutableOvershoot());
}
}
if (srcProps.has_replicated_bucket()) {
const auto& srcRepl = srcProps.replicated_bucket();
auto& repl = *props.MutableReplicatedBucket();
if (srcRepl.has_report_interval_ms()) {
repl.SetReportIntervalMs(srcRepl.report_interval_ms());
}
}
}

static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::RateLimiter::Resource& dst) {
Expand All @@ -221,6 +339,49 @@ static void CopyProps(const NKikimrKesus::TStreamingQuoterResource& src, Ydb::Ra
props.set_max_burst_size_coefficient(srcProps.GetMaxBurstSizeCoefficient());
props.set_prefetch_coefficient(srcProps.GetPrefetchCoefficient());
props.set_prefetch_watermark(srcProps.GetPrefetchWatermark());
if (srcProps.HasImmediatelyFillUpTo()) {
props.set_immediately_fill_up_to(srcProps.GetImmediatelyFillUpTo());
}
if (src.HasAccountingConfig()) {
const auto& srcAcc = src.GetAccountingConfig();
auto& acc = *dst.mutable_metering_config();
acc.set_enabled(srcAcc.GetEnabled());
acc.set_report_period_ms(srcAcc.GetReportPeriodMs());
acc.set_meter_period_ms(srcAcc.GetAccountPeriodMs());
acc.set_collect_period_sec(srcAcc.GetCollectPeriodSec());
acc.set_provisioned_units_per_second(srcAcc.GetProvisionedUnitsPerSecond());
acc.set_provisioned_coefficient(srcAcc.GetProvisionedCoefficient());
acc.set_overshoot_coefficient(srcAcc.GetOvershootCoefficient());
auto copyMetric = [] (const NKikimrKesus::TAccountingConfig::TMetric& srcMetric, Ydb::RateLimiter::MeteringConfig::Metric& metric) {
metric.set_enabled(srcMetric.GetEnabled());
metric.set_billing_period_sec(srcMetric.GetBillingPeriodSec());

/* TODO: support arbitrary fields in metering core */
auto& metricFields = *metric.mutable_metric_fields()->mutable_fields();
metricFields["version"].set_string_value(srcMetric.GetVersion());
metricFields["schema"].set_string_value(srcMetric.GetSchema());
metricFields["cloud_id"].set_string_value(srcMetric.GetCloudId());
metricFields["folder_id"].set_string_value(srcMetric.GetFolderId());
metricFields["resource_id"].set_string_value(srcMetric.GetResourceId());
metricFields["source_id"].set_string_value(srcMetric.GetSourceId());
};
if (srcAcc.HasProvisioned()) {
copyMetric(srcAcc.GetProvisioned(), *acc.mutable_provisioned());
}
if (srcAcc.HasOnDemand()) {
copyMetric(srcAcc.GetOnDemand(), *acc.mutable_on_demand());
}
if (srcAcc.HasOvershoot()) {
copyMetric(srcAcc.GetOvershoot(), *acc.mutable_overshoot());
}
}
if (srcProps.HasReplicatedBucket()) {
const auto& srcRepl = srcProps.GetReplicatedBucket();
auto& repl = *props.mutable_replicated_bucket();
if (srcRepl.HasReportIntervalMs()) {
repl.set_report_interval_ms(srcRepl.GetReportIntervalMs());
}
}
}

class TCreateRateLimiterResourceRPC : public TRateLimiterControlRequest<TEvCreateRateLimiterResource> {
Expand Down
85 changes: 85 additions & 0 deletions ydb/public/api/protos/ydb_rate_limiter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ option java_package = "com.yandex.ydb.rate_limiter";
option java_outer_classname = "RateLimiterProtos";
option java_multiple_files = true;

import "ydb/public/api/protos/annotations/validation.proto";
import "ydb/public/api/protos/ydb_operation.proto";

import "google/protobuf/struct.proto";

//
// Rate Limiter control API.
//
Expand All @@ -17,6 +20,71 @@ import "ydb/public/api/protos/ydb_operation.proto";
// Resource properties.
//

message MeteringConfig {
// Meter consumed resources and send billing metrics.
// Default value is false (not inherited).
bool enabled = 1;

// Period to report consumption history from clients to kesus
// Default value is inherited from parent or equals 5000 ms for root.
uint64 report_period_ms = 2;

// Consumption history period that is sent in one message to metering actor.
// Default value is inherited from parent or equals 1000 ms for root.
uint64 meter_period_ms = 3;

// Time window to collect data from every client.
// Any client metering message that is `collect_period` late is discarded (not metered or billed).
// Default value is inherited from parent or equals 30 seconds for root.
uint64 collect_period_sec = 4;

// Provisioned consumption limit in units per second.
// Effective value is limited by corresponding `max_units_per_second`.
// Default value is 0 (not inherited).
double provisioned_units_per_second = 5;

// Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units.
// Effective value is limited by corresponding PrefetchCoefficient.
// Default value is inherited from parent or equals 60 for root.
double provisioned_coefficient = 6;

// On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units.
// Should be greater or equal to 1.0
// Default value is inherited from parent or equals 1.1 for root
double overshoot_coefficient = 7;

// Billing metric description.
message Metric {
// Send this metric to billing.
// Default value is false (not inherited).
bool enabled = 1;

// Billing metric period (aligned to hour boundary).
// Default value is inherited from parent or equals 60 seconds for root.
uint64 billing_period_sec = 2;

// Billing metric JSON fields (inherited from parent if not set)
google.protobuf.Struct metric_fields = 10;
}

// Consumption within provisioned limit.
// Informative metric that should be sent to billing (not billed).
Metric provisioned = 8;

// Consumption that exceeds provisioned limit is billed as on-demand.
Metric on_demand = 9;

// Consumption that exceeds even on-demand limit.
// Normally it is free and should not be billed.
Metric overshoot = 10;
}

message ReplicatedBucketSettings {
// Interval between syncs from kesus and between consumption reports in Ms
// Default value equals 5000 ms and not inherited
optional uint64 report_interval_ms = 1 [(Ydb.value) = "> 0"];
}

// Settings for hierarchical deficit round robin (HDRR) algorithm.
message HierarchicalDrrSettings {
// Resource consumption speed limit.
Expand All @@ -43,6 +111,21 @@ message HierarchicalDrrSettings {
// Default value is inherited from parent or 0.75 for root.
// Must be nonnegative and less than or equal to 1.
double prefetch_watermark = 4;

// NOTE: This API is experimental
// Prevents bucket from going too deep in negative values. If somebody reports value that will exceed
// this limit the final amount in bucket will be equal to this limit
// Should be negative value
// Unset means no limit
optional double immediately_fill_up_to = 5 [(Ydb.value) = "< 0"];

// NOTE: This API is experimental
// Behavior of leafs in tree
// Not inherited
oneof leaf_behavior {
// Make leafs behave as single bucket replicated from kesus
ReplicatedBucketSettings replicated_bucket = 6;
}
}

// Rate limiter resource description.
Expand All @@ -57,6 +140,8 @@ message Resource {
// Settings for Hierarchical DRR algorithm.
HierarchicalDrrSettings hierarchical_drr = 2;
}

MeteringConfig metering_config = 3;
}

//
Expand Down
Loading