Skip to content

Commit

Permalink
Support all rate limiter's properties in cpp sdk (ydb-platform#14405)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Feb 11, 2025
1 parent 1d9eb5a commit 4b90b10
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,52 @@

#include <ydb-cpp-sdk/client/driver/driver.h>

#include <chrono>
#include <unordered_map>
#include <variant>

namespace Ydb::RateLimiter {
class CreateResourceRequest;
class DescribeResourceResult;
class HierarchicalDrrSettings;
class CreateResourceRequest;
class DescribeResourceResult;
class HierarchicalDrrSettings;
class ReplicatedBucketSettings;
class MeteringConfig;
class MeteringConfig_Metric;
} // namespace Ydb::RateLimiter

namespace NYdb::inline V3::NRateLimiter {

struct TReplicatedBucketSettings {
using TSelf = TReplicatedBucketSettings;

TReplicatedBucketSettings() = default;
TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings&);

void SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings&) const;

// Interval between syncs from kesus and between consumption reports.
// Default value equals 5000 ms and not inherited.
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportInterval);
};

class TLeafBehavior {
public:
enum EBehavior {
REPLICATED_BUCKET,
};

EBehavior GetBehavior() const;

TLeafBehavior(const TReplicatedBucketSettings&);
TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings&);
const TReplicatedBucketSettings& GetReplicatedBucket() const;

void SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings&) const;

private:
std::variant<TReplicatedBucketSettings> BehaviorSettings_;
};

// Settings for hierarchical deficit round robin (HDRR) algorithm.
template <class TDerived>
struct THierarchicalDrrSettings {
Expand Down Expand Up @@ -46,6 +84,91 @@ struct THierarchicalDrrSettings {
// Default value is inherited from parent or 0.75 for root.
// Must be nonnegative and less than or equal to 1.
FLUENT_SETTING_OPTIONAL(double, PrefetchWatermark);

// Prevents bucket from going too deep in negative values. If somebody reports value that will exceed
// this limit the final amount in bucket will be equal to this limit.
// Should be negative value.
// Unset means no limit.
FLUENT_SETTING_OPTIONAL(double, ImmediatelyFillUpTo);

// Behavior of leafs in tree.
// Not inherited.
FLUENT_SETTING_OPTIONAL(TLeafBehavior, LeafBehavior);
};

struct TMetric {
using TSelf = TMetric;
using TLabels = std::unordered_map<std::string, std::string>;

TMetric() = default;
TMetric(const Ydb::RateLimiter::MeteringConfig_Metric&);

void SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric&) const;

// Send this metric to billing.
// Default value is false (not inherited).
FLUENT_SETTING_DEFAULT(bool, Enabled, false);

// Billing metric period (aligned to hour boundary).
// Default value is inherited from parent or equals 60 seconds for root.
FLUENT_SETTING_OPTIONAL(std::chrono::seconds, BillingPeriod);

// User-defined labels.
FLUENT_SETTING(TLabels, Labels);

// Billing metric JSON fields (inherited from parent if not set)
FLUENT_SETTING(std::string, MetricFieldsJson);
};

struct TMeteringConfig {
using TSelf = TMeteringConfig;

TMeteringConfig() = default;
TMeteringConfig(const Ydb::RateLimiter::MeteringConfig&);

void SerializeTo(Ydb::RateLimiter::MeteringConfig&) const;

// Meter consumed resources and send billing metrics.
FLUENT_SETTING_DEFAULT(bool, Enabled, false);

// Period to report consumption history from clients to kesus
// Default value is inherited from parent or equals 5000 ms for root.
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, ReportPeriod);

// Consumption history period that is sent in one message to metering actor.
// Default value is inherited from parent or equals 1000 ms for root.
FLUENT_SETTING_OPTIONAL(std::chrono::milliseconds, MeterPeriod);

// Time window to collect data from every client.
// Any client metering message that is `collect_period` late is discarded (not metered or billed).
// Default value is inherited from parent or equals 30 seconds for root.
FLUENT_SETTING_OPTIONAL(std::chrono::seconds, CollectPeriod);

// Provisioned consumption limit in units per second.
// Effective value is limited by corresponding `max_units_per_second`.
// Default value is 0 (not inherited).
FLUENT_SETTING_OPTIONAL(double, ProvisionedUnitsPerSecond);

// Provisioned allowed burst equals `provisioned_coefficient * provisioned_units_per_second` units.
// Effective value is limited by corresponding PrefetchCoefficient.
// Default value is inherited from parent or equals 60 for root.
FLUENT_SETTING_OPTIONAL(double, ProvisionedCoefficient);

// On-demand allowed burst equals `overshoot_coefficient * prefetch_coefficient * max_units_per_second` units.
// Should be greater or equal to 1.0
// Default value is inherited from parent or equals 1.1 for root
FLUENT_SETTING_OPTIONAL(double, OvershootCoefficient);

// Consumption within provisioned limit.
// Informative metric that should be sent to billing (not billed).
FLUENT_SETTING_OPTIONAL(TMetric, Provisioned);

