From 4b90b10cf60b38930d8f17881961b780d00dc73b Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Tue, 11 Feb 2025 13:54:17 +0300 Subject: [PATCH] Support all rate limiter's properties in cpp sdk (#14405) --- .../client/rate_limiter/rate_limiter.h | 145 +++++++++++++++- .../src/client/rate_limiter/rate_limiter.cpp | 162 ++++++++++++++++++ 2 files changed, 304 insertions(+), 3 deletions(-) diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h index 836d42315458..2f03acc9f375 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/rate_limiter/rate_limiter.h @@ -2,14 +2,52 @@ #include +#include +#include +#include + namespace Ydb::RateLimiter { -class CreateResourceRequest; -class DescribeResourceResult; -class HierarchicalDrrSettings; + class CreateResourceRequest; + class DescribeResourceResult; + class HierarchicalDrrSettings; + class ReplicatedBucketSettings; + class MeteringConfig; + class MeteringConfig_Metric; } // namespace Ydb::RateLimiter namespace NYdb::inline V3::NRateLimiter { +struct TReplicatedBucketSettings { + using TSelf = TReplicatedBucketSettings; + + TReplicatedBucketSettings() = default; + TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings&); + + void SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings&) const; + + // Interval between syncs from kesus and between consumption reports. + // Default value equals 5000 ms and not inherited. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportInterval); +}; + +class TLeafBehavior { +public: + enum EBehavior { + REPLICATED_BUCKET, + }; + + EBehavior GetBehavior() const; + + TLeafBehavior(const TReplicatedBucketSettings&); + TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings&); + const TReplicatedBucketSettings& GetReplicatedBucket() const; + + void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const; + +private: + std::variant BehaviorSettings_; +}; + // Settings for hierarchical deficit round robin (HDRR) algorithm. template struct THierarchicalDrrSettings { @@ -46,6 +84,91 @@ struct THierarchicalDrrSettings { // Default value is inherited from parent or 0.75 for root. // Must be nonnegative and less than or equal to 1. FLUENT_SETTING_OPTIONAL(double, PrefetchWatermark); + + // Prevents bucket from going too deep in negative values. If somebody reports value that will exceed + // this limit the final amount in bucket will be equal to this limit. + // Should be negative value. + // Unset means no limit. + FLUENT_SETTING_OPTIONAL(double, ImmediatelyFillUpTo); + + // Behavior of leafs in tree. + // Not inherited. + FLUENT_SETTING_OPTIONAL(TLeafBehavior, LeafBehavior); +}; + +struct TMetric { + using TSelf = TMetric; + using TLabels = std::unordered_map; + + TMetric() = default; + TMetric(const Ydb::RateLimiter::MeteringConfig_Metric&); + + void SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric&) const; + + // Send this metric to billing. + // Default value is false (not inherited). + FLUENT_SETTING_DEFAULT(bool, Enabled, false); + + // Billing metric period (aligned to hour boundary). + // Default value is inherited from parent or equals 60 seconds for root. + FLUENT_SETTING_OPTIONAL(std::chrono::seconds, BillingPeriod); + + // User-defined labels. + FLUENT_SETTING(TLabels, Labels); + + // Billing metric JSON fields (inherited from parent if not set) + FLUENT_SETTING(std::string, MetricFieldsJson); +}; + +struct TMeteringConfig { + using TSelf = TMeteringConfig; + + TMeteringConfig() = default; + TMeteringConfig(const Ydb::RateLimiter::MeteringConfig&); + + void SerializeTo(Ydb::RateLimiter::MeteringConfig&) const; + + // Meter consumed resources and send billing metrics. + FLUENT_SETTING_DEFAULT(bool, Enabled, false); + + // Period to report consumption history from clients to kesus + // Default value is inherited from parent or equals 5000 ms for root. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportPeriod); + + // Consumption history period that is sent in one message to metering actor. + // Default value is inherited from parent or equals 1000 ms for root. + FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, MeterPeriod); + + // Time window to collect data from every client. + // Any client metering message that is `collect_period` late is discarded (not metered or billed). + // Default value is inherited from parent or equals 30 seconds for root. + FLUENT_SETTING_OPTIONAL(std::chrono::seconds, CollectPeriod); + + // Provisioned consumption limit in units per second. + // Effective value is limited by corresponding `max_units_per_second`. + // Default value is 0 (not inherited). + FLUENT_SETTING_OPTIONAL(double, ProvisionedUnitsPerSecond); + + // Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units. + // Effective value is limited by corresponding PrefetchCoefficient. + // Default value is inherited from parent or equals 60 for root. + FLUENT_SETTING_OPTIONAL(double, ProvisionedCoefficient); + + // On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units. + // Should be greater or equal to 1.0 + // Default value is inherited from parent or equals 1.1 for root + FLUENT_SETTING_OPTIONAL(double, OvershootCoefficient); + + // Consumption within provisioned limit. + // Informative metric that should be sent to billing (not billed). + FLUENT_SETTING_OPTIONAL(TMetric, Provisioned); + + // Consumption that exceeds provisioned limit is billed as on-demand. + FLUENT_SETTING_OPTIONAL(TMetric, OnDemand); + + // Consumption that exceeds even on-demand limit. + // Normally it is free and should not be billed. + FLUENT_SETTING_OPTIONAL(TMetric, Overshoot); }; // Settings for create resource request. @@ -55,6 +178,8 @@ struct TCreateResourceSettings { TCreateResourceSettings() = default; TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&); + + FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig); }; // Settings for alter resource request. @@ -62,6 +187,7 @@ struct TAlterResourceSettings : public TOperationRequestSettings , public THierarchicalDrrSettings { + FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig); }; // Settings for drop resource request. @@ -128,6 +254,14 @@ struct TDescribeResourceResult : public TStatus { std::optional GetPrefetchWatermark() const { return PrefetchWatermark_; } + + std::optional GetImmediatelyFillUpTo() const { + return ImmediatelyFillUpTo_; + } + + const std::optional& GetLeafBehavior() const { + return LeafBehavior_; + } }; TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result); @@ -141,9 +275,14 @@ struct TDescribeResourceResult : public TStatus { return HierarchicalDrrProps_; } + const TMeteringConfig& GetMeteringConfig() const { + return MeteringConfig_; + } + private: std::string ResourcePath_; THierarchicalDrrProps HierarchicalDrrProps_; + TMeteringConfig MeteringConfig_; }; using TAsyncDescribeResourceResult = NThreading::TFuture; diff --git a/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp b/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp index 8571d4545219..bf71542d6de7 100644 --- a/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp +++ b/ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp @@ -7,8 +7,47 @@ #include #include +#include + namespace NYdb::inline V3::NRateLimiter { +TReplicatedBucketSettings::TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings& proto) { + if (proto.has_report_interval_ms()) { + ReportInterval_ = std::chrono::milliseconds(proto.report_interval_ms()); + } +} + +void TReplicatedBucketSettings::SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings& proto) const { + if (ReportInterval_) { + proto.set_report_interval_ms(ReportInterval_->count()); + } +} + +TLeafBehavior::EBehavior TLeafBehavior::GetBehavior() const { + return static_cast(BehaviorSettings_.index()); +} + +TLeafBehavior::TLeafBehavior(const TReplicatedBucketSettings& replicatedBucket) + : BehaviorSettings_(replicatedBucket) +{ +} + +TLeafBehavior::TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings& replicatedBucket) + : BehaviorSettings_(replicatedBucket) +{ +} + +const TReplicatedBucketSettings& TLeafBehavior::GetReplicatedBucket() const { + return std::get(BehaviorSettings_); +} + +void TLeafBehavior::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const { + switch (GetBehavior()) { + case REPLICATED_BUCKET: + return GetReplicatedBucket().SerializeTo(*proto.mutable_replicated_bucket()); + } +} + template THierarchicalDrrSettings::THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings& proto) { if (proto.max_units_per_second()) { @@ -26,6 +65,18 @@ THierarchicalDrrSettings::THierarchicalDrrSettings(const Ydb::RateLimi if (proto.prefetch_watermark()) { PrefetchWatermark_ = proto.prefetch_watermark(); } + + if (proto.has_immediately_fill_up_to()) { + ImmediatelyFillUpTo_ = proto.immediately_fill_up_to(); + } + + switch (proto.leaf_behavior_case()) { + case Ydb::RateLimiter::HierarchicalDrrSettings::kReplicatedBucket: + LeafBehavior_.emplace(proto.replicated_bucket()); + break; + case Ydb::RateLimiter::HierarchicalDrrSettings::LEAF_BEHAVIOR_NOT_SET: + break; + } } template @@ -45,6 +96,105 @@ void THierarchicalDrrSettings::SerializeTo(Ydb::RateLimiter::Hierarchi if (PrefetchWatermark_) { proto.set_prefetch_watermark(*PrefetchWatermark_); } + + if (ImmediatelyFillUpTo_) { + proto.set_immediately_fill_up_to(*ImmediatelyFillUpTo_); + } + + if (LeafBehavior_) { + LeafBehavior_->SerializeTo(proto); + } +} + +TMetric::TMetric(const Ydb::RateLimiter::MeteringConfig_Metric& proto) { + Enabled_ = proto.enabled(); + if (proto.billing_period_sec()) { + BillingPeriod_ = std::chrono::seconds(proto.billing_period_sec()); + } + for (const auto& [k, v] : proto.labels()) { + Labels_[k] = v; + } + if (proto.has_metric_fields()) { + TString jsonStr; + if (auto st = google::protobuf::util::MessageToJsonString(proto.metric_fields(), &jsonStr); st.ok()) { + MetricFieldsJson_ = jsonStr; + } + } +} + +void TMetric::SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric& proto) const { + proto.set_enabled(Enabled_); + if (BillingPeriod_) { + proto.set_billing_period_sec(BillingPeriod_->count()); + } + for (const auto& [k, v] : Labels_) { + (*proto.mutable_labels())[k] = v; + } + if (!MetricFieldsJson_.empty()) { + google::protobuf::util::JsonStringToMessage(MetricFieldsJson_, proto.mutable_metric_fields()); + } +} + +TMeteringConfig::TMeteringConfig(const Ydb::RateLimiter::MeteringConfig& proto) { + Enabled_ = proto.enabled(); + if (proto.report_period_ms()) { + ReportPeriod_ = std::chrono::milliseconds(proto.report_period_ms()); + } + if (proto.meter_period_ms()) { + MeterPeriod_ = std::chrono::milliseconds(proto.meter_period_ms()); + } + if (proto.collect_period_sec()) { + CollectPeriod_ = std::chrono::seconds(proto.collect_period_sec()); + } + if (proto.provisioned_units_per_second()) { + ProvisionedUnitsPerSecond_ = proto.provisioned_units_per_second(); + } + if (proto.provisioned_coefficient()) { + ProvisionedCoefficient_ = proto.provisioned_coefficient(); + } + if (proto.overshoot_coefficient()) { + OvershootCoefficient_ = proto.overshoot_coefficient(); + } + if (proto.has_provisioned()) { + Provisioned_.emplace(proto.provisioned()); + } + if (proto.has_on_demand()) { + OnDemand_.emplace(proto.on_demand()); + } + if (proto.has_overshoot()) { + Overshoot_.emplace(proto.overshoot()); + } +} + +void TMeteringConfig::SerializeTo(Ydb::RateLimiter::MeteringConfig& proto) const { + proto.set_enabled(Enabled_); + if (ReportPeriod_) { + proto.set_report_period_ms(ReportPeriod_->count()); + } + if (MeterPeriod_) { + proto.set_meter_period_ms(MeterPeriod_->count()); + } + if (CollectPeriod_) { + proto.set_collect_period_sec(CollectPeriod_->count()); + } + if (ProvisionedUnitsPerSecond_) { + proto.set_provisioned_units_per_second(*ProvisionedUnitsPerSecond_); + } + if (ProvisionedCoefficient_) { + proto.set_provisioned_coefficient(*ProvisionedCoefficient_); + } + if (OvershootCoefficient_) { + proto.set_overshoot_coefficient(*OvershootCoefficient_); + } + if (Provisioned_) { + Provisioned_->SerializeTo(*proto.mutable_provisioned()); + } + if (OnDemand_) { + OnDemand_->SerializeTo(*proto.mutable_on_demand()); + } + if (Overshoot_) { + Overshoot_->SerializeTo(*proto.mutable_overshoot()); + } } template struct THierarchicalDrrSettings; @@ -67,6 +217,9 @@ TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::Rate , ResourcePath_(result.resource().resource_path()) , HierarchicalDrrProps_(result.resource().hierarchical_drr()) { + if (result.resource().has_metering_config()) { + MeteringConfig_ = result.resource().metering_config(); + } } TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings) @@ -102,6 +255,15 @@ class TRateLimiterClient::TImpl : public TClientImplCommonSerializeTo(hdrr); + } + if (settings.MeteringConfig_) { + settings.MeteringConfig_->SerializeTo(*resource.mutable_metering_config()); + } return request; }