From 5a75c0eb065f27198454e34b2ab9fbdc289006e6 Mon Sep 17 00:00:00 2001 From: Dan Riley Date: Tue, 23 Mar 2021 09:43:10 -0400 Subject: [PATCH 1/4] Working (suboptimal) version --- HLTrigger/Timer/plugins/FastTimerService.cc | 45 ++++++++++++++++++--- HLTrigger/Timer/plugins/FastTimerService.h | 18 +++++++++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/HLTrigger/Timer/plugins/FastTimerService.cc b/HLTrigger/Timer/plugins/FastTimerService.cc index 62abb7a78f1fe..cc64ff29977b5 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.cc +++ b/HLTrigger/Timer/plugins/FastTimerService.cc @@ -766,7 +766,8 @@ void FastTimerService::PlotsPerJob::fill_lumi(AtomicResources const& data, unsig /////////////////////////////////////////////////////////////////////////////// FastTimerService::FastTimerService(const edm::ParameterSet& config, edm::ActivityRegistry& registry) - : // configuration + : tbb::task_scheduler_observer(true), + // configuration callgraph_(), // job configuration concurrent_lumis_(0), @@ -1099,6 +1100,9 @@ void FastTimerService::postSourceLumi(edm::LuminosityBlockIndex index) { } void FastTimerService::postEndJob() { + for (auto& thread : threads_) { + thread.measure_and_accumulate(overhead_); + } if (print_job_summary_) { edm::LogVerbatim out("FastReport"); printSummary(out, job_summary_, "Job"); @@ -1662,14 +1666,45 @@ void FastTimerService::postModuleStreamEndLumi(edm::StreamContext const& sc, edm thread().measure_and_accumulate(lumi_transition_[index]); } +pthread_key_t FastTimerService::ThreadGuard::key; +pthread_once_t FastTimerService::ThreadGuard::key_once = PTHREAD_ONCE_INIT; + +FastTimerService::ThreadGuard::ThreadGuard() { + auto err = ::pthread_key_create(&key, reset_thread); + if (err) { + edm::LogWarning("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err); + } +} + +bool FastTimerService::ThreadGuard::register_thread(FastTimerService::Measurement& m, FastTimerService::AtomicResources& r) { + auto ptr = ::pthread_getspecific(key); + + if (not ptr) { + auto p = new specific_t(m, r); + auto err = ::pthread_setspecific(key, p); + if (err) { + edm::LogWarning("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err); + } + return true; + } + return false; +} + +void FastTimerService::ThreadGuard::reset_thread(void* ptr) { + auto p = static_cast(ptr); + // account any resources used or freed by the thread before leaving the TBB pool + p->first.measure_and_accumulate(p->second); + delete p; +} + void FastTimerService::on_scheduler_entry(bool worker) { - // initialise the measurement point for a thread that has newly joining the TBB pool - thread().measure(); + if (guard_.register_thread(thread(), overhead_)) { + // initialise the measurement point for a thread that has newly joined the TBB pool + thread().measure(); + } } void FastTimerService::on_scheduler_exit(bool worker) { - // account any resources used or freed by the thread before leaving the TBB pool - thread().measure_and_accumulate(overhead_); } FastTimerService::Measurement& FastTimerService::thread() { return threads_.local(); } diff --git a/HLTrigger/Timer/plugins/FastTimerService.h b/HLTrigger/Timer/plugins/FastTimerService.h index ac8dc872ec89b..0ab695acfb2f3 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.h +++ b/HLTrigger/Timer/plugins/FastTimerService.h @@ -3,6 +3,7 @@ // system headers #include +#include // C++ headers #include @@ -455,6 +456,23 @@ class FastTimerService : public tbb::task_scheduler_observer { std::vector run_summary_; // whole event time accounting per-run std::mutex summary_mutex_; // synchronise access to the summary objects across different threads + // + struct ThreadGuard { + using specific_t = std::pair; + + ThreadGuard(); + ~ThreadGuard() = default; + + static void reset_thread(void* t); + bool register_thread(FastTimerService::Measurement& t, FastTimerService::AtomicResources& r); + + static pthread_key_t key; + static pthread_once_t key_once; + }; + + // + ThreadGuard guard_; + // per-thread quantities, lazily allocated tbb::enumerable_thread_specific, tbb::ets_key_per_instance> threads_; From 63175bdb5df1100c38c36729c15e492294351895 Mon Sep 17 00:00:00 2001 From: Dan Riley Date: Wed, 24 Mar 2021 14:09:34 -0400 Subject: [PATCH 2/4] Remove enumerable_thread_specific so we only use one thread_specific slot --- HLTrigger/Timer/plugins/FastTimerService.cc | 51 +++++++++++++-------- HLTrigger/Timer/plugins/FastTimerService.h | 24 ++++++---- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/HLTrigger/Timer/plugins/FastTimerService.cc b/HLTrigger/Timer/plugins/FastTimerService.cc index cc64ff29977b5..6f9d0093fd583 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.cc +++ b/HLTrigger/Timer/plugins/FastTimerService.cc @@ -34,6 +34,7 @@ using json = nlohmann::json; #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/StreamID.h" #include "HLTrigger/Timer/interface/memory_usage.h" #include "HLTrigger/Timer/interface/processor_model.h" @@ -1100,9 +1101,7 @@ void FastTimerService::postSourceLumi(edm::LuminosityBlockIndex index) { } void FastTimerService::postEndJob() { - for (auto& thread : threads_) { - thread.measure_and_accumulate(overhead_); - } + guard_.finalize(); if (print_job_summary_) { edm::LogVerbatim out("FastReport"); printSummary(out, job_summary_, "Job"); @@ -1666,39 +1665,53 @@ void FastTimerService::postModuleStreamEndLumi(edm::StreamContext const& sc, edm thread().measure_and_accumulate(lumi_transition_[index]); } -pthread_key_t FastTimerService::ThreadGuard::key; -pthread_once_t FastTimerService::ThreadGuard::key_once = PTHREAD_ONCE_INIT; - FastTimerService::ThreadGuard::ThreadGuard() { - auto err = ::pthread_key_create(&key, reset_thread); + auto err = ::pthread_key_create(&key_, retire_thread); if (err) { - edm::LogWarning("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err); + throw cms::Exception("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err); } } -bool FastTimerService::ThreadGuard::register_thread(FastTimerService::Measurement& m, FastTimerService::AtomicResources& r) { - auto ptr = ::pthread_getspecific(key); +// If this is a new thread, register it and return true +bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicResources& r) { + auto ptr = ::pthread_getspecific(key_); if (not ptr) { - auto p = new specific_t(m, r); - auto err = ::pthread_setspecific(key, p); + auto p = thread_resources_.emplace_back(std::make_unique(r)); + auto err = ::pthread_setspecific(key_, p->get()); if (err) { - edm::LogWarning("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err); + throw cms::Exception("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err); } return true; } return false; } -void FastTimerService::ThreadGuard::reset_thread(void* ptr) { +// called when a thread exits +void FastTimerService::ThreadGuard::retire_thread(void* ptr) { auto p = static_cast(ptr); // account any resources used or freed by the thread before leaving the TBB pool - p->first.measure_and_accumulate(p->second); - delete p; + p->measurement_.measure_and_accumulate(p->resource_); + p->live_ = false; +} + +// finalize all threads that have not retired +void FastTimerService::ThreadGuard::finalize() { + for (auto& p : thread_resources_) { + if (p->live_) { + p->measurement_.measure_and_accumulate(p->resource_); + } + } +} + +FastTimerService::Measurement& FastTimerService::ThreadGuard::thread() { + auto ptr = ::pthread_getspecific(key_); + auto p = static_cast(ptr); + return p->measurement_; } void FastTimerService::on_scheduler_entry(bool worker) { - if (guard_.register_thread(thread(), overhead_)) { + if (guard_.register_thread(overhead_)) { // initialise the measurement point for a thread that has newly joined the TBB pool thread().measure(); } @@ -1707,7 +1720,9 @@ void FastTimerService::on_scheduler_entry(bool worker) { void FastTimerService::on_scheduler_exit(bool worker) { } -FastTimerService::Measurement& FastTimerService::thread() { return threads_.local(); } +FastTimerService::Measurement& FastTimerService::thread() { + return guard_.thread(); +} // describe the module's configuration void FastTimerService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { diff --git a/HLTrigger/Timer/plugins/FastTimerService.h b/HLTrigger/Timer/plugins/FastTimerService.h index 0ab695acfb2f3..cf85463f3a710 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.h +++ b/HLTrigger/Timer/plugins/FastTimerService.h @@ -458,25 +458,31 @@ class FastTimerService : public tbb::task_scheduler_observer { // struct ThreadGuard { - using specific_t = std::pair; + struct specific_t { + specific_t(AtomicResources& r) : resource_(r), live_(true) {} + ~specific_t() = default; + + Measurement measurement_; + AtomicResources& resource_; + bool live_; + }; ThreadGuard(); ~ThreadGuard() = default; - static void reset_thread(void* t); - bool register_thread(FastTimerService::Measurement& t, FastTimerService::AtomicResources& r); + static void retire_thread(void* t); + + bool register_thread(FastTimerService::AtomicResources& r); + Measurement& thread(); + void finalize(); - static pthread_key_t key; - static pthread_once_t key_once; + tbb::concurrent_vector> thread_resources_; + pthread_key_t key_; }; // ThreadGuard guard_; - // per-thread quantities, lazily allocated - tbb::enumerable_thread_specific, tbb::ets_key_per_instance> - threads_; - // atomic variables to keep track of the completion of each step, process by process std::unique_ptr[]> subprocess_event_check_; std::unique_ptr[]> subprocess_global_lumi_check_; From 574677e611e6133b4eac014601489c08d9a30887 Mon Sep 17 00:00:00 2001 From: Dan Riley Date: Wed, 24 Mar 2021 14:18:52 -0400 Subject: [PATCH 3/4] formatting --- HLTrigger/Timer/plugins/FastTimerService.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/HLTrigger/Timer/plugins/FastTimerService.cc b/HLTrigger/Timer/plugins/FastTimerService.cc index 6f9d0093fd583..7ca01b8b5c067 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.cc +++ b/HLTrigger/Timer/plugins/FastTimerService.cc @@ -1717,12 +1717,9 @@ void FastTimerService::on_scheduler_entry(bool worker) { } } -void FastTimerService::on_scheduler_exit(bool worker) { -} +void FastTimerService::on_scheduler_exit(bool worker) {} -FastTimerService::Measurement& FastTimerService::thread() { - return guard_.thread(); -} +FastTimerService::Measurement& FastTimerService::thread() { return guard_.thread(); } // describe the module's configuration void FastTimerService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { From f646cb6e7e81c0790fcdb3389db2219530fd1358 Mon Sep 17 00:00:00 2001 From: Dan Riley Date: Thu, 25 Mar 2021 09:24:46 -0400 Subject: [PATCH 4/4] fix memory management and race condition --- HLTrigger/Timer/plugins/FastTimerService.cc | 29 +++++++++++++-------- HLTrigger/Timer/plugins/FastTimerService.h | 5 ++-- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/HLTrigger/Timer/plugins/FastTimerService.cc b/HLTrigger/Timer/plugins/FastTimerService.cc index 7ca01b8b5c067..7d9eaefdfe613 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.cc +++ b/HLTrigger/Timer/plugins/FastTimerService.cc @@ -1677,8 +1677,9 @@ bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicReso auto ptr = ::pthread_getspecific(key_); if (not ptr) { - auto p = thread_resources_.emplace_back(std::make_unique(r)); - auto err = ::pthread_setspecific(key_, p->get()); + auto p = thread_resources_.emplace_back(std::make_shared(r)); + auto pp = new std::shared_ptr(*p); + auto err = ::pthread_setspecific(key_, pp); if (err) { throw cms::Exception("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err); } @@ -1687,27 +1688,33 @@ bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicReso return false; } +std::shared_ptr* FastTimerService::ThreadGuard::ptr(void* p) { + return static_cast*>(p); +} + // called when a thread exits -void FastTimerService::ThreadGuard::retire_thread(void* ptr) { - auto p = static_cast(ptr); - // account any resources used or freed by the thread before leaving the TBB pool - p->measurement_.measure_and_accumulate(p->resource_); - p->live_ = false; +void FastTimerService::ThreadGuard::retire_thread(void* p) { + auto ps = ptr(p); + auto expected = true; + if ((*ps)->live_.compare_exchange_strong(expected, false)) { + // account any resources used or freed by the thread before leaving the TBB pool + (*ps)->measurement_.measure_and_accumulate((*ps)->resource_); + } + delete ps; } // finalize all threads that have not retired void FastTimerService::ThreadGuard::finalize() { for (auto& p : thread_resources_) { - if (p->live_) { + auto expected = true; + if (p->live_.compare_exchange_strong(expected, false)) { p->measurement_.measure_and_accumulate(p->resource_); } } } FastTimerService::Measurement& FastTimerService::ThreadGuard::thread() { - auto ptr = ::pthread_getspecific(key_); - auto p = static_cast(ptr); - return p->measurement_; + return (*ptr(::pthread_getspecific(key_)))->measurement_; } void FastTimerService::on_scheduler_entry(bool worker) { diff --git a/HLTrigger/Timer/plugins/FastTimerService.h b/HLTrigger/Timer/plugins/FastTimerService.h index cf85463f3a710..f7616173e5ebd 100644 --- a/HLTrigger/Timer/plugins/FastTimerService.h +++ b/HLTrigger/Timer/plugins/FastTimerService.h @@ -464,19 +464,20 @@ class FastTimerService : public tbb::task_scheduler_observer { Measurement measurement_; AtomicResources& resource_; - bool live_; + std::atomic live_; }; ThreadGuard(); ~ThreadGuard() = default; static void retire_thread(void* t); + static std::shared_ptr* ptr(void* p); bool register_thread(FastTimerService::AtomicResources& r); Measurement& thread(); void finalize(); - tbb::concurrent_vector> thread_resources_; + tbb::concurrent_vector> thread_resources_; pthread_key_t key_; };