// Consumption that exceeds provisioned limit is billed as on-demand.
FLUENT_SETTING_OPTIONAL(TMetric, OnDemand);

// Consumption that exceeds even on-demand limit.
// Normally it is free and should not be billed.
FLUENT_SETTING_OPTIONAL(TMetric, Overshoot);
};

// Settings for create resource request.
Expand All @@ -55,13 +178,16 @@ struct TCreateResourceSettings
{
TCreateResourceSettings() = default;
TCreateResourceSettings(const Ydb::RateLimiter::CreateResourceRequest&);

FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
};

// Settings for alter resource request.
struct TAlterResourceSettings
: public TOperationRequestSettings<TAlterResourceSettings>
, public THierarchicalDrrSettings<TAlterResourceSettings>
{
FLUENT_SETTING_OPTIONAL(TMeteringConfig, MeteringConfig);
};

// Settings for drop resource request.
Expand Down Expand Up @@ -128,6 +254,14 @@ struct TDescribeResourceResult : public TStatus {
std::optional<double> GetPrefetchWatermark() const {
return PrefetchWatermark_;
}

std::optional<double> GetImmediatelyFillUpTo() const {
return ImmediatelyFillUpTo_;
}

const std::optional<TLeafBehavior>& GetLeafBehavior() const {
return LeafBehavior_;
}
};

TDescribeResourceResult(TStatus status, const Ydb::RateLimiter::DescribeResourceResult& result);
Expand All @@ -141,9 +275,14 @@ struct TDescribeResourceResult : public TStatus {
return HierarchicalDrrProps_;
}

const TMeteringConfig& GetMeteringConfig() const {
return MeteringConfig_;
}

private:
std::string ResourcePath_;
THierarchicalDrrProps HierarchicalDrrProps_;
TMeteringConfig MeteringConfig_;
};

using TAsyncDescribeResourceResult = NThreading::TFuture<TDescribeResourceResult>;
Expand Down
162 changes: 162 additions & 0 deletions ydb/public/sdk/cpp/src/client/rate_limiter/rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,47 @@
#include <ydb/public/api/grpc/ydb_rate_limiter_v1.grpc.pb.h>
#include <src/client/common_client/impl/client.h>

#include <google/protobuf/util/json_util.h>

namespace NYdb::inline V3::NRateLimiter {

TReplicatedBucketSettings::TReplicatedBucketSettings(const Ydb::RateLimiter::ReplicatedBucketSettings& proto) {
if (proto.has_report_interval_ms()) {
ReportInterval_ = std::chrono::milliseconds(proto.report_interval_ms());
}
}

void TReplicatedBucketSettings::SerializeTo(Ydb::RateLimiter::ReplicatedBucketSettings& proto) const {
if (ReportInterval_) {
proto.set_report_interval_ms(ReportInterval_->count());
}
}

TLeafBehavior::EBehavior TLeafBehavior::GetBehavior() const {
return static_cast<EBehavior>(BehaviorSettings_.index());
}

TLeafBehavior::TLeafBehavior(const TReplicatedBucketSettings& replicatedBucket)
: BehaviorSettings_(replicatedBucket)
{
}

TLeafBehavior::TLeafBehavior(const Ydb::RateLimiter::ReplicatedBucketSettings& replicatedBucket)
: BehaviorSettings_(replicatedBucket)
{
}

const TReplicatedBucketSettings& TLeafBehavior::GetReplicatedBucket() const {
return std::get<TReplicatedBucketSettings>(BehaviorSettings_);
}

void TLeafBehavior::SerializeTo(Ydb::RateLimiter::HierarchicalDrrSettings& proto) const {
switch (GetBehavior()) {
case REPLICATED_BUCKET:
return GetReplicatedBucket().SerializeTo(*proto.mutable_replicated_bucket());
}
}

template <class TDerived>
THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimiter::HierarchicalDrrSettings& proto) {
if (proto.max_units_per_second()) {
Expand All @@ -26,6 +65,18 @@ THierarchicalDrrSettings<TDerived>::THierarchicalDrrSettings(const Ydb::RateLimi
if (proto.prefetch_watermark()) {
PrefetchWatermark_ = proto.prefetch_watermark();
}

if (proto.has_immediately_fill_up_to()) {
ImmediatelyFillUpTo_ = proto.immediately_fill_up_to();
}

switch (proto.leaf_behavior_case()) {
case Ydb::RateLimiter::HierarchicalDrrSettings::kReplicatedBucket:
LeafBehavior_.emplace(proto.replicated_bucket());
break;
case Ydb::RateLimiter::HierarchicalDrrSettings::LEAF_BEHAVIOR_NOT_SET:
break;
}
}

template <class TDerived>
Expand All @@ -45,6 +96,105 @@ void THierarchicalDrrSettings<TDerived>::SerializeTo(Ydb::RateLimiter::Hierarchi
if (PrefetchWatermark_) {
proto.set_prefetch_watermark(*PrefetchWatermark_);
}

if (ImmediatelyFillUpTo_) {
proto.set_immediately_fill_up_to(*ImmediatelyFillUpTo_);
}

if (LeafBehavior_) {
LeafBehavior_->SerializeTo(proto);
}
}

TMetric::TMetric(const Ydb::RateLimiter::MeteringConfig_Metric& proto) {
Enabled_ = proto.enabled();
if (proto.billing_period_sec()) {
BillingPeriod_ = std::chrono::seconds(proto.billing_period_sec());
}
for (const auto& [k, v] : proto.labels()) {
Labels_[k] = v;
}
if (proto.has_metric_fields()) {
TString jsonStr;
if (auto st = google::protobuf::util::MessageToJsonString(proto.metric_fields(), &jsonStr); st.ok()) {
MetricFieldsJson_ = jsonStr;
}
}
}

void TMetric::SerializeTo(Ydb::RateLimiter::MeteringConfig_Metric& proto) const {
proto.set_enabled(Enabled_);
if (BillingPeriod_) {
proto.set_billing_period_sec(BillingPeriod_->count());
}
for (const auto& [k, v] : Labels_) {
(*proto.mutable_labels())[k] = v;
}
if (!MetricFieldsJson_.empty()) {
google::protobuf::util::JsonStringToMessage(MetricFieldsJson_, proto.mutable_metric_fields());
}
}

TMeteringConfig::TMeteringConfig(const Ydb::RateLimiter::MeteringConfig& proto) {
Enabled_ = proto.enabled();
if (proto.report_period_ms()) {
ReportPeriod_ = std::chrono::milliseconds(proto.report_period_ms());
}
if (proto.meter_period_ms()) {
MeterPeriod_ = std::chrono::milliseconds(proto.meter_period_ms());
}
if (proto.collect_period_sec()) {
CollectPeriod_ = std::chrono::seconds(proto.collect_period_sec());
}
if (proto.provisioned_units_per_second()) {
ProvisionedUnitsPerSecond_ = proto.provisioned_units_per_second();
}
if (proto.provisioned_coefficient()) {
ProvisionedCoefficient_ = proto.provisioned_coefficient();
}
if (proto.overshoot_coefficient()) {
OvershootCoefficient_ = proto.overshoot_coefficient();
}
if (proto.has_provisioned()) {
Provisioned_.emplace(proto.provisioned());
}
if (proto.has_on_demand()) {
OnDemand_.emplace(proto.on_demand());
}
if (proto.has_overshoot()) {
Overshoot_.emplace(proto.overshoot());
}
}

void TMeteringConfig::SerializeTo(Ydb::RateLimiter::MeteringConfig& proto) const {
proto.set_enabled(Enabled_);
if (ReportPeriod_) {
proto.set_report_period_ms(ReportPeriod_->count());
}
if (MeterPeriod_) {
proto.set_meter_period_ms(MeterPeriod_->count());
}
if (CollectPeriod_) {
proto.set_collect_period_sec(CollectPeriod_->count());
}
if (ProvisionedUnitsPerSecond_) {
proto.set_provisioned_units_per_second(*ProvisionedUnitsPerSecond_);
}
if (ProvisionedCoefficient_) {
proto.set_provisioned_coefficient(*ProvisionedCoefficient_);
}
if (OvershootCoefficient_) {
proto.set_overshoot_coefficient(*OvershootCoefficient_);
}
if (Provisioned_) {
Provisioned_->SerializeTo(*proto.mutable_provisioned());
}
if (OnDemand_) {
OnDemand_->SerializeTo(*proto.mutable_on_demand());
}
if (Overshoot_) {
Overshoot_->SerializeTo(*proto.mutable_overshoot());
}
}

template struct THierarchicalDrrSettings<TCreateResourceSettings>;
Expand All @@ -67,6 +217,9 @@ TDescribeResourceResult::TDescribeResourceResult(TStatus status, const Ydb::Rate
, ResourcePath_(result.resource().resource_path())
, HierarchicalDrrProps_(result.resource().hierarchical_drr())
{
if (result.resource().has_metering_config()) {
MeteringConfig_ = result.resource().metering_config();
}
}

TDescribeResourceResult::THierarchicalDrrProps::THierarchicalDrrProps(const Ydb::RateLimiter::HierarchicalDrrSettings& settings)
Expand Down Expand Up @@ -102,6 +255,15 @@ class TRateLimiterClient::TImpl : public TClientImplCommon<TRateLimiterClient::T
if (settings.PrefetchWatermark_) {
hdrr.set_prefetch_watermark(*settings.PrefetchWatermark_);
}
if (settings.ImmediatelyFillUpTo_) {
hdrr.set_immediately_fill_up_to(*settings.ImmediatelyFillUpTo_);
}
if (settings.LeafBehavior_) {
settings.LeafBehavior_->SerializeTo(hdrr);
}
if (settings.MeteringConfig_) {
settings.MeteringConfig_->SerializeTo(*resource.mutable_metering_config());
}

return request;
}
Expand Down

0 comments on commit 4b90b10

Please sign in to comment.