Switch FastTimerService to using a local thread observer #33261

Merged on Mar 26, 2021 · 4 commits
70 changes: 62 additions & 8 deletions HLTrigger/Timer/plugins/FastTimerService.cc
@@ -34,6 +34,7 @@ using json = nlohmann::json;
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "HLTrigger/Timer/interface/memory_usage.h"
#include "HLTrigger/Timer/interface/processor_model.h"
@@ -766,7 +767,8 @@ void FastTimerService::PlotsPerJob::fill_lumi(AtomicResources const& data, unsigned
///////////////////////////////////////////////////////////////////////////////

FastTimerService::FastTimerService(const edm::ParameterSet& config, edm::ActivityRegistry& registry)
: // configuration
: tbb::task_scheduler_observer(true),
// configuration
callgraph_(),
// job configuration
concurrent_lumis_(0),
@@ -1099,6 +1101,7 @@ void FastTimerService::postSourceLumi(edm::LuminosityBlockIndex index) {
}

void FastTimerService::postEndJob() {
guard_.finalize();
if (print_job_summary_) {
edm::LogVerbatim out("FastReport");
printSummary(out, job_summary_, "Job");
@@ -1662,17 +1665,68 @@ void FastTimerService::postModuleStreamEndLumi(edm::StreamContext const& sc, edm
thread().measure_and_accumulate(lumi_transition_[index]);
}

void FastTimerService::on_scheduler_entry(bool worker) {
// initialise the measurement point for a thread that has newly joining the TBB pool
thread().measure();
FastTimerService::ThreadGuard::ThreadGuard() {
auto err = ::pthread_key_create(&key_, retire_thread);
Contributor:
If I understand correctly, retire_thread is called when the worker thread exits.
Why not call it from on_scheduler_exit instead?

Author:
Monitoring the primary arena, on_scheduler_entry and on_scheduler_exit get called many times as threads move between different arenas, and there is no way to tell whether a given call to on_scheduler_exit is the final exit. This is the only reliable way I could think of to catch a thread being deleted before the end-of-job sequence.

Contributor:
OK, so with this approach, the time (and resources) spent by a thread outside the main arena would be accounted as "overhead", right?
Which is not a bad thing.

Author:
I believe that with either the global observer or this PR, what gets measured as overhead is the time outside of any defined CMS module, irrespective of the arena. With the global observer you were getting one call to on_scheduler_entry and on_scheduler_exit per thread, and I'm trying to replicate that with the observer on the primary arena.

if (err) {
throw cms::Exception("FastTimerService") << "ThreadGuard key creation failed: " << ::strerror(err);
}
}

void FastTimerService::on_scheduler_exit(bool worker) {
// account any resources used or freed by the thread before leaving the TBB pool
thread().measure_and_accumulate(overhead_);
// If this is a new thread, register it and return true
bool FastTimerService::ThreadGuard::register_thread(FastTimerService::AtomicResources& r) {
auto ptr = ::pthread_getspecific(key_);

if (not ptr) {
auto p = thread_resources_.emplace_back(std::make_shared<specific_t>(r));
auto pp = new std::shared_ptr<specific_t>(*p);
auto err = ::pthread_setspecific(key_, pp);
if (err) {
throw cms::Exception("FastTimerService") << "ThreadGuard pthread_setspecific failed: " << ::strerror(err);
}
return true;
}
return false;
}

std::shared_ptr<FastTimerService::ThreadGuard::specific_t>* FastTimerService::ThreadGuard::ptr(void* p) {
return static_cast<std::shared_ptr<specific_t>*>(p);
}

// called when a thread exits
void FastTimerService::ThreadGuard::retire_thread(void* p) {
auto ps = ptr(p);
auto expected = true;
if ((*ps)->live_.compare_exchange_strong(expected, false)) {
// account any resources used or freed by the thread before leaving the TBB pool
(*ps)->measurement_.measure_and_accumulate((*ps)->resource_);
}
delete ps;
}

FastTimerService::Measurement& FastTimerService::thread() { return threads_.local(); }
// finalize all threads that have not retired
void FastTimerService::ThreadGuard::finalize() {
for (auto& p : thread_resources_) {
auto expected = true;
if (p->live_.compare_exchange_strong(expected, false)) {
p->measurement_.measure_and_accumulate(p->resource_);
}
}
}

FastTimerService::Measurement& FastTimerService::ThreadGuard::thread() {
return (*ptr(::pthread_getspecific(key_)))->measurement_;
}

void FastTimerService::on_scheduler_entry(bool worker) {
if (guard_.register_thread(overhead_)) {
// initialise the measurement point for a thread that has newly joined the TBB pool
thread().measure();
}
}

void FastTimerService::on_scheduler_exit(bool worker) {}

FastTimerService::Measurement& FastTimerService::thread() { return guard_.thread(); }

// describe the module's configuration
void FastTimerService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
31 changes: 28 additions & 3 deletions HLTrigger/Timer/plugins/FastTimerService.h
@@ -3,6 +3,7 @@

// system headers
#include <unistd.h>
#include <pthread.h>

// C++ headers
#include <chrono>
@@ -455,9 +456,33 @@ class FastTimerService : public tbb::task_scheduler_observer {
std::vector<ResourcesPerJob> run_summary_; // whole event time accounting per-run
std::mutex summary_mutex_; // synchronise access to the summary objects across different threads

// per-thread quantities, lazily allocated
tbb::enumerable_thread_specific<Measurement, tbb::cache_aligned_allocator<Measurement>, tbb::ets_key_per_instance>
threads_;
//
struct ThreadGuard {
struct specific_t {
specific_t(AtomicResources& r) : resource_(r), live_(true) {}
~specific_t() = default;

Measurement measurement_;
AtomicResources& resource_;
std::atomic<bool> live_;
};

ThreadGuard();
~ThreadGuard() = default;

static void retire_thread(void* t);
static std::shared_ptr<specific_t>* ptr(void* p);

bool register_thread(FastTimerService::AtomicResources& r);
Measurement& thread();
void finalize();

tbb::concurrent_vector<std::shared_ptr<specific_t>> thread_resources_;
pthread_key_t key_;
};

//
ThreadGuard guard_;

// atomic variables to keep track of the completion of each step, process by process
std::unique_ptr<std::atomic<unsigned int>[]> subprocess_event_check_;