From 33d9c628f2e7029b6b92733df69336f7e384c7e4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 1 Apr 2022 14:07:44 -0700 Subject: [PATCH] Implement periodic exporting metric reader (#1286) --- exporters/ostream/BUILD | 6 +- .../export/periodic_exporting_metric_reader.h | 72 +++++++++++++ .../sdk/metrics/metric_exporter.h | 9 +- .../opentelemetry/sdk/metrics/metric_reader.h | 1 + sdk/src/metrics/CMakeLists.txt | 1 + .../periodic_exporting_metric_reader.cc | 101 ++++++++++++++++++ sdk/src/metrics/metric_reader.cc | 10 +- sdk/test/metrics/CMakeLists.txt | 3 +- sdk/test/metrics/meter_provider_sdk_test.cc | 3 +- .../periodic_exporting_metric_reader_test.cc | 81 ++++++++++++++ 10 files changed, 272 insertions(+), 15 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h create mode 100644 sdk/src/metrics/export/periodic_exporting_metric_reader.cc create mode 100644 sdk/test/metrics/periodic_exporting_metric_reader_test.cc diff --git a/exporters/ostream/BUILD b/exporters/ostream/BUILD index 19c0f7914d..f74d896344 100644 --- a/exporters/ostream/BUILD +++ b/exporters/ostream/BUILD @@ -43,7 +43,9 @@ cc_library( ], ) -#TODO MetricData is still changing, uncomment once it is final +# TODO - Uncomment once MetricData interface is finalised +#cc_library( +# name = "ostream_metric_exporter", # srcs = [ # "src/metric_exporter.cc", # ], @@ -70,7 +72,7 @@ cc_library( # deps = [ # ":ostream_metric_exporter", # "@com_google_googletest//:gtest_main", -#], +# ], #) cc_test( diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h new file mode 100644 index 0000000000..29125a6ea2 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/metric_reader.h" +# include "opentelemetry/version.h" + +# include +# include +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +class MetricExporter; +/** + * Struct to hold PeriodicExortingMetricReader options. + */ + +constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000); +constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000); +struct PeriodicExportingMetricReaderOptions +{ + + /* The time interval between two consecutive exports. */ + std::chrono::milliseconds export_interval_millis = + std::chrono::milliseconds(kExportIntervalMillis); + + /* how long the export can run before it is cancelled. */ + std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis); +}; + +class PeriodicExportingMetricReader : public MetricReader +{ + +public: + PeriodicExportingMetricReader( + std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative); + +private: + bool OnForceFlush(std::chrono::microseconds timeout) noexcept override; + + bool OnShutDown(std::chrono::microseconds timeout) noexcept override; + + void OnInitialized() noexcept override; + + std::unique_ptr exporter_; + std::chrono::milliseconds export_interval_millis_; + std::chrono::milliseconds export_timeout_millis_; + + void DoBackgroundWork(); + + /* The background worker thread */ + std::thread worker_thread_; + + /* Synchronization primitives */ + std::condition_variable cv_; + std::mutex cv_m_; +}; + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h index 3769259472..2488edd7ed 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_exporter.h @@ -30,11 +30,9 @@ class MetricExporter /** * Exports a batch of metrics recordables. This method must not be called * concurrently for the same exporter instance. - * @param spans a span of unique pointers to metrics data + * @param data metrics data */ - virtual opentelemetry::sdk::common::ExportResult Export( - const nostd::span> - &records) noexcept = 0; + virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &data) noexcept = 0; /** * Force flush the exporter. @@ -49,9 +47,6 @@ class MetricExporter */ virtual bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; - -private: - AggregationTemporality aggregation_temporality_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h index 57690ac87c..7cf449e1e3 100644 --- a/sdk/include/opentelemetry/sdk/metrics/metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/metric_reader.h @@ -57,6 +57,7 @@ class MetricReader virtual void OnInitialized() noexcept {} +protected: bool IsShutdown() const noexcept; private: diff --git a/sdk/src/metrics/CMakeLists.txt b/sdk/src/metrics/CMakeLists.txt index d8eff48cd5..b6656b5bf8 100644 --- a/sdk/src/metrics/CMakeLists.txt +++ b/sdk/src/metrics/CMakeLists.txt @@ -4,6 +4,7 @@ add_library( meter.cc meter_context.cc metric_reader.cc + export/periodic_exporting_metric_reader.cc state/metric_collector.cc state/sync_metric_storage.cc aggregation/histogram_aggregation.cc diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc new file mode 100644 index 0000000000..e9a91a9e06 --- /dev/null +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/common/global_log_handler.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include +# include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +PeriodicExportingMetricReader::PeriodicExportingMetricReader( + std::unique_ptr exporter, + const PeriodicExportingMetricReaderOptions &option, + AggregationTemporality aggregation_temporality) + : MetricReader(aggregation_temporality), + exporter_{std::move(exporter)}, + export_interval_millis_{option.export_interval_millis}, + export_timeout_millis_{option.export_timeout_millis} +{ + if (export_interval_millis_ <= export_timeout_millis_) + { + OTEL_INTERNAL_LOG_WARN( + "[Periodic Exporting Metric Reader] Invalid configuration: " + "export_interval_millis_ should be less than export_timeout_millis_, using default values"); + export_interval_millis_ = kExportIntervalMillis; + export_timeout_millis_ = kExportTimeOutMillis; + } +} + +void PeriodicExportingMetricReader::OnInitialized() noexcept +{ + worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this); +} + +void PeriodicExportingMetricReader::DoBackgroundWork() +{ + std::unique_lock lk(cv_m_); + do + { + if (IsShutdown()) + { + break; + } + std::atomic cancel_export_for_timeout{false}; + auto start = std::chrono::steady_clock::now(); + auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] { + Collect([this, &cancel_export_for_timeout](MetricData data) { + if (cancel_export_for_timeout) + { + OTEL_INTERNAL_LOG_ERROR( + "[Periodic Exporting Metric Reader] Collect took longer configured time: " + << export_timeout_millis_.count() << " ms, and timed out"); + return false; + } + this->exporter_->Export(data); + return true; + }); + }); + std::future_status status; + do + { + status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_)); + if (status == std::future_status::timeout) + { + cancel_export_for_timeout = true; + break; + } + } while (status != std::future_status::ready); + auto end = std::chrono::steady_clock::now(); + auto export_time_ms = std::chrono::duration_cast(end - start); + auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; + cv_.wait_for(lk, remaining_wait_interval_ms); + } while (true); +} + +bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept +{ + return exporter_->ForceFlush(timeout); +} + +bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept +{ + if (worker_thread_.joinable()) + { + cv_.notify_one(); + worker_thread_.join(); + } + return exporter_->Shutdown(timeout); +} + +} // namespace metrics +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE +#endif \ No newline at end of file diff --git a/sdk/src/metrics/metric_reader.cc b/sdk/src/metrics/metric_reader.cc index 8238ad2a55..abf73d766a 100644 --- a/sdk/src/metrics/metric_reader.cc +++ b/sdk/src/metrics/metric_reader.cc @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality) void MetricReader::SetMetricProducer(MetricProducer *metric_producer) { metric_producer_ = metric_producer; + OnInitialized(); } AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept @@ -46,18 +47,21 @@ bool MetricReader::Collect(nostd::function_ref callback) noexc bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept { bool status = true; - if (IsShutdown()) { OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!"); } + + { + const std::lock_guard locked(lock_); + shutdown_ = true; + } + if (!OnShutDown(timeout)) { status = false; OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!"); } - const std::lock_guard locked(lock_); - shutdown_ = true; return status; } diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index fa1f22c73a..faf2f2b49f 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -11,7 +11,8 @@ foreach( observer_result_test sync_instruments_test async_instruments_test - metric_reader_test) + metric_reader_test + periodic_exporting_metric_reader_test) add_executable(${testname} "${testname}.cc") target_link_libraries( ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} diff --git a/sdk/test/metrics/meter_provider_sdk_test.cc b/sdk/test/metrics/meter_provider_sdk_test.cc index 6ede67d032..ec0a39b523 100644 --- a/sdk/test/metrics/meter_provider_sdk_test.cc +++ b/sdk/test/metrics/meter_provider_sdk_test.cc @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter public: MockMetricExporter() = default; - opentelemetry::sdk::common::ExportResult Export( - const opentelemetry::nostd::span> &records) noexcept override + opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override { return opentelemetry::sdk::common::ExportResult::kSuccess; } diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc new file mode 100644 index 0000000000..f13fbb0e04 --- /dev/null +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#ifndef ENABLE_METRICS_PREVIEW + +# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" +# include "opentelemetry/sdk/metrics/metric_exporter.h" + +# include + +using namespace opentelemetry; +using namespace opentelemetry::sdk::instrumentationlibrary; +using namespace opentelemetry::sdk::metrics; + +class MockPushMetricExporter : public MetricExporter +{ +public: + opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override + { + records_.push_back(record); + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + + bool ForceFlush( + std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override + { + return false; + } + + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override + { + return true; + } + + size_t GetDataCount() { return records_.size(); } + +private: + std::vector records_; +}; + +class MockMetricProducer : public MetricProducer +{ +public: + MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero()) + : sleep_ms_{sleep_ms}, data_sent_size_(0) + {} + + bool Collect(nostd::function_ref callback) noexcept override + { + std::this_thread::sleep_for(sleep_ms_); + data_sent_size_++; + MetricData data; + callback(data); + return true; + } + + size_t GetDataCount() { return data_sent_size_; } + +private: + std::chrono::microseconds sleep_ms_; + size_t data_sent_size_; +}; + +TEST(PeriodicExporingMetricReader, BasicTests) +{ + std::unique_ptr exporter(new MockPushMetricExporter()); + PeriodicExportingMetricReaderOptions options; + options.export_timeout_millis = std::chrono::milliseconds(200); + options.export_interval_millis = std::chrono::milliseconds(500); + auto exporter_ptr = exporter.get(); + PeriodicExportingMetricReader reader(std::move(exporter), options); + MockMetricProducer producer; + reader.SetMetricProducer(&producer); + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + reader.Shutdown(); + EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), + static_cast(&producer)->GetDataCount()); +} + +#endif \ No newline at end of file