diff --git a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h index ecb5cc7e72..83773ffb61 100644 --- a/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h +++ b/sdk/include/opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h @@ -63,8 +63,11 @@ class PeriodicExportingMetricReader : public MetricReader std::thread worker_thread_; /* Synchronization primitives */ - std::condition_variable cv_; - std::mutex cv_m_; + std::atomic is_force_flush_pending_; + std::atomic is_force_wakeup_background_worker_; + std::atomic is_force_flush_notified_; + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_m_; }; } // namespace metrics diff --git a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc index bcca86bee0..dfa4a5ee6b 100644 --- a/sdk/src/metrics/export/periodic_exporting_metric_reader.cc +++ b/sdk/src/metrics/export/periodic_exporting_metric_reader.cc @@ -60,14 +60,15 @@ void PeriodicExportingMetricReader::DoBackgroundWork() auto end = std::chrono::steady_clock::now(); auto export_time_ms = std::chrono::duration_cast(end - start); auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms; - cv_.wait_for(lk, remaining_wait_interval_ms); + cv_.wait_for(lk, remaining_wait_interval_ms, [this]() { + if (is_force_wakeup_background_worker_.load(std::memory_order_acquire)) + { + is_force_wakeup_background_worker_.store(false, std::memory_order_release); + return true; + } + return IsShutdown(); + }); } while (IsShutdown() != true); - // One last Collect and Export before shutdown - auto status = CollectAndExportOnce(); - if (!status) - { - OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect-Export Cycle Failure.") - } } bool PeriodicExportingMetricReader::CollectAndExportOnce() @@ -86,6 +87,7 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() return true; }); }); + std::future_status status; do { @@ -96,12 +98,93 @@ bool PeriodicExportingMetricReader::CollectAndExportOnce() break; } } while (status != std::future_status::ready); + bool notify_force_flush = is_force_flush_pending_.exchange(false, std::memory_order_acq_rel); + if (notify_force_flush) + { + is_force_flush_notified_.store(true, std::memory_order_release); + force_flush_cv_.notify_one(); + } + return true; } bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept { - return exporter_->ForceFlush(timeout); + std::unique_lock lk_cv(force_flush_m_); + is_force_flush_pending_.store(true, std::memory_order_release); + auto break_condition = [this]() { + if (IsShutdown()) + { + return true; + } + + // Wake up the worker thread once. + if (is_force_flush_pending_.load(std::memory_order_acquire)) + { + is_force_wakeup_background_worker_.store(true, std::memory_order_release); + cv_.notify_one(); + } + return is_force_flush_notified_.load(std::memory_order_acquire); + }; + + auto wait_timeout = opentelemetry::common::DurationUtil::AdjustWaitForTimeout( + timeout, std::chrono::microseconds::zero()); + std::chrono::steady_clock::duration timeout_steady = + std::chrono::duration_cast(wait_timeout); + if (timeout_steady <= std::chrono::steady_clock::duration::zero()) + { + timeout_steady = std::chrono::steady_clock::duration::max(); + } + + bool result = false; + while (!result && timeout_steady > std::chrono::steady_clock::duration::zero()) + { + // When is_force_flush_notified_.store(true) and force_flush_cv_.notify_all() is called + // between is_force_flush_pending_.load() and force_flush_cv_.wait(). We must not wait + // for ever + std::chrono::steady_clock::time_point start_timepoint = std::chrono::steady_clock::now(); + result = force_flush_cv_.wait_for(lk_cv, export_interval_millis_, break_condition); + timeout_steady -= std::chrono::steady_clock::now() - start_timepoint; + } + + // If it will be already signaled, we must wait until notified. + // We use a spin lock here + if (false == is_force_flush_pending_.exchange(false, std::memory_order_acq_rel)) + { + for (int retry_waiting_times = 0; + false == is_force_flush_notified_.load(std::memory_order_acquire); ++retry_waiting_times) + { + opentelemetry::common::SpinLockMutex::fast_yield(); + if ((retry_waiting_times & 127) == 127) + { + std::this_thread::yield(); + } + } + } + is_force_flush_notified_.store(false, std::memory_order_release); + + if (result) + { + // - If original `timeout` is `zero`, use that in exporter::forceflush + // - Else if remaining `timeout_steady` more than zero, use that in exporter::forceflush + // - Else don't invoke exporter::forceflush ( as remaining time is zero or less) + if (timeout <= std::chrono::steady_clock::duration::zero()) + { + result = + exporter_->ForceFlush(std::chrono::duration_cast(timeout)); + } + else if (timeout_steady > std::chrono::steady_clock::duration::zero()) + { + result = exporter_->ForceFlush( + std::chrono::duration_cast(timeout_steady)); + } + else + { + // remaining timeout_steady is zero or less + result = false; + } + } + return result; } bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept diff --git a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc index cd789b5028..e115f79f75 100644 --- a/sdk/test/metrics/periodic_exporting_metric_reader_test.cc +++ b/sdk/test/metrics/periodic_exporting_metric_reader_test.cc @@ -70,6 +70,7 @@ TEST(PeriodicExporingMetricReader, BasicTests) MockMetricProducer producer; reader.SetMetricProducer(&producer); std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + EXPECT_NO_THROW(reader.ForceFlush()); reader.Shutdown(); EXPECT_EQ(static_cast(exporter_ptr)->GetDataCount(), static_cast(&producer)->GetDataCount());