Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run G4 workers on their own dedicated threads #34899

Merged
merged 6 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions SimG4Core/Application/interface/RunManagerMTWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ class RunManagerMTWorker {

void DumpMagneticField(const G4Field*, const std::string&) const;

static void resetTLS();
void resetTLS();
int getThreadIndex() const { return m_thread_index; }

Generator m_generator;
edm::EDGetTokenT<edm::HepMCProduct> m_InToken;
Expand All @@ -111,12 +112,14 @@ class RunManagerMTWorker {
edm::ParameterSet m_p;

struct TLSData;
static thread_local TLSData* m_tls;
static thread_local bool dumpMF;
TLSData* m_tls{nullptr};
bool dumpMF{false};

G4SimEvent* m_simEvent;
std::unique_ptr<CMSSteppingVerbose> m_sVerbose;
std::unordered_map<std::string, std::unique_ptr<SensitiveDetectorMakerBase>> m_sdMakers;

const int m_thread_index{-1};
};

#endif
98 changes: 98 additions & 0 deletions SimG4Core/Application/interface/ThreadHandoff.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#ifndef SimG4Core_Application_ThreadHandoff_h
#define SimG4Core_Application_ThreadHandoff_h
// -*- C++ -*-
//
// Package: SimG4Core/Application
// Class : ThreadHandoff
//
/**\class ThreadHandoff ThreadHandoff.h "SimG4Core/Application/interface/ThreadHandoff.h"

Description: [one line class summary]

Usage:
<usage>

*/
//
// Original Author: Christopher Jones
// Created: Mon, 16 Aug 2021 13:51:53 GMT
//

// system include files
#include <condition_variable>
#include <cstring> //strerror_r
#include <exception>
#include <mutex>
#include <pthread.h>

// user include files

// forward declarations

namespace omt {
class ThreadHandoff {
public:
explicit ThreadHandoff(int stackSize);
~ThreadHandoff();

ThreadHandoff(const ThreadHandoff&) = delete; // stop default
const ThreadHandoff& operator=(const ThreadHandoff&) = delete; // stop default

template <typename F>
void runAndWait(F&& iF) {
Functor<F> f{std::move(iF)};

std::unique_lock<std::mutex> lck(m_mutex);
m_loopReady = false;
m_toRun = &f;

m_threadHandoff.notify_one();

m_threadHandoff.wait(lck, [this]() { return m_loopReady; });
auto e = f.exception();
if (e) {
std::rethrow_exception(e);
}
}

void stopThread() {
runAndWait([this]() { m_stopThread = true; });
}

private:
class FunctorBase {
public:
virtual ~FunctorBase() {}
virtual void execute() = 0;
};
template <typename F>
class Functor : public FunctorBase {
public:
explicit Functor(F&& iF) : m_f(std::move(iF)) {}
void execute() final {
try {
m_f();
} catch (...) {
m_except = std::current_exception();
}
}
std::exception_ptr exception() { return m_except; }

private:
F m_f;
std::exception_ptr m_except;
};

static void* threadLoop(void* iArgs);

// ---------- member data --------------------------------
pthread_t m_thread;
std::mutex m_mutex;
std::condition_variable m_threadHandoff;

FunctorBase* m_toRun{nullptr};
bool m_loopReady{false};
bool m_stopThread{false};
};
} // namespace omt
#endif
47 changes: 40 additions & 7 deletions SimG4Core/Application/plugins/OscarMTProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/Utilities/interface/RandomNumberGenerator.h"
#include "FWCore/Utilities/interface/Exception.h"

Expand All @@ -34,6 +35,8 @@
#include "SimDataFormats/TrackingHit/interface/PSimHitContainer.h"
#include "SimDataFormats/CaloHit/interface/PCaloHitContainer.h"

#include "SimG4Core/Application/interface/ThreadHandoff.h"

#include "Randomize.hh"

// for some reason void doesn't compile
Expand All @@ -56,6 +59,7 @@ class OscarMTProducer : public edm::stream::EDProducer<edm::GlobalCache<OscarMTM
void produce(edm::Event& e, const edm::EventSetup& c) override;

private:
omt::ThreadHandoff m_handoff;
std::unique_ptr<RunManagerMTWorker> m_runManagerWorker;
const OscarMTMasterThread* m_masterThread = nullptr;
};
Expand Down Expand Up @@ -87,17 +91,25 @@ namespace {
explicit StaticRandomEngineSetUnset(CLHEP::HepRandomEngine* engine);
~StaticRandomEngineSetUnset();

CLHEP::HepRandomEngine* currentEngine() { return m_currentEngine; }

private:
CLHEP::HepRandomEngine* m_currentEngine;
CLHEP::HepRandomEngine* m_previousEngine;
};
} // namespace

OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms) {
OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMasterThread* ms)
: m_handoff{p.getUntrackedParameter<int>("workerThreadStackSize", 10 * 1024 * 1024)} {
// Random number generation not allowed here
StaticRandomEngineSetUnset random(nullptr);

m_runManagerWorker = std::make_unique<RunManagerMTWorker>(p, consumesCollector());
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &p, token]() {
edm::ServiceRegistry::Operate guard{token};
StaticRandomEngineSetUnset random(nullptr);
m_runManagerWorker = std::make_unique<RunManagerMTWorker>(p, consumesCollector());
});
m_masterThread = ms;
m_masterThread->callConsumes(consumesCollector());

Expand Down Expand Up @@ -165,7 +177,13 @@ OscarMTProducer::OscarMTProducer(edm::ParameterSet const& p, const OscarMTMaster
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer is constructed";
}

OscarMTProducer::~OscarMTProducer() {}
OscarMTProducer::~OscarMTProducer() {
auto token = edm::ServiceRegistry::instance().presentToken();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dr15Jones , is it safe calling edm::ServiceRegistry::instance() in a destructor? May be better keeping the pointer as a class member?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The framework guarantees that it is safe to use the Service system in module destructors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

If the framework guarantee the order of destruction, it is fine.

m_handoff.runAndWait([this, token]() {
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker.reset();
});
}

std::unique_ptr<OscarMTMasterThread> OscarMTProducer::initializeGlobalCache(const edm::ParameterSet& iConfig) {
// Random number generation not allowed here
Expand Down Expand Up @@ -198,21 +216,31 @@ void OscarMTProducer::globalEndJob(OscarMTMasterThread* masterThread) {

void OscarMTProducer::beginRun(const edm::Run&, const edm::EventSetup& es) {
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun";
m_runManagerWorker->beginRun(es);
m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es);
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &es, token]() {
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker->beginRun(es);
m_runManagerWorker->initializeG4(m_masterThread->runManagerMasterPtr(), es);
});
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::beginRun done";
}

void OscarMTProducer::endRun(const edm::Run&, const edm::EventSetup&) {
// Random number generation not allowed here
StaticRandomEngineSetUnset random(nullptr);
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun";
m_runManagerWorker->endRun();
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, token]() {
StaticRandomEngineSetUnset random(nullptr);
edm::ServiceRegistry::Operate guard{token};
m_runManagerWorker->endRun();
});
edm::LogVerbatim("SimG4CoreApplication") << "OscarMTProducer::endRun done";
}

void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) {
StaticRandomEngineSetUnset random(e.streamID());
auto engine = random.currentEngine();
edm::LogVerbatim("SimG4CoreApplication") << "Produce event " << e.id() << " stream " << e.streamID();
//edm::LogVerbatim("SimG4CoreApplication") << " rand= " << G4UniformRand();

Expand All @@ -221,7 +249,12 @@ void OscarMTProducer::produce(edm::Event& e, const edm::EventSetup& es) {

std::unique_ptr<G4SimEvent> evt;
try {
evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster());
auto token = edm::ServiceRegistry::instance().presentToken();
m_handoff.runAndWait([this, &e, &es, &evt, token, engine]() {
edm::ServiceRegistry::Operate guard{token};
StaticRandomEngineSetUnset random(engine);
evt = m_runManagerWorker->produce(e, es, globalCache()->runManagerMaster());
});
} catch (const SimG4Exception& simg4ex) {
edm::LogWarning("SimG4CoreApplication") << "SimG4Exception caght! " << simg4ex.what();

Expand Down
70 changes: 9 additions & 61 deletions SimG4Core/Application/src/RunManagerMTWorker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,13 @@
#include "tbb/task_arena.h"

static std::once_flag applyOnce;
thread_local bool RunManagerMTWorker::dumpMF = false;

// from https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3302/2.html
namespace {
std::atomic<int> thread_counter{0};

int get_new_thread_index() { return thread_counter++; }

thread_local int s_thread_index = get_new_thread_index();

int getThreadIndex() { return s_thread_index; }

void createWatchers(const edm::ParameterSet& iP,
SimActivityRegistry* iReg,
std::vector<std::shared_ptr<SimWatcher>>& oWatchers,
Expand All @@ -102,10 +97,6 @@ namespace {
}
}
}

std::atomic<int> active_tlsdata{0};
std::atomic<bool> tls_shutdown_timeout{false};
std::atomic<int> n_tls_shutdown_task{0};
} // namespace

struct RunManagerMTWorker::TLSData {
Expand All @@ -127,17 +118,17 @@ struct RunManagerMTWorker::TLSData {
bool threadInitialized = false;
bool runTerminated = false;

TLSData() { ++active_tlsdata; }
TLSData() {}

~TLSData() { --active_tlsdata; }
~TLSData() {}
};

//This can not be a smart pointer since we must delete some of the members
// before leaving main() else we get a segmentation fault caused by accessing
// other 'singletons' after those singletons have been deleted. Instead we
// atempt to delete all TLS at RunManagerMTWorker destructor. If that fails for
// some reason, it is better to leak than cause a crash.
thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr};
//thread_local RunManagerMTWorker::TLSData* RunManagerMTWorker::m_tls{nullptr};

RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::ConsumesCollector&& iC)
: m_generator(iConfig.getParameter<edm::ParameterSet>("Generator")),
Expand All @@ -158,7 +149,8 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co
m_pCustomUIsession(iConfig.getUntrackedParameter<edm::ParameterSet>("CustomUIsession")),
m_p(iConfig),
m_simEvent(nullptr),
m_sVerbose(nullptr) {
m_sVerbose(nullptr),
m_thread_index{get_new_thread_index()} {
std::vector<std::string> onlySDs = iConfig.getParameter<std::vector<std::string>>("OnlySDs");
m_sdMakers = sim::sensitiveDetectorMakers(m_p, iC, onlySDs);
std::vector<edm::ParameterSet> watchers = iConfig.getParameter<std::vector<edm::ParameterSet>>("Watchers");
Expand All @@ -180,47 +172,9 @@ RunManagerMTWorker::RunManagerMTWorker(const edm::ParameterSet& iConfig, edm::Co
edm::LogVerbatim("SimG4CoreApplication") << "SD[" << k << "] " << itr->first;
}

RunManagerMTWorker::~RunManagerMTWorker() {
++n_tls_shutdown_task;
resetTLS();

{
//make sure all tasks are done before continuing
timespec s;
s.tv_sec = 0;
s.tv_nsec = 10000;
while (n_tls_shutdown_task != 0) {
nanosleep(&s, nullptr);
}
}
}

void RunManagerMTWorker::resetTLS() {
m_tls = nullptr;
RunManagerMTWorker::~RunManagerMTWorker() { resetTLS(); }

if (active_tlsdata != 0 and not tls_shutdown_timeout) {
++n_tls_shutdown_task;
//need to run tasks on each thread which has set the tls
{
tbb::task_arena arena(tbb::task_arena::attach{});
arena.enqueue([]() { RunManagerMTWorker::resetTLS(); });
}
timespec s;
s.tv_sec = 0;
s.tv_nsec = 10000;
//we do not want this thread to be used for a new task since it
// has already cleared its structures. In order to fill all TBB
// threads we wait for all TLSes to clear
int count = 0;
while (active_tlsdata.load() != 0 and ++count < 1000) {
nanosleep(&s, nullptr);
}
if (count >= 1000) {
tls_shutdown_timeout = true;
}
}
--n_tls_shutdown_task;
}
void RunManagerMTWorker::resetTLS() { m_tls = nullptr; }

void RunManagerMTWorker::beginRun(edm::EventSetup const& es) {
for (auto& maker : m_sdMakers) {
Expand Down Expand Up @@ -323,7 +277,7 @@ void RunManagerMTWorker::initializeG4(RunManagerMT* runManagerMaster, const edm:

std::string fieldFile = m_p.getUntrackedParameter<std::string>("FileNameField", "");
if (!fieldFile.empty()) {
std::call_once(applyOnce, []() { dumpMF = true; });
std::call_once(applyOnce, [this]() { dumpMF = true; });
if (dumpMF) {
edm::LogVerbatim("SimG4CoreApplication") << "RunManagerMTWorker: Dump magnetic field to file " << fieldFile;
DumpMagneticField(tM->GetFieldManager()->GetDetectorField(), fieldFile);
Expand Down Expand Up @@ -494,13 +448,7 @@ std::unique_ptr<G4SimEvent> RunManagerMTWorker::produce(const edm::Event& inpevt
// We have to do the per-thread initialization, and per-thread
// per-run initialization here by ourselves.

if (nullptr == m_tls || !m_tls->threadInitialized) {
edm::LogVerbatim("SimG4CoreApplication")
<< "RunManagerMTWorker::produce(): stream " << inpevt.streamID() << " thread " << getThreadIndex()
<< " Geant4 initialisation for this thread";
initializeG4(&runManagerMaster, es);
m_tls->threadInitialized = true;
}
assert(m_tls != nullptr and m_tls->threadInitialized);
// Initialize run
if (inpevt.id().run() != m_tls->currentRunNumber) {
edm::LogVerbatim("SimG4CoreApplication")
Expand Down
Loading