diff --git a/api/include/opentelemetry/metrics/async_instruments.h b/api/include/opentelemetry/metrics/async_instruments.h index 065a47649a..568ba602a3 100644 --- a/api/include/opentelemetry/metrics/async_instruments.h +++ b/api/include/opentelemetry/metrics/async_instruments.h @@ -9,20 +9,22 @@ OPENTELEMETRY_BEGIN_NAMESPACE namespace metrics { -class AsynchronousInstrument -{}; -template -class ObservableCounter : public AsynchronousInstrument -{}; +using ObservableCallbackPtr = void (*)(ObserverResult, void *); -template -class ObservableGauge : public AsynchronousInstrument -{}; +class ObservableInstrument +{ +public: + /** + * Sets up a function that will be called whenever a metric collection is initiated. + */ + virtual void AddCallback(ObservableCallbackPtr, void *state) noexcept = 0; -template -class ObservableUpDownCounter : public AsynchronousInstrument -{}; + /** + * Remove a function that was configured to be called whenever a metric collection is initiated. + */ + virtual void RemoveCallback(ObservableCallbackPtr, void *state) noexcept = 0; +}; } // namespace metrics OPENTELEMETRY_END_NAMESPACE diff --git a/api/include/opentelemetry/metrics/meter.h b/api/include/opentelemetry/metrics/meter.h index e0454e360b..0bc9b7116c 100644 --- a/api/include/opentelemetry/metrics/meter.h +++ b/api/include/opentelemetry/metrics/meter.h @@ -51,22 +51,18 @@ class Meter * shared_ptr to that Observable Counter * * @param name the name of the new Observable Counter. - * @param callback the function to be observed by the instrument. * @param description a brief description of what the Observable Counter is used for. * @param unit the unit of metric values following https://unitsofmeasure.org/ucum.html. - * @param state to be passed back to callback */ - virtual void CreateLongObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; - - virtual void CreateDoubleObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; + virtual nostd::shared_ptr CreateLongObservableCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; + + virtual nostd::shared_ptr CreateDoubleObservableCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; /** * Creates a Histogram with the passed characteristics and returns a shared_ptr to that Histogram. @@ -91,22 +87,18 @@ class Meter * shared_ptr to that Observable Counter * * @param name the name of the new Observable Gauge. - * @param callback the function to be observed by the instrument. * @param description a brief description of what the Observable Gauge is used for. * @param unit the unit of metric values following https://unitsofmeasure.org/ucum.html. - * @param state to be passed back to callback */ - virtual void CreateLongObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; - - virtual void CreateDoubleObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; + virtual nostd::shared_ptr CreateLongObservableGauge( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; + + virtual nostd::shared_ptr CreateDoubleObservableGauge( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; /** * Creates an UpDownCounter with the passed characteristics and returns a shared_ptr to that @@ -132,23 +124,18 @@ class Meter * a shared_ptr to that Observable UpDownCounter * * @param name the name of the new Observable UpDownCounter. - * @param callback the function to be observed by the instrument. * @param description a brief description of what the Observable UpDownCounter is used for. * @param unit the unit of metric values following https://unitsofmeasure.org/ucum.html. - * @param state to be passed back to callback */ - virtual void CreateLongObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; - - virtual void CreateDoubleObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &, - void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept = 0; + virtual nostd::shared_ptr CreateLongObservableUpDownCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; + + virtual nostd::shared_ptr CreateDoubleObservableUpDownCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept = 0; }; } // namespace metrics OPENTELEMETRY_END_NAMESPACE diff --git a/api/include/opentelemetry/metrics/noop.h b/api/include/opentelemetry/metrics/noop.h index c47f7489f0..ffd7ad1340 100644 --- a/api/include/opentelemetry/metrics/noop.h +++ b/api/include/opentelemetry/metrics/noop.h @@ -64,56 +64,16 @@ class NoopUpDownCounter : public UpDownCounter {} }; -template -class NoopObservableCounter : public ObservableCounter -{ -public: - NoopObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &), - nostd::string_view description, - nostd::string_view unit) noexcept - {} - - NoopObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &, const common::KeyValueIterable &), - nostd::string_view description, - nostd::string_view unit) noexcept - {} -}; - -template -class NoopObservableGauge : public ObservableGauge +class NoopObservableInstrument : public ObservableInstrument { public: - NoopObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &), - nostd::string_view description, - nostd::string_view unit) noexcept + NoopObservableInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept {} - NoopObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &, const common::KeyValueIterable &), - nostd::string_view description, - nostd::string_view unit) noexcept - {} -}; - -template -class NoopObservableUpDownCounter : public ObservableUpDownCounter -{ -public: - NoopObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &), - nostd::string_view description, - nostd::string_view unit) noexcept - {} - - NoopObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &, - const common::KeyValueIterable &), - nostd::string_view description, - nostd::string_view unit) noexcept - {} + void AddCallback(ObservableCallbackPtr, void *state) noexcept override {} + void RemoveCallback(ObservableCallbackPtr, void *state) noexcept override {} }; /** @@ -137,19 +97,23 @@ class NoopMeter final : public Meter return nostd::shared_ptr>{new NoopCounter(name, description, unit)}; } - void CreateLongObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateLongObservableCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } - void CreateDoubleObservableCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateDoubleObservableCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } nostd::shared_ptr> CreateLongHistogram( nostd::string_view name, @@ -167,19 +131,23 @@ class NoopMeter final : public Meter return nostd::shared_ptr>{new NoopHistogram(name, description, unit)}; } - void CreateLongObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateLongObservableGauge( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } - void CreateDoubleObservableGauge(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateDoubleObservableGauge( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } nostd::shared_ptr> CreateLongUpDownCounter( nostd::string_view name, @@ -199,19 +167,23 @@ class NoopMeter final : public Meter new NoopUpDownCounter(name, description, unit)}; } - void CreateLongObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateLongObservableUpDownCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } - void CreateDoubleObservableUpDownCounter(nostd::string_view name, - void (*callback)(ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override - {} + nostd::shared_ptr CreateDoubleObservableUpDownCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override + { + return nostd::shared_ptr( + new NoopObservableInstrument(name, description, unit)); + } }; /** diff --git a/api/include/opentelemetry/metrics/observer_result.h b/api/include/opentelemetry/metrics/observer_result.h index 84893deef0..82acaf9a0e 100644 --- a/api/include/opentelemetry/metrics/observer_result.h +++ b/api/include/opentelemetry/metrics/observer_result.h @@ -6,6 +6,7 @@ # include "opentelemetry/common/attribute_value.h" # include "opentelemetry/common/key_value_iterable_view.h" +# include "opentelemetry/nostd/shared_ptr.h" # include "opentelemetry/nostd/span.h" # include "opentelemetry/nostd/string_view.h" # include "opentelemetry/nostd/type_traits.h" @@ -15,12 +16,11 @@ namespace metrics { /** - * ObserverResult class is necessary for the callback recording asynchronous + * ObserverResultT class is necessary for the callback recording asynchronous * instrument use. */ - template -class ObserverResult +class ObserverResultT { public: @@ -44,6 +44,9 @@ class ObserverResult } }; +using ObserverResult = nostd::variant>, + nostd::shared_ptr>>; + } // namespace metrics OPENTELEMETRY_END_NAMESPACE #endif diff --git a/examples/common/metrics_foo_library/foo_library.cc b/examples/common/metrics_foo_library/foo_library.cc index c46276833c..ec4fec736d 100644 --- a/examples/common/metrics_foo_library/foo_library.cc +++ b/examples/common/metrics_foo_library/foo_library.cc @@ -32,12 +32,18 @@ std::map get_random_attr() class MeasurementFetcher { public: - static void Fetcher(opentelemetry::metrics::ObserverResult &observer_result, void *state) + static void Fetcher(opentelemetry::metrics::ObserverResult observer_result, void *state) { - double val = (rand() % 700) + 1.1; std::map labels = get_random_attr(); auto labelkv = opentelemetry::common::KeyValueIterableView{labels}; - observer_result.Observe(val /*, labelkv*/); + if (nostd::holds_alternative< + nostd::shared_ptr>>(observer_result)) + { + double val = (rand() % 700) + 1.1; + nostd::get>>( + observer_result) + ->Observe(val /*, labelkv */); + } } }; } // namespace @@ -62,7 +68,8 @@ void foo_library::observable_counter_example(const std::string &name) std::string counter_name = name + "_observable_counter"; auto provider = metrics_api::Provider::GetMeterProvider(); nostd::shared_ptr meter = provider->GetMeter(name, "1.2.0"); - meter->CreateDoubleObservableCounter(counter_name, MeasurementFetcher::Fetcher); + auto counter = meter->CreateDoubleObservableCounter(counter_name); + counter->AddCallback(MeasurementFetcher::Fetcher, nullptr); while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index 8b1f76377b..f14d8f3a27 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -7,108 +7,37 @@ # include "opentelemetry/metrics/observer_result.h" # include "opentelemetry/nostd/string_view.h" # include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/state/observable_registry.h" + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { -template -class Asynchronous -{ -public: - Asynchronous(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : name_(name), callback_(callback), description_(description), unit_(unit) - {} - -protected: - std::string name_; - void (*callback_)(opentelemetry::metrics::ObserverResult &); - std::string description_; - std::string unit_; -}; - -class LongObservableCounter : public opentelemetry::metrics::ObservableCounter, - public Asynchronous -{ -public: - LongObservableCounter(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) - - {} -}; - -class DoubleObservableCounter : public opentelemetry::metrics::ObservableCounter, - public Asynchronous -{ -public: - DoubleObservableCounter(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) - - {} -}; +class AsyncWritableMetricStorage; -class LongObservableGauge : public opentelemetry::metrics::ObservableGauge, - public Asynchronous +class ObservableInstrument : public opentelemetry::metrics::ObservableInstrument { public: - LongObservableGauge(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) + ObservableInstrument(InstrumentDescriptor instrument_descriptor, + std::unique_ptr storage); - {} -}; + void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state) noexcept override; -class DoubleObservableGauge : public opentelemetry::metrics::ObservableGauge, - public Asynchronous -{ -public: - DoubleObservableGauge(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) + void RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state) noexcept override; - {} -}; + const InstrumentDescriptor &GetInstrumentDescriptor(); -class LongObservableUpDownCounter : public opentelemetry::metrics::ObservableUpDownCounter, - public Asynchronous -{ -public: - LongObservableUpDownCounter(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) - - {} -}; + AsyncWritableMetricStorage *GetMetricStorage(); -class DoubleObservableUpDownCounter - : public opentelemetry::metrics::ObservableUpDownCounter, - public Asynchronous -{ -public: - DoubleObservableUpDownCounter(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &), - nostd::string_view description = "", - nostd::string_view unit = "") - : Asynchronous(name, callback, description, unit) - {} +private: + InstrumentDescriptor instrument_descriptor_; + std::unique_ptr storage_; + std::unique_ptr observable_registry_; }; - } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/instruments.h b/sdk/include/opentelemetry/sdk/metrics/instruments.h index 5d85357143..845485f70d 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/instruments.h @@ -19,6 +19,12 @@ enum class InstrumentType kObservableUpDownCounter }; +enum class InstrumentClass +{ + kSync, + kAsync +}; + enum class InstrumentValueType { kInt, @@ -52,6 +58,19 @@ struct InstrumentDescriptor InstrumentValueType value_type_; }; +static InstrumentClass GetInstrumentClass(InstrumentType type) +{ + if (type == InstrumentType::kCounter || type == InstrumentType::kHistogram || + type == InstrumentType::kUpDownCounter) + { + return InstrumentClass::kSync; + } + else + { + return InstrumentClass::kAsync; + } +} + using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap; /*class InstrumentSelector { diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index d3df60ced0..332c849ee1 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -22,7 +22,8 @@ namespace metrics { class MetricStorage; -class WritableMetricStorage; +class SyncWritableMetricStorage; +class AsyncWritableMetricsStorge; class Meter final : public opentelemetry::metrics::Meter { @@ -43,19 +44,15 @@ class Meter final : public opentelemetry::metrics::Meter nostd::string_view description = "", nostd::string_view unit = "") noexcept override; - void CreateLongObservableCounter(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, - void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::shared_ptr CreateLongObservableCounter( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override; - void CreateDoubleObservableCounter( + nostd::shared_ptr CreateDoubleObservableCounter( nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, void *), nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::string_view unit = "") noexcept override; nostd::shared_ptr> CreateLongHistogram( nostd::string_view name, @@ -67,19 +64,15 @@ class Meter final : public opentelemetry::metrics::Meter nostd::string_view description = "", nostd::string_view unit = "") noexcept override; - void CreateLongObservableGauge(nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, - void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::shared_ptr CreateLongObservableGauge( + nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override; - void CreateDoubleObservableGauge( + nostd::shared_ptr CreateDoubleObservableGauge( nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, void *), nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::string_view unit = "") noexcept override; nostd::shared_ptr> CreateLongUpDownCounter( nostd::string_view name, @@ -91,19 +84,15 @@ class Meter final : public opentelemetry::metrics::Meter nostd::string_view description = "", nostd::string_view unit = "") noexcept override; - void CreateLongObservableUpDownCounter( + nostd::shared_ptr CreateLongObservableUpDownCounter( nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, void *), nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::string_view unit = "") noexcept override; - void CreateDoubleObservableUpDownCounter( - nostd::string_view name, - void (*callback)(opentelemetry::metrics::ObserverResult &, void *), - nostd::string_view description = "", - nostd::string_view unit = "", - void *state = nullptr) noexcept override; + nostd::shared_ptr + CreateDoubleObservableUpDownCounter(nostd::string_view name, + nostd::string_view description = "", + nostd::string_view unit = "") noexcept override; /** Returns the associated instrumentation scope */ const sdk::instrumentationscope::InstrumentationScope *GetInstrumentationScope() const noexcept; @@ -125,42 +114,10 @@ class Meter final : public opentelemetry::metrics::Meter std::shared_ptr meter_context_; // Mapping between instrument-name and Aggregation Storage. std::unordered_map> storage_registry_; - - std::unique_ptr RegisterMetricStorage( + std::unique_ptr RegisterSyncMetricStorage( + InstrumentDescriptor &instrument_descriptor); + std::unique_ptr RegisterAsyncMetricStorage( InstrumentDescriptor &instrument_descriptor); - - template - void RegisterAsyncMetricStorage(InstrumentDescriptor &instrument_descriptor, - void (*callback)(opentelemetry::metrics::ObserverResult &, - void *), - void *state = nullptr) - { - auto view_registry = meter_context_->GetViewRegistry(); - auto success = view_registry->FindViews( - instrument_descriptor, *scope_, - [this, &instrument_descriptor, callback, state](const View &view) { - auto view_instr_desc = instrument_descriptor; - if (!view.GetName().empty()) - { - view_instr_desc.name_ = view.GetName(); - } - if (!view.GetDescription().empty()) - { - view_instr_desc.description_ = view.GetDescription(); - } - auto storage = std::shared_ptr>(new AsyncMetricStorage( - view_instr_desc, view.GetAggregationType(), callback, &view.GetAttributesProcessor(), - view.GetAggregationConfig(), state)); - storage_registry_[instrument_descriptor.name_] = storage; - return true; - }); - if (!success) - { - OTEL_INTERNAL_LOG_ERROR( - "[Meter::RegisterAsyncMetricStorage] - Error during finding matching views." - << "Some of the matching view configurations may not be used for metric collection"); - } - } }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/observer_result.h b/sdk/include/opentelemetry/sdk/metrics/observer_result.h index ca7227bc5b..f371c152d4 100644 --- a/sdk/include/opentelemetry/sdk/metrics/observer_result.h +++ b/sdk/include/opentelemetry/sdk/metrics/observer_result.h @@ -16,10 +16,10 @@ namespace sdk namespace metrics { template -class ObserverResult final : public opentelemetry::metrics::ObserverResult +class ObserverResultT final : public opentelemetry::metrics::ObserverResultT { public: - ObserverResult(const AttributesProcessor *attributes_processor) + explicit ObserverResultT(const AttributesProcessor *attributes_processor = nullptr) : attributes_processor_(attributes_processor) {} @@ -27,8 +27,15 @@ class ObserverResult final : public opentelemetry::metrics::ObserverResult void Observe(T value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override { - auto attr = attributes_processor_->process(attributes); - data_.insert({attr, value}); + if (attributes_processor_) + { + auto attr = attributes_processor_->process(attributes); + data_.insert({attr, value}); + } + else + { + data_.insert({MetricAttributes{attributes}, value}); + } } const std::unordered_map &GetMeasurements() @@ -38,7 +45,6 @@ class ObserverResult final : public opentelemetry::metrics::ObserverResult private: std::unordered_map data_; - const AttributesProcessor *attributes_processor_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h index 11e7d83c14..97072f2efb 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/async_metric_storage.h @@ -22,39 +22,29 @@ namespace sdk namespace metrics { -template -class AsyncMetricStorage : public MetricStorage +class AsyncMetricStorage : public MetricStorage, public AsyncWritableMetricStorage { public: AsyncMetricStorage(InstrumentDescriptor instrument_descriptor, const AggregationType aggregation_type, - void (*measurement_callback)(opentelemetry::metrics::ObserverResult &, - void *), const AttributesProcessor *attributes_processor, nostd::shared_ptr aggregation_config, void *state = nullptr) : instrument_descriptor_(instrument_descriptor), aggregation_type_{aggregation_type}, - measurement_collection_callback_{measurement_callback}, attributes_processor_{attributes_processor}, state_{state}, cumulative_hash_map_(new AttributesHashMap()), + delta_hash_map_(new AttributesHashMap()), temporal_metric_storage_(instrument_descriptor, aggregation_config) {} - bool Collect(CollectorHandle *collector, - nostd::span> collectors, - opentelemetry::common::SystemTimestamp sdk_start_ts, - opentelemetry::common::SystemTimestamp collection_ts, - nostd::function_ref metric_collection_callback) noexcept override + template + void Record(const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept { - opentelemetry::sdk::metrics::ObserverResult ob_res(attributes_processor_); - - // read the measurement using configured callback - measurement_collection_callback_(ob_res, state_); - std::shared_ptr delta_hash_map(new AttributesHashMap()); // process the read measurements - aggregate and store in hashmap - for (auto &measurement : ob_res.GetMeasurements()) + for (auto &measurement : measurements) { auto aggr = DefaultAggregation::CreateAggregation(aggregation_type_, instrument_descriptor_); aggr->Aggregate(measurement.second); @@ -65,29 +55,61 @@ class AsyncMetricStorage : public MetricStorage cumulative_hash_map_->Set(measurement.first, DefaultAggregation::CloneAggregation( aggregation_type_, instrument_descriptor_, *delta)); - delta_hash_map->Set(measurement.first, std::move(delta)); + delta_hash_map_->Set(measurement.first, std::move(delta)); } else { cumulative_hash_map_->Set( measurement.first, DefaultAggregation::CloneAggregation(aggregation_type_, instrument_descriptor_, *aggr)); - delta_hash_map->Set(measurement.first, std::move(aggr)); + delta_hash_map_->Set(measurement.first, std::move(aggr)); } } + } + + void RecordLong( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + { + if (instrument_descriptor_.value_type_ != InstrumentValueType::kLong) + { + return; + } + Record(measurements, observation_time); + } + + void RecordDouble( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + { + if (instrument_descriptor_.value_type_ != InstrumentValueType::kDouble) + { + return; + } + Record(measurements, observation_time); + } + + bool Collect(CollectorHandle *collector, + nostd::span> collectors, + opentelemetry::common::SystemTimestamp sdk_start_ts, + opentelemetry::common::SystemTimestamp collection_ts, + nostd::function_ref metric_collection_callback) noexcept override + { - return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts, - std::move(delta_hash_map), - metric_collection_callback); + auto status = temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, + collection_ts, std::move(delta_hash_map_), + metric_collection_callback); + delta_hash_map_.reset(new AttributesHashMap()); + return status; } private: InstrumentDescriptor instrument_descriptor_; AggregationType aggregation_type_; - void (*measurement_collection_callback_)(opentelemetry::metrics::ObserverResult &, void *); const AttributesProcessor *attributes_processor_; void *state_; std::unique_ptr cumulative_hash_map_; + std::unique_ptr delta_hash_map_; TemporalMetricStorage temporal_metric_storage_; }; diff --git a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h index e0ba55cbfe..18ee235158 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/metric_storage.h @@ -8,6 +8,8 @@ # include "opentelemetry/context/context.h" # include "opentelemetry/sdk/metrics/data/metric_data.h" # include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { @@ -28,7 +30,8 @@ class MetricStorage nostd::function_ref callback) noexcept = 0; }; -class WritableMetricStorage +/* Represents the sync metric storage */ +class SyncWritableMetricStorage { public: virtual void RecordLong(long value, const opentelemetry::context::Context &context) noexcept = 0; @@ -44,7 +47,21 @@ class WritableMetricStorage const opentelemetry::common::KeyValueIterable &attributes, const opentelemetry::context::Context &context) noexcept = 0; - virtual ~WritableMetricStorage() = default; + virtual ~SyncWritableMetricStorage() = default; +}; + +/* Represents the async metric stroage */ +class AsyncWritableMetricStorage +{ +public: + /* Records a batch of measurements */ + virtual void RecordLong( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept = 0; + + virtual void RecordDouble( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept = 0; }; class NoopMetricStorage : public MetricStorage @@ -61,7 +78,7 @@ class NoopMetricStorage : public MetricStorage } }; -class NoopWritableMetricStorage : public WritableMetricStorage +class NoopWritableMetricStorage : public SyncWritableMetricStorage { public: void RecordLong(long value, const opentelemetry::context::Context &context) noexcept = 0; @@ -80,6 +97,20 @@ class NoopWritableMetricStorage : public WritableMetricStorage {} }; +class NoopAsyncWritableMetricStorage : public AsyncWritableMetricStorage +{ +public: + void RecordLong( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + {} + + void RecordDouble( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + {} +}; + } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/state/multi_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/multi_metric_storage.h index ceeafa0406..b3cae8694c 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/multi_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/multi_metric_storage.h @@ -15,10 +15,13 @@ namespace sdk namespace metrics { -class MultiMetricStorage : public WritableMetricStorage +class SyncMultiMetricStorage : public SyncWritableMetricStorage { public: - void AddStorage(std::shared_ptr storage) { storages_.push_back(storage); } + void AddStorage(std::shared_ptr storage) + { + storages_.push_back(storage); + } virtual void RecordLong(long value, const opentelemetry::context::Context &context) noexcept override @@ -59,7 +62,39 @@ class MultiMetricStorage : public WritableMetricStorage } private: - std::vector> storages_; + std::vector> storages_; +}; + +class AsyncMultiMetricStorage : public AsyncWritableMetricStorage +{ +public: + void AddStorage(std::shared_ptr storage) + { + storages_.push_back(storage); + } + + void RecordLong( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + { + for (auto &s : storages_) + { + s->RecordLong(measurements, observation_time); + } + } + + void RecordDouble( + const std::unordered_map &measurements, + opentelemetry::common::SystemTimestamp observation_time) noexcept override + { + for (auto &s : storages_) + { + s->RecordDouble(measurements, observation_time); + } + } + +private: + std::vector> storages_; }; } // namespace metrics diff --git a/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h b/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h new file mode 100644 index 0000000000..045b83c27b --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/state/observable_registry.h @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/common/timestamp.h" +# include "opentelemetry/metrics/async_instruments.h" + +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +struct ObservableCallbackRecord +{ + opentelemetry::metrics::ObservableCallbackPtr callback; + void *state; + opentelemetry::metrics::ObservableInstrument *instrument; +}; + +class ObservableRegistry +{ +public: + void AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state, + opentelemetry::metrics::ObservableInstrument *instrument); + + void RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state, + opentelemetry::metrics::ObservableInstrument *instrument); + + void Observe(opentelemetry::common::SystemTimestamp collection_ts); + +private: + std::vector> callbacks_; + std::mutex callbacks_m_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h index d547f3699f..fe3be8613b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h +++ b/sdk/include/opentelemetry/sdk/metrics/state/sync_metric_storage.h @@ -23,7 +23,7 @@ namespace sdk { namespace metrics { -class SyncMetricStorage : public MetricStorage, public WritableMetricStorage +class SyncMetricStorage : public MetricStorage, public SyncWritableMetricStorage { public: diff --git a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h index 1c66787d80..05dae8558f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h @@ -17,26 +17,26 @@ namespace metrics { // forward declaration -class WritableMetricStorage; +class SyncWritableMetricStorage; class Synchronous { public: Synchronous(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : instrument_descriptor_(instrument_descriptor), storage_(std::move(storage)) {} protected: InstrumentDescriptor instrument_descriptor_; - std::unique_ptr storage_; + std::unique_ptr storage_; }; class LongCounter : public Synchronous, public opentelemetry::metrics::Counter { public: LongCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Add(long value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override; void Add(long value, @@ -52,7 +52,7 @@ class DoubleCounter : public Synchronous, public opentelemetry::metrics::Counter public: DoubleCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Add(double value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override; @@ -68,7 +68,7 @@ class LongUpDownCounter : public Synchronous, public opentelemetry::metrics::UpD { public: LongUpDownCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Add(long value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override; void Add(long value, @@ -83,7 +83,7 @@ class DoubleUpDownCounter : public Synchronous, public opentelemetry::metrics::U { public: DoubleUpDownCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Add(double value, const opentelemetry::common::KeyValueIterable &attributes) noexcept override; @@ -99,7 +99,7 @@ class LongHistogram : public Synchronous, public opentelemetry::metrics::Histogr { public: LongHistogram(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Record(long value, const opentelemetry::common::KeyValueIterable &attributes, @@ -112,7 +112,7 @@ class DoubleHistogram : public Synchronous, public opentelemetry::metrics::Histo { public: DoubleHistogram(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage); + std::unique_ptr storage); void Record(double value, const opentelemetry::common::KeyValueIterable &attributes, diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index 77a371a80c..dbbb465183 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -1,11 +1,13 @@ add_library( opentelemetry_metrics + async_instruments.cc meter_provider.cc meter.cc meter_context.cc metric_reader.cc export/periodic_exporting_metric_reader.cc state/metric_collector.cc + state/observable_registry.cc state/sync_metric_storage.cc state/temporal_metric_storage.cc aggregation/histogram_aggregation.cc diff --git a/sdk/src/metrics/async_instruments.cc b/sdk/src/metrics/async_instruments.cc new file mode 100644 index 0000000000..e41cb304a8 --- /dev/null +++ b/sdk/src/metrics/async_instruments.cc @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/async_instruments.h" +# include "opentelemetry/sdk/metrics/state/metric_storage.h" +# include "opentelemetry/sdk_config.h" + +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +ObservableInstrument::ObservableInstrument(InstrumentDescriptor instrument_descriptor, + std::unique_ptr storage) + : instrument_descriptor_(instrument_descriptor), + storage_(std::move(storage)), + observable_registry_{new ObservableRegistry()} +{} + +void ObservableInstrument::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state) noexcept +{ + observable_registry_->AddCallback(callback, state, this); +} + +void ObservableInstrument::RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state) noexcept +{ + observable_registry_->AddCallback(callback, state, this); +} + +const InstrumentDescriptor &ObservableInstrument::GetInstrumentDescriptor() +{ + return instrument_descriptor_; +} + +AsyncWritableMetricStorage *ObservableInstrument::GetMetricStorage() +{ + return storage_.get(); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 551d87b2bc..c8d5901382 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -36,7 +36,7 @@ nostd::shared_ptr> Meter::CreateLongCounter(nostd::string InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kCounter, InstrumentValueType::kLong}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>( new LongCounter(instrument_descriptor, std::move(storage))); } @@ -50,36 +50,37 @@ nostd::shared_ptr> Meter::CreateDoubleCounter( std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kCounter, InstrumentValueType::kDouble}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>{ new DoubleCounter(instrument_descriptor, std::move(storage))}; } -void Meter::CreateLongObservableCounter(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr Meter::CreateLongObservableCounter( + nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableCounter, InstrumentValueType::kLong}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } -void Meter::CreateDoubleObservableCounter(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, - void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr +Meter::CreateDoubleObservableCounter(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableCounter, InstrumentValueType::kDouble}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } nostd::shared_ptr> Meter::CreateLongHistogram( @@ -91,7 +92,7 @@ nostd::shared_ptr> Meter::CreateLongHistogram( std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kHistogram, InstrumentValueType::kLong}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>{ new LongHistogram(instrument_descriptor, std::move(storage))}; } @@ -105,35 +106,37 @@ nostd::shared_ptr> Meter::CreateDoubleHistogram( std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kHistogram, InstrumentValueType::kDouble}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>{ new DoubleHistogram(instrument_descriptor, std::move(storage))}; } -void Meter::CreateLongObservableGauge(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr Meter::CreateLongObservableGauge( + nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableGauge, InstrumentValueType::kLong}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } -void Meter::CreateDoubleObservableGauge(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr Meter::CreateDoubleObservableGauge( + nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableGauge, InstrumentValueType::kDouble}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } nostd::shared_ptr> Meter::CreateLongUpDownCounter( @@ -145,7 +148,7 @@ nostd::shared_ptr> Meter::CreateLongUpDownCounter( std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kUpDownCounter, InstrumentValueType::kLong}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>{ new LongUpDownCounter(instrument_descriptor, std::move(storage))}; } @@ -159,37 +162,37 @@ nostd::shared_ptr> Meter::CreateDoubleUpDownCount std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kUpDownCounter, InstrumentValueType::kDouble}; - auto storage = RegisterMetricStorage(instrument_descriptor); + auto storage = RegisterSyncMetricStorage(instrument_descriptor); return nostd::shared_ptr>{ new DoubleUpDownCounter(instrument_descriptor, std::move(storage))}; } -void Meter::CreateLongObservableUpDownCounter(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, - void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr +Meter::CreateLongObservableUpDownCounter(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableUpDownCounter, InstrumentValueType::kLong}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } -void Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name, - void (*callback)(metrics::ObserverResult &, - void *), - nostd::string_view description, - nostd::string_view unit, - void *state) noexcept +nostd::shared_ptr +Meter::CreateDoubleObservableUpDownCounter(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit) noexcept { InstrumentDescriptor instrument_descriptor = { std::string{name.data(), name.size()}, std::string{description.data(), description.size()}, std::string{unit.data(), unit.size()}, InstrumentType::kObservableUpDownCounter, InstrumentValueType::kDouble}; - RegisterAsyncMetricStorage(instrument_descriptor, callback, state); + auto storage = RegisterAsyncMetricStorage(instrument_descriptor); + return nostd::shared_ptr{ + new ObservableInstrument(instrument_descriptor, std::move(storage))}; } const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentationScope() @@ -198,11 +201,11 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation return scope_.get(); } -std::unique_ptr Meter::RegisterMetricStorage( +std::unique_ptr Meter::RegisterSyncMetricStorage( InstrumentDescriptor &instrument_descriptor) { auto view_registry = meter_context_->GetViewRegistry(); - std::unique_ptr storages(new MultiMetricStorage()); + std::unique_ptr storages(new SyncMultiMetricStorage()); auto success = view_registry->FindViews( instrument_descriptor, *scope_, [this, &instrument_descriptor, &storages](const View &view) { @@ -215,11 +218,12 @@ std::unique_ptr Meter::RegisterMetricStorage( { view_instr_desc.description_ = view.GetDescription(); } + auto multi_storage = static_cast(storages.get()); + auto storage = std::shared_ptr(new SyncMetricStorage( view_instr_desc, view.GetAggregationType(), &view.GetAttributesProcessor(), NoExemplarReservoir::GetNoExemplarReservoir(), view.GetAggregationConfig())); storage_registry_[instrument_descriptor.name_] = storage; - auto multi_storage = static_cast(storages.get()); multi_storage->AddStorage(storage); return true; }); @@ -233,10 +237,44 @@ std::unique_ptr Meter::RegisterMetricStorage( return storages; } +std::unique_ptr Meter::RegisterAsyncMetricStorage( + InstrumentDescriptor &instrument_descriptor) +{ + auto view_registry = meter_context_->GetViewRegistry(); + std::unique_ptr storages(new AsyncMultiMetricStorage()); + auto success = view_registry->FindViews( + instrument_descriptor, *GetInstrumentationScope(), + [this, &instrument_descriptor, &storages](const View &view) { + auto view_instr_desc = instrument_descriptor; + if (!view.GetName().empty()) + { + view_instr_desc.name_ = view.GetName(); + } + if (!view.GetDescription().empty()) + { + view_instr_desc.description_ = view.GetDescription(); + } + auto storage = std::shared_ptr( + new AsyncMetricStorage(view_instr_desc, view.GetAggregationType(), + &view.GetAttributesProcessor(), view.GetAggregationConfig())); + storage_registry_[instrument_descriptor.name_] = storage; + static_cast(storages.get())->AddStorage(storage); + return true; + }); + if (!success) + { + OTEL_INTERNAL_LOG_ERROR( + "[Meter::RegisterAsyncMetricStorage] - Error during finding matching views." + << "Some of the matching view configurations mayn't be used for metric collection"); + } + return storages; +} + /** collect metrics across all the meters **/ std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { + std::vector metric_data_list; for (auto &metric_storage : storage_registry_) { diff --git a/sdk/src/metrics/state/observable_registry.cc b/sdk/src/metrics/state/observable_registry.cc new file mode 100644 index 0000000000..7e12d4a190 --- /dev/null +++ b/sdk/src/metrics/state/observable_registry.cc @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/state/observable_registry.h" +# include "opentelemetry/sdk/metrics/async_instruments.h" +# include "opentelemetry/sdk/metrics/observer_result.h" +# include "opentelemetry/sdk/metrics/state/metric_storage.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +void ObservableRegistry::AddCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state, + opentelemetry::metrics::ObservableInstrument *instrument) +{ + // TBD - Check if existing + std::unique_ptr record( + new ObservableCallbackRecord{callback, state, instrument}); + std::lock_guard lock_guard{callbacks_m_}; + callbacks_.push_back(std::move(record)); +} + +void ObservableRegistry::RemoveCallback(opentelemetry::metrics::ObservableCallbackPtr callback, + void *state, + opentelemetry::metrics::ObservableInstrument *instrument) +{ + std::lock_guard lock_guard{callbacks_m_}; + auto new_end = std::remove_if( + callbacks_.begin(), callbacks_.end(), + [callback, state, instrument](const std::unique_ptr &record) { + return record->callback == callback && record->state == state && + record->instrument == instrument; + }); + callbacks_.erase(new_end, callbacks_.end()); +} + +void ObservableRegistry::Observe(opentelemetry::common::SystemTimestamp collection_ts) +{ + std::lock_guard lock_guard{callbacks_m_}; + for (auto &callback_wrap : callbacks_) + { + auto value_type = + static_cast(callback_wrap->instrument) + ->GetInstrumentDescriptor() + .value_type_; + auto storage = + static_cast(callback_wrap->instrument) + ->GetMetricStorage(); + if (value_type == InstrumentValueType::kDouble) + { + nostd::shared_ptr> ob_res( + new opentelemetry::sdk::metrics::ObserverResultT()); + callback_wrap->callback(ob_res, callback_wrap->state); + storage->RecordDouble( + static_cast *>(ob_res.get()) + ->GetMeasurements(), + collection_ts); + } + else + { + nostd::shared_ptr> ob_res( + new opentelemetry::sdk::metrics::ObserverResultT()); + callback_wrap->callback(ob_res, callback_wrap->state); + storage->RecordLong( + static_cast *>(ob_res.get()) + ->GetMeasurements(), + collection_ts); + } + } +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/src/metrics/sync_instruments.cc b/sdk/src/metrics/sync_instruments.cc index 7cf9034cdc..61e30248c3 100644 --- a/sdk/src/metrics/sync_instruments.cc +++ b/sdk/src/metrics/sync_instruments.cc @@ -14,7 +14,7 @@ namespace sdk namespace metrics { LongCounter::LongCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} @@ -44,7 +44,7 @@ void LongCounter::Add(long value, const opentelemetry::context::Context &context } DoubleCounter::DoubleCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} @@ -74,7 +74,7 @@ void DoubleCounter::Add(double value, const opentelemetry::context::Context &con } LongUpDownCounter::LongUpDownCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} @@ -104,7 +104,7 @@ void LongUpDownCounter::Add(long value, const opentelemetry::context::Context &c } DoubleUpDownCounter::DoubleUpDownCounter(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} @@ -134,7 +134,7 @@ void DoubleUpDownCounter::Add(double value, const opentelemetry::context::Contex } LongHistogram::LongHistogram(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} @@ -165,7 +165,7 @@ void LongHistogram::Record(long value, const opentelemetry::context::Context &co } DoubleHistogram::DoubleHistogram(InstrumentDescriptor instrument_descriptor, - std::unique_ptr storage) + std::unique_ptr storage) : Synchronous(instrument_descriptor, std::move(storage)) {} diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index faf2f2b49f..7038506739 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -12,6 +12,7 @@ foreach( sync_instruments_test async_instruments_test metric_reader_test + observable_registry_test periodic_exporting_metric_reader_test) add_executable(${testname} "${testname}.cc") target_link_libraries( diff --git a/sdk/test/metrics/async_instruments_test.cc b/sdk/test/metrics/async_instruments_test.cc index ff9504f78c..67f5d919cd 100644 --- a/sdk/test/metrics/async_instruments_test.cc +++ b/sdk/test/metrics/async_instruments_test.cc @@ -3,6 +3,7 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/async_instruments.h" +# include "opentelemetry/sdk/metrics/state/multi_metric_storage.h" # include @@ -11,50 +12,16 @@ using namespace opentelemetry::sdk::metrics; using M = std::map; -void asyc_generate_measurements_long(opentelemetry::metrics::ObserverResult &observer) {} +void asyc_generate_measurements(opentelemetry::metrics::ObserverResult observer, void *state) {} -void asyc_generate_measurements_double(opentelemetry::metrics::ObserverResult &observer) {} - -TEST(AsyncInstruments, LongObservableCounter) -{ - auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW( - LongObservableCounter counter("long_counter", asyc_generate_meas_long, "description", "1")); -} - -TEST(AsyncInstruments, DoubleObservableCounter) -{ - auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(DoubleObservableCounter counter("long_counter", asyc_generate_meas_double, - "description", "1")); -} - -TEST(AsyncInstruments, LongObservableGauge) -{ - auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW( - LongObservableGauge counter("long_counter", asyc_generate_meas_long, "description", "1")); -} - -TEST(AsyncInstruments, DoubleObservableGauge) -{ - auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW( - DoubleObservableGauge counter("long_counter", asyc_generate_meas_double, "description", "1")); -} - -TEST(AsyncInstruments, LongObservableUpDownCounter) -{ - auto asyc_generate_meas_long = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(LongObservableUpDownCounter counter("long_counter", asyc_generate_meas_long, - "description", "1")); -} - -TEST(AsyncInstruments, DoubleObservableUpDownCounter) +TEST(AsyncInstruments, ObservableInstrument) { - auto asyc_generate_meas_double = [](opentelemetry::metrics::ObserverResult &observer) {}; - EXPECT_NO_THROW(DoubleObservableUpDownCounter counter("long_counter", asyc_generate_meas_double, - "description", "1")); + InstrumentDescriptor instrument_descriptor = {"long_counter", "description", "1", + InstrumentType::kObservableCounter, + InstrumentValueType::kLong}; + std::unique_ptr metric_storage(new AsyncMultiMetricStorage()); + ObservableInstrument observable_counter_long(instrument_descriptor, std::move(metric_storage)); + observable_counter_long.AddCallback(asyc_generate_measurements, nullptr); } #endif diff --git a/sdk/test/metrics/async_metric_storage_test.cc b/sdk/test/metrics/async_metric_storage_test.cc index 6e2e399e44..f41f40182e 100644 --- a/sdk/test/metrics/async_metric_storage_test.cc +++ b/sdk/test/metrics/async_metric_storage_test.cc @@ -4,13 +4,14 @@ #ifndef ENABLE_METRICS_PREVIEW # include "opentelemetry/sdk/metrics/state/async_metric_storage.h" # include "opentelemetry/common/key_value_iterable_view.h" -# include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/sdk/metrics/async_instruments.h" # include "opentelemetry/sdk/metrics/instruments.h" # include "opentelemetry/sdk/metrics/meter_context.h" # include "opentelemetry/sdk/metrics/metric_exporter.h" # include "opentelemetry/sdk/metrics/metric_reader.h" # include "opentelemetry/sdk/metrics/observer_result.h" # include "opentelemetry/sdk/metrics/state/metric_collector.h" +# include "opentelemetry/sdk/metrics/state/observable_registry.h" # include # include @@ -41,21 +42,20 @@ class WritableMetricStorageTestFixture : public ::testing::TestWithParam &observer_result, - void * /*state*/) + static void Fetcher(opentelemetry::metrics::ObserverResult observer_result, void * /*state*/) { fetch_count++; if (fetch_count == 1) { - observer_result.Observe(20l, {{"RequestType", "GET"}}); - observer_result.Observe(10l, {{"RequestType", "PUT"}}); + opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "GET"}}); + opentelemetry::nostd::get<0>(observer_result)->Observe(10l, {{"RequestType", "PUT"}}); number_of_get += 20l; number_of_put += 10l; } else if (fetch_count == 2) { - observer_result.Observe(40l, {{"RequestType", "GET"}}); - observer_result.Observe(20l, {{"RequestType", "PUT"}}); + opentelemetry::nostd::get<0>(observer_result)->Observe(40l, {{"RequestType", "GET"}}); + opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "PUT"}}); number_of_get += 40l; number_of_put += 20l; } @@ -71,7 +71,7 @@ class MeasurementFetcher static size_t fetch_count; static long number_of_get; static long number_of_put; - static const size_t number_of_attributes = 2; // GET , PUT + static const size_t number_of_attributes = 2; }; size_t MeasurementFetcher::fetch_count; @@ -96,12 +96,16 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) collectors.push_back(collector); size_t count_attributes = 0; - std::unique_ptr default_attributes_rocessor{ - new DefaultAttributesProcessor{}}; - opentelemetry::sdk::metrics::AsyncMetricStorage storage( - instr_desc, AggregationType::kSum, MeasurementFetcher::Fetcher, - default_attributes_rocessor.get(), + opentelemetry::sdk::metrics::AsyncMetricStorage storage( + instr_desc, AggregationType::kSum, new DefaultAttributesProcessor(), std::shared_ptr{}); + long get_count = 20l; + long put_count = 10l; + size_t attribute_count = 2; + std::unordered_map measurements = { + {{{"RequestType", "GET"}}, get_count}, {{{"RequestType", "PUT"}}, put_count}}; + storage.RecordLong(measurements, + opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now())); storage.Collect(collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) { @@ -111,20 +115,20 @@ TEST_P(WritableMetricStorageTestFixture, TestAggregation) if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "GET") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_get); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), get_count); } else if (opentelemetry::nostd::get( data_attr.attributes.find("RequestType")->second) == "PUT") { - EXPECT_EQ(opentelemetry::nostd::get(data.value_), - MeasurementFetcher::number_of_put); + EXPECT_EQ(opentelemetry::nostd::get(data.value_), put_count); } - count_attributes++; } return true; }); - EXPECT_EQ(MeasurementFetcher::number_of_attributes, count_attributes); + // subsequent recording after collection shouldn't fail + EXPECT_NO_THROW(storage.RecordLong( + measurements, opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()))); + EXPECT_EQ(MeasurementFetcher::number_of_attributes, attribute_count); } INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong, diff --git a/sdk/test/metrics/multi_metric_storage_test.cc b/sdk/test/metrics/multi_metric_storage_test.cc index 7b74007953..768949be8c 100644 --- a/sdk/test/metrics/multi_metric_storage_test.cc +++ b/sdk/test/metrics/multi_metric_storage_test.cc @@ -13,7 +13,7 @@ using namespace opentelemetry; using namespace opentelemetry::sdk::instrumentationscope; using namespace opentelemetry::sdk::metrics; -class TestMetricStorage : public WritableMetricStorage +class TestMetricStorage : public SyncWritableMetricStorage { public: void RecordLong(long value, const opentelemetry::context::Context &context) noexcept override @@ -46,9 +46,9 @@ class TestMetricStorage : public WritableMetricStorage TEST(MultiMetricStorageTest, BasicTests) { - std::shared_ptr storage( + std::shared_ptr storage( new TestMetricStorage()); - MultiMetricStorage storages{}; + SyncMultiMetricStorage storages{}; storages.AddStorage(storage); EXPECT_NO_THROW(storages.RecordLong(10l, opentelemetry::context::Context{})); EXPECT_NO_THROW(storages.RecordLong(20l, opentelemetry::context::Context{})); diff --git a/sdk/test/metrics/observable_registry_test.cc b/sdk/test/metrics/observable_registry_test.cc new file mode 100644 index 0000000000..77b5fc13fa --- /dev/null +++ b/sdk/test/metrics/observable_registry_test.cc @@ -0,0 +1,77 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/state/observable_registry.h" +# include "opentelemetry/metrics/observer_result.h" + +# include + +using namespace opentelemetry::sdk::metrics; + +# if 0 +class MeasurementFetcher +{ +public: + static void Fetcher1(opentelemetry::metrics::ObserverResult &observer_result, + void * /*state*/) + { + fetch_count1++; + if (fetch_count1 == 1) + { + std::get observer_result(attributes_processor); + ObserverResultT observer_result(attributes_processor); observer_result.Observe(10l); observer_result.Observe(20l); diff --git a/sdk/test/metrics/sync_instruments_test.cc b/sdk/test/metrics/sync_instruments_test.cc index 232803d890..88996efd61 100644 --- a/sdk/test/metrics/sync_instruments_test.cc +++ b/sdk/test/metrics/sync_instruments_test.cc @@ -24,7 +24,7 @@ TEST(SyncInstruments, LongCounter) { InstrumentDescriptor instrument_descriptor = { "long_counter", "description", "1", InstrumentType::kCounter, InstrumentValueType::kLong}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); LongCounter counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Add(10l)); EXPECT_NO_THROW(counter.Add(10l, opentelemetry::context::Context{})); @@ -43,7 +43,7 @@ TEST(SyncInstruments, DoubleCounter) { InstrumentDescriptor instrument_descriptor = { "double_counter", "description", "1", InstrumentType::kCounter, InstrumentValueType::kDouble}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); DoubleCounter counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Add(10.10)); EXPECT_NO_THROW(counter.Add(10.10, opentelemetry::context::Context{})); @@ -63,7 +63,7 @@ TEST(SyncInstruments, LongUpDownCounter) InstrumentDescriptor instrument_descriptor = {"long_updowncounter", "description", "1", InstrumentType::kUpDownCounter, InstrumentValueType::kLong}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); LongUpDownCounter counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Add(10l)); EXPECT_NO_THROW(counter.Add(10l, opentelemetry::context::Context{})); @@ -83,7 +83,7 @@ TEST(SyncInstruments, DoubleUpDownCounter) InstrumentDescriptor instrument_descriptor = {"double_updowncounter", "description", "1", InstrumentType::kUpDownCounter, InstrumentValueType::kDouble}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); DoubleUpDownCounter counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Add(10.10)); EXPECT_NO_THROW(counter.Add(10.10, opentelemetry::context::Context{})); @@ -102,7 +102,7 @@ TEST(SyncInstruments, LongHistogram) { InstrumentDescriptor instrument_descriptor = { "long_histogram", "description", "1", InstrumentType::kHistogram, InstrumentValueType::kLong}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); LongHistogram counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Record(10l, opentelemetry::context::Context{})); EXPECT_NO_THROW(counter.Record(-10l, opentelemetry::context::Context{})); // This is ignored @@ -119,7 +119,7 @@ TEST(SyncInstruments, DoubleHistogram) InstrumentDescriptor instrument_descriptor = {"double_histogram", "description", "1", InstrumentType::kHistogram, InstrumentValueType::kDouble}; - std::unique_ptr metric_storage(new MultiMetricStorage()); + std::unique_ptr metric_storage(new SyncMultiMetricStorage()); DoubleHistogram counter(instrument_descriptor, std::move(metric_storage)); EXPECT_NO_THROW(counter.Record(10.10, opentelemetry::context::Context{})); EXPECT_NO_THROW(counter.Record(-10.10, opentelemetry::context::Context{})); // This is ignored.