Skip to content

Commit

Permalink
[SDK] Cardinality limits for metrics streams (Sync Instruments + Delt…
Browse files Browse the repository at this point in the history
…a Temporality) (open-telemetry#2255)
  • Loading branch information
lalitb authored Nov 21, 2023
1 parent 8cba762 commit 9b89843
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 2 deletions.
63 changes: 63 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/state/attributes_hashmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ namespace metrics

using opentelemetry::sdk::common::OrderedAttributeMap;

constexpr size_t kAggregationCardinalityLimit = 2000;
const std::string kAggregationCardinalityLimitOverflowError =
"Maximum data points for metric stream exceeded. Entry added to overflow";
const std::string kAttributesLimitOverflowKey = "otel.metrics.overflow";
const bool kAttributesLimitOverflowValue = true;
const size_t kOverflowAttributesHash = common::GetHashForAttributeMap(
{{kAttributesLimitOverflowKey,
kAttributesLimitOverflowValue}}); // precalculated for optimization

class AttributeHashGenerator
{
public:
Expand All @@ -35,6 +44,9 @@ class AttributeHashGenerator
class AttributesHashMap
{
public:
AttributesHashMap(size_t attributes_limit = kAggregationCardinalityLimit)
: attributes_limit_(attributes_limit)
{}
Aggregation *Get(size_t hash) const
{
auto it = hash_map_.find(hash);
Expand Down Expand Up @@ -66,6 +78,11 @@ class AttributesHashMap
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
Expand All @@ -80,6 +97,12 @@ class AttributesHashMap
{
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{};
hash_map_[hash] = {attr, aggregation_callback()};
return hash_map_[hash].second.get();
Expand All @@ -95,6 +118,11 @@ class AttributesHashMap
return it->second.second.get();
}

if (IsOverflowAttributes())
{
return GetOrSetOveflowAttributes(aggregation_callback);
}

MetricAttributes attr{attributes};

hash_map_[hash] = {attr, aggregation_callback()};
Expand All @@ -113,6 +141,12 @@ class AttributesHashMap
{
it->second.second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
}
else
{
MetricAttributes attr{attributes};
Expand All @@ -127,6 +161,12 @@ class AttributesHashMap
{
it->second.second = std::move(aggr);
}
else if (IsOverflowAttributes())
{
hash_map_[kOverflowAttributesHash] = {
MetricAttributes{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}},
std::move(aggr)};
}
else
{
MetricAttributes attr{attributes};
Expand Down Expand Up @@ -157,6 +197,29 @@ class AttributesHashMap

private:
std::unordered_map<size_t, std::pair<MetricAttributes, std::unique_ptr<Aggregation>>> hash_map_;
size_t attributes_limit_;

Aggregation *GetOrSetOveflowAttributes(
std::function<std::unique_ptr<Aggregation>()> aggregation_callback)
{
auto agg = aggregation_callback();
return GetOrSetOveflowAttributes(std::move(agg));
}

Aggregation *GetOrSetOveflowAttributes(std::unique_ptr<Aggregation> agg)
{
auto it = hash_map_.find(kOverflowAttributesHash);
if (it != hash_map_.end())
{
return it->second.second.get();
}

MetricAttributes attr{{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}};
hash_map_[kOverflowAttributesHash] = {attr, std::move(agg)};
return hash_map_[kOverflowAttributesHash].second.get();
}

bool IsOverflowAttributes() const { return (hash_map_.size() + 1 >= attributes_limit_); }
};
} // namespace metrics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage
const AttributesProcessor *attributes_processor,
nostd::shared_ptr<ExemplarReservoir> &&exemplar_reservoir
OPENTELEMETRY_MAYBE_UNUSED,
const AggregationConfig *aggregation_config)
const AggregationConfig *aggregation_config,
size_t attributes_limit = kAggregationCardinalityLimit)
: instrument_descriptor_(instrument_descriptor),
attributes_hashmap_(new AttributesHashMap()),
attributes_hashmap_(new AttributesHashMap(attributes_limit)),
attributes_processor_(attributes_processor),
#ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
exemplar_reservoir_(exemplar_reservoir),
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ foreach(
attributes_hashmap_test
base2_exponential_histogram_indexer_test
circular_buffer_counter_test
cardinality_limit_test
histogram_test
sync_metric_storage_counter_test
sync_metric_storage_histogram_test
Expand Down
108 changes: 108 additions & 0 deletions sdk/test/metrics/cardinality_limit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#include "common.h"
#include "opentelemetry/common/key_value_iterable_view.h"
#include "opentelemetry/sdk/metrics/aggregation/sum_aggregation.h"
#include "opentelemetry/sdk/metrics/instruments.h"
#include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
#include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

#include <gtest/gtest.h>
#include <functional>

using namespace opentelemetry::sdk::metrics;
using namespace opentelemetry::common;
namespace nostd = opentelemetry::nostd;

TEST(CardinalityLimit, AttributesHashMapBasicTests)
{
AttributesHashMap hash_map(10);
std::function<std::unique_ptr<Aggregation>()> aggregation_callback =
[]() -> std::unique_ptr<Aggregation> {
return std::unique_ptr<Aggregation>(new LongSumAggregation(true));
};
// add 10 unique metric points. 9 should be added to hashmap, 10th should be overflow.
long record_value = 100;
for (auto i = 0; i < 10; i++)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10);
// add 5 unique metric points above limit, they all should get consolidated as single
// overflowmetric point.
for (auto i = 10; i < 15; i++)
{
OrderedAttributeMap attributes = {{"key", std::to_string(i)}};
auto hash = opentelemetry::sdk::common::GetHashForAttributeMap(attributes);
static_cast<LongSumAggregation *>(
hash_map.GetOrSetDefault(attributes, aggregation_callback, hash))
->Aggregate(record_value);
}
EXPECT_EQ(hash_map.Size(), 10); // only one more metric point should be added as overflow.
// get the overflow metric point
auto agg = hash_map.GetOrSetDefault(
OrderedAttributeMap({{kAttributesLimitOverflowKey, kAttributesLimitOverflowValue}}),
aggregation_callback, kOverflowAttributesHash);
EXPECT_NE(agg, nullptr);
auto sum_agg = static_cast<LongSumAggregation *>(agg);
EXPECT_EQ(nostd::get<int64_t>(nostd::get<SumPointData>(sum_agg->ToPoint()).value_),
record_value * 6); // 1 from previous 10, 5 from current 5.
}

class WritableMetricStorageCardinalityLimitTestFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageCardinalityLimitTestFixture, LongCounterSumAggregation)
{
auto sdk_start_ts = std::chrono::system_clock::now();
const size_t attributes_limit = 10;
InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kCounter,
InstrumentValueType::kLong};
std::unique_ptr<DefaultAttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
SyncMetricStorage storage(instr_desc, AggregationType::kSum, default_attributes_processor.get(),
ExemplarReservoir::GetNoExemplarReservoir(), nullptr, attributes_limit);

long record_value = 100;
// add 9 unique metric points, and 6 more above limit.
for (auto i = 0; i < 15; i++)
{
std::map<std::string, std::string> attributes = {{"key", std::to_string(i)}};
storage.RecordLong(record_value,
KeyValueIterableView<std::map<std::string, std::string>>(attributes),
opentelemetry::context::Context{});
}
AggregationTemporality temporality = GetParam();
std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);
//... Some computation here
auto collection_ts = std::chrono::system_clock::now();
size_t count_attributes = 0;
bool overflow_present = false;
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
for (const auto &data_attr : metric_data.point_data_attr_)
{
const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
count_attributes++;
if (data_attr.attributes.begin()->first == kAttributesLimitOverflowKey)
{
EXPECT_EQ(nostd::get<int64_t>(data.value_), record_value * 6);
overflow_present = true;
}
}
return true;
});
EXPECT_EQ(count_attributes, attributes_limit);
EXPECT_EQ(overflow_present, true);
}
INSTANTIATE_TEST_SUITE_P(All,
WritableMetricStorageCardinalityLimitTestFixture,
::testing::Values(AggregationTemporality::kDelta));

0 comments on commit 9b89843

Please sign in to comment.