Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

METRICS SDK - Calling Observable Instruments callback during metrics collection #1554

Merged
merged 10 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/include/opentelemetry/sdk/metrics/async_instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument
{
public:
ObservableInstrument(InstrumentDescriptor instrument_descriptor,
std::unique_ptr<AsyncWritableMetricStorage> storage);
std::unique_ptr<AsyncWritableMetricStorage> storage,
std::shared_ptr<ObservableRegistry> observable_registry);

void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback,
void *state) noexcept override;
Expand All @@ -36,7 +37,7 @@ class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument
private:
InstrumentDescriptor instrument_descriptor_;
std::unique_ptr<AsyncWritableMetricStorage> storage_;
std::unique_ptr<ObservableRegistry> observable_registry_;
std::shared_ptr<ObservableRegistry> observable_registry_;
};
} // namespace metrics
} // namespace sdk
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/meter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace metrics
class MetricStorage;
class SyncWritableMetricStorage;
class AsyncWritableMetricsStorge;
class ObservableRegistry;

class Meter final : public opentelemetry::metrics::Meter
{
Expand Down Expand Up @@ -114,6 +115,7 @@ class Meter final : public opentelemetry::metrics::Meter
std::weak_ptr<sdk::metrics::MeterContext> meter_context_;
// Mapping between instrument-name and Aggregation Storage.
std::unordered_map<std::string, std::shared_ptr<MetricStorage>> storage_registry_;
std::shared_ptr<ObservableRegistry> observable_registry_;
std::unique_ptr<SyncWritableMetricStorage> RegisterSyncMetricStorage(
InstrumentDescriptor &instrument_descriptor);
std::unique_ptr<AsyncWritableMetricStorage> RegisterAsyncMetricStorage(
Expand Down
6 changes: 4 additions & 2 deletions sdk/src/metrics/async_instruments.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ namespace metrics
{

ObservableInstrument::ObservableInstrument(InstrumentDescriptor instrument_descriptor,
std::unique_ptr<AsyncWritableMetricStorage> storage)
std::unique_ptr<AsyncWritableMetricStorage> storage,
std::shared_ptr<ObservableRegistry> observable_registry)
: instrument_descriptor_(instrument_descriptor),
storage_(std::move(storage)),
observable_registry_{new ObservableRegistry()}
observable_registry_{observable_registry}

{}

void ObservableInstrument::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback,
Expand Down
19 changes: 12 additions & 7 deletions sdk/src/metrics/meter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
# include "opentelemetry/sdk/metrics/async_instruments.h"
# include "opentelemetry/sdk/metrics/exemplar/no_exemplar_reservoir.h"
# include "opentelemetry/sdk/metrics/state/multi_metric_storage.h"
# include "opentelemetry/sdk/metrics/state/observable_registry.h"
# include "opentelemetry/sdk/metrics/state/sync_metric_storage.h"

# include "opentelemetry/sdk/metrics/sync_instruments.h"
# include "opentelemetry/sdk_config.h"

Expand All @@ -26,7 +28,9 @@ namespace nostd = opentelemetry::nostd;
Meter::Meter(
std::weak_ptr<MeterContext> meter_context,
std::unique_ptr<sdk::instrumentationscope::InstrumentationScope> instrumentation_scope) noexcept
: scope_{std::move(instrumentation_scope)}, meter_context_{meter_context}
: scope_{std::move(instrumentation_scope)},
meter_context_{meter_context},
observable_registry_(new ObservableRegistry())
{}

nostd::shared_ptr<metrics::Counter<long>> Meter::CreateLongCounter(nostd::string_view name,
Expand Down Expand Up @@ -66,7 +70,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateLon
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
Expand All @@ -80,7 +84,7 @@ Meter::CreateDoubleObservableCounter(nostd::string_view name,
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<metrics::Histogram<long>> Meter::CreateLongHistogram(
Expand Down Expand Up @@ -122,7 +126,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateLon
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateDoubleObservableGauge(
Expand All @@ -136,7 +140,7 @@ nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument> Meter::CreateDou
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<metrics::UpDownCounter<long>> Meter::CreateLongUpDownCounter(
Expand Down Expand Up @@ -178,7 +182,7 @@ Meter::CreateLongObservableUpDownCounter(nostd::string_view name,
InstrumentValueType::kLong};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

nostd::shared_ptr<opentelemetry::metrics::ObservableInstrument>
Expand All @@ -192,7 +196,7 @@ Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name,
InstrumentValueType::kDouble};
auto storage = RegisterAsyncMetricStorage(instrument_descriptor);
return nostd::shared_ptr<metrics::ObservableInstrument>{
new ObservableInstrument(instrument_descriptor, std::move(storage))};
new ObservableInstrument(instrument_descriptor, std::move(storage), observable_registry_)};
}

const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentationScope()
Expand Down Expand Up @@ -290,6 +294,7 @@ std::vector<MetricData> Meter::Collect(CollectorHandle *collector,
opentelemetry::common::SystemTimestamp collect_ts) noexcept
{

observable_registry_->Observe(collect_ts);
std::vector<MetricData> metric_data_list;
auto ctx = meter_context_.lock();
if (!ctx)
Expand Down
1 change: 1 addition & 0 deletions sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
foreach(
testname
meter_provider_sdk_test
meter_test
view_registry_test
aggregation_test
attributes_processor_test
Expand Down
4 changes: 3 additions & 1 deletion sdk/test/metrics/async_instruments_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ TEST(AsyncInstruments, ObservableInstrument)
InstrumentDescriptor instrument_descriptor = {"long_counter", "description", "1",
InstrumentType::kObservableCounter,
InstrumentValueType::kLong};
std::shared_ptr<ObservableRegistry> observable_registry(new ObservableRegistry());
std::unique_ptr<AsyncWritableMetricStorage> metric_storage(new AsyncMultiMetricStorage());
ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage));
ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage),
observable_registry);
observable_counter_long.AddCallback(asyc_generate_measurements, nullptr);
}

Expand Down
75 changes: 75 additions & 0 deletions sdk/test/metrics/meter_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/meter.h"
# include "opentelemetry/sdk/metrics/data/point_data.h"
# include "opentelemetry/sdk/metrics/meter_context.h"
# include "opentelemetry/sdk/metrics/meter_provider.h"
# include "opentelemetry/sdk/metrics/metric_reader.h"

# include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockMetricReader : public MetricReader
{
public:
MockMetricReader() = default;
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override
{
return AggregationTemporality::kCumulative;
}
bool OnShutDown(std::chrono::microseconds timeout) noexcept override { return true; }
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override { return true; }
void OnInitialized() noexcept override {}
};

namespace
{
nostd::shared_ptr<metrics::Meter> InitMeter(MetricReader **metricReaderPtr)
{
static std::shared_ptr<metrics::MeterProvider> provider(new MeterProvider());
std::unique_ptr<MetricReader> metric_reader(new MockMetricReader());
*metricReaderPtr = metric_reader.get();
auto p = std::static_pointer_cast<MeterProvider>(provider);
p->AddMetricReader(std::move(metric_reader));
auto meter = provider->GetMeter("meter_name");
return meter;
}
} // namespace

void asyc_generate_measurements(opentelemetry::metrics::ObserverResult observer, void *state)
{
auto observer_long =
nostd::get<nostd::shared_ptr<opentelemetry::metrics::ObserverResultT<long>>>(observer);
observer_long->Observe(10l);
}

TEST(MeterTest, BasicAsyncTests)
{
MetricReader *metric_reader_ptr = nullptr;
auto meter = InitMeter(&metric_reader_ptr);
auto observable_counter = meter->CreateLongObservableCounter("observable_counter");
observable_counter->AddCallback(asyc_generate_measurements, nullptr);

size_t count = 0;
metric_reader_ptr->Collect([&count](ResourceMetrics &metric_data) {
EXPECT_EQ(metric_data.scope_metric_data_.size(), 1);
if (metric_data.scope_metric_data_.size())
{
EXPECT_EQ(metric_data.scope_metric_data_[0].metric_data_.size(), 1);
if (metric_data.scope_metric_data_.size())
{
count += metric_data.scope_metric_data_[0].metric_data_.size();
EXPECT_EQ(count, 1);
}
}
return true;
});
}

#endif