Skip to content

Commit

Permalink
Improve behavior after exception in begin/end global lumi
Browse files Browse the repository at this point in the history
  • Loading branch information
wddgit committed Apr 24, 2024
1 parent 3fb8b0e commit daed9f8
Show file tree
Hide file tree
Showing 16 changed files with 824 additions and 219 deletions.
175 changes: 77 additions & 98 deletions FWCore/Framework/interface/GlobalSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
Expand All @@ -28,6 +30,7 @@
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include <exception>
#include <map>
#include <memory>
#include <set>
Expand All @@ -38,37 +41,6 @@

namespace edm {

namespace {
template <typename T>
class GlobalScheduleSignalSentry {
public:
GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
: a_(a), context_(context), allowThrow_(false) {
if (a_)
T::preScheduleSignal(a_, context_);
}
~GlobalScheduleSignalSentry() noexcept(false) {
// Caught exception is rethrown
CMS_SA_ALLOW try {
if (a_)
T::postScheduleSignal(a_, context_);
} catch (...) {
if (allowThrow_) {
throw;
}
}
}

void allowThrow() { allowThrow_ = true; }

private:
// We own none of these resources.
ActivityRegistry* a_; // We do not use propagate_const because the registry itself is mutable.
typename T::Context const* context_;
bool allowThrow_;
};
} // namespace

class ActivityRegistry;
class ExceptionCollector;
class ProcessContext;
Expand Down Expand Up @@ -131,25 +103,16 @@ namespace edm {
AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }

private:
//Sentry class to only send a signal if an
// exception occurs. An exception is identified
// by the destructor being called without first
// calling completedSuccessfully().
class SendTerminationSignalIfException {
public:
SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::GlobalContext const* iContext)
: reg_(iReg), context_(iContext) {}
~SendTerminationSignalIfException() {
if (reg_) {
reg_->preGlobalEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
}
}
void completedSuccessfully() { reg_ = nullptr; }
template <typename T>
void preScheduleSignal(GlobalContext const*, ServiceToken const&);

template <typename T>
void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);

private:
edm::ActivityRegistry* reg_; // We do not use propagate_const because the registry itself is mutable.
GlobalContext const* context_;
};
void handleException(GlobalContext const*,
ServiceWeakToken const&,
bool cleaningUpAfterException,
std::exception_ptr&);

/// returns the action table
ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
Expand All @@ -173,72 +136,88 @@ namespace edm {
//need the doneTask to own the memory
auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));

if (actReg_) {
//Services may depend upon each other
ServiceRegistry::Operate op(token);
T::preScheduleSignal(actReg_.get(), globalContext.get());
}

ServiceWeakToken weakToken = token;
auto doneTask = make_waiting_task(
[this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " ";
}
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}
if (actReg_) {
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
}
}
if (actReg_) {
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), globalContext.get());
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
}
// add context information to the exception and print message
handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
}
postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
iHolder.doneWaiting(excpt);
});
unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
}
WorkerManager& workerManager = workerManagers_[managerIndex];
workerManager.resetAll();

ParentContext parentContext(globalContext.get());
//make sure the ProductResolvers know about their
// workers to allow proper data dependency handling
workerManager.setupResolvers(transitionInfo.principal());

//make sure the task doesn't get run until all workers have been started
WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
auto& aw = workerManager.allWorkers();
for (Worker* worker : boost::adaptors::reverse(aw)) {
worker->doWorkAsync<T>(
holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());

CMS_SA_ALLOW try {
preScheduleSignal<T>(globalContext.get(), token);

unsigned int managerIndex = principal.index();
if constexpr (T::branchType_ == InRun) {
managerIndex += numberOfConcurrentLumis_;
}
WorkerManager& workerManager = workerManagers_[managerIndex];
workerManager.resetAll();

ParentContext parentContext(globalContext.get());
// make sure the ProductResolvers know about their
// workers to allow proper data dependency handling
workerManager.setupResolvers(transitionInfo.principal());

auto& aw = workerManager.allWorkers();
for (Worker* worker : boost::adaptors::reverse(aw)) {
worker->doWorkAsync<T>(
holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
}
} catch (...) {
holdForLoop.doneWaiting(std::current_exception());
}
} catch (...) {
iHolder.doneWaiting(std::current_exception());
}
}

// Emits T::preScheduleSignal on the activity registry, if one is set, while a
// ServiceRegistry::Operate for `token` is in scope. A cms::Exception coming
// out of the wrapped call gets two context entries added (a fixed message and
// the text produced by exceptionContext) and is then rethrown to the caller.
template <typename T>
void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
  if (not actReg_) {
    return;
  }
  try {
    ServiceRegistry::Operate operate(token);
    convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
  } catch (cms::Exception& caught) {
    caught.addContext("Handling pre signal, likely in a service function");
    std::ostringstream where;
    exceptionContext(where, *globalContext);
    caught.addContext(where.str());
    throw;
  }
}

// Emits T::postScheduleSignal on the activity registry, if one is set. This
// function does not rethrow: a cms::Exception coming out of the wrapped call
// is stored in `excpt` (after context is added) only when `excpt` is still
// empty. If `excpt` already holds an exception, the new one is discarded.
template <typename T>
void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
                                        ServiceWeakToken const& weakToken,
                                        std::exception_ptr& excpt) {
  if (not actReg_) {
    return;
  }
  try {
    // The weak token is locked inside the wrapped callable so that a failure
    // there goes through the same conversion as a failure in the signal.
    convertException::wrap([this, &weakToken, globalContext]() {
      ServiceRegistry::Operate operate(weakToken.lock());
      T::postScheduleSignal(actReg_.get(), globalContext);
    });
  } catch (cms::Exception& caught) {
    if (excpt) {
      // An exception recorded earlier takes precedence over this one.
      return;
    }
    caught.addContext("Handling post signal, likely in a service function");
    std::ostringstream where;
    exceptionContext(where, *globalContext);
    caught.addContext(where.str());
    excpt = std::current_exception();
  }
}

} // namespace edm

#endif
30 changes: 23 additions & 7 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,12 @@ namespace edm {
}
void preModuleSignal() {
if (a_) {
T::preModuleSignal(a_, context_, moduleCallingContext_);
try {
convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
} catch (cms::Exception& ex) {
ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
throw;
}
}
}
void postModuleSignal() {
Expand All @@ -659,7 +664,12 @@ namespace edm {
// Setting a_ to null informs the destructor that the signal
// was already run and that it should do nothing.
a_ = nullptr;
T::postModuleSignal(temp, context_, moduleCallingContext_);
try {
convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
} catch (cms::Exception& ex) {
ex.addContext("Handling post module signal, likely in a service function immediately after module method");
throw;
}
}
}

Expand Down Expand Up @@ -831,6 +841,7 @@ namespace edm {
cpp.preModuleSignal();
auto returnValue = iWorker->implDoBegin(info, mcc);
cpp.postModuleSignal();
iWorker->beginSucceeded_ = true;
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
Expand Down Expand Up @@ -885,11 +896,16 @@ namespace edm {
ActivityRegistry* actReg,
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
if (iWorker->beginSucceeded_) {
iWorker->beginSucceeded_ = false;

ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
return true;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand Down
34 changes: 14 additions & 20 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1744,22 +1744,18 @@ namespace edm {
looper_->prefetchAsync(
nextTask, serviceToken_, Transition::BeginLuminosityBlock, *(status->lumiPrincipal()), es);
}) | ifThen(looper_, [this, status, &es](auto nextTask) {
status->globalBeginDidSucceed();
ServiceRegistry::Operate operateLooper(serviceToken_);
looper_->doBeginLuminosityBlock(*(status->lumiPrincipal()), es, &processContext_);
}) | then([this, status, iRunStatus](std::exception_ptr const* iException, auto holder) mutable {
status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());

if (iException) {
status->resetResources();
queueWhichWaitsForIOVsToFinish_.resume();
WaitingTaskHolder copyHolder(holder);
copyHolder.doneWaiting(*iException);
globalEndLumiAsync(holder, status);
endRunAsync(iRunStatus, holder);
} else {
if (not looper_) {
status->globalBeginDidSucceed();
}

status->setGlobalEndRunHolder(iRunStatus->globalEndRunHolder());
status->globalBeginDidSucceed();

EventSetupImpl const& es = status->eventSetupImpl(esp_->subProcessIndex());
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
Expand Down Expand Up @@ -1957,18 +1953,16 @@ namespace edm {
auto eventSetupImpls = &lumiStatus->eventSetupImpls();
bool cleaningUpAfterException = lumiStatus->cleaningUpAfterException() || iTask.taskHasFailed();

if (lumiStatus->didGlobalBeginSucceed()) {
auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
*schedule_,
iStreamIndex,
transitionInfo,
serviceToken_,
subProcesses_,
cleaningUpAfterException);
}
auto& lumiPrincipal = *lumiStatus->lumiPrincipal();
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
LumiTransitionInfo transitionInfo(lumiPrincipal, es, eventSetupImpls);
endStreamTransitionAsync<Traits>(std::move(lumiDoneTask),
*schedule_,
iStreamIndex,
transitionInfo,
serviceToken_,
subProcesses_,
cleaningUpAfterException);
}

void EventProcessor::endUnfinishedLumi(bool cleaningUpAfterException) {
Expand Down
28 changes: 28 additions & 0 deletions FWCore/Framework/src/GlobalSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,32 @@ namespace edm {
}
return result;
}

// Post-processes an exception captured during a global transition.
//
// Step 1: rethrow `excpt` through convertException::wrap; when it arrives as a
//         cms::Exception, pass it to addContextAndPrintException and store the
//         resulting current exception back into `excpt`.
// Step 2: emit preGlobalEarlyTerminationSignal_ on the activity registry, if
//         one is set. Anything thrown during this step is deliberately
//         ignored.
//
// NOTE(review): `excpt` is passed straight to std::rethrow_exception, so
// callers are presumably expected to pass a non-empty pointer - confirm.
void GlobalSchedule::handleException(GlobalContext const* globalContext,
                                     ServiceWeakToken const& weakToken,
                                     bool cleaningUpAfterException,
                                     std::exception_ptr& excpt) {
  try {
    convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
  } catch (cms::Exception& caught) {
    // The exception usually carries context already; only when it has none is
    // the global context text supplied as a fallback.
    std::ostringstream fallbackContext;
    if (caught.context().empty()) {
      exceptionContext(fallbackContext, *globalContext);
    }
    ServiceRegistry::Operate operate(weakToken.lock());
    addContextAndPrintException(fallbackContext.str().c_str(), caught, cleaningUpAfterException);
    excpt = std::current_exception();
  }

  if (not actReg_) {
    return;
  }
  CMS_SA_ALLOW try {
    ServiceRegistry::Operate operate(weakToken.lock());
    actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
  } catch (...) {
    // Intentionally empty: a failure while signalling early termination must
    // not replace the exception already being reported.
  }
}

} // namespace edm
13 changes: 11 additions & 2 deletions FWCore/Framework/test/global_filter_t.cppunit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,18 @@ void testGlobalFilter::testTransitions(std::shared_ptr<T> iMod, Expectations con
edm::maker::ModuleHolderT<edm::global::EDFilterBase> h(iMod, nullptr);
h.preallocate(edm::PreallocationConfiguration{});

edm::WorkerT<edm::global::EDFilterBase> w{iMod, m_desc, nullptr};
edm::WorkerT<edm::global::EDFilterBase> wOther{iMod, m_desc, nullptr};
edm::WorkerT<edm::global::EDFilterBase> wGlobalLumi{iMod, m_desc, nullptr};
edm::WorkerT<edm::global::EDFilterBase> wStreamLumi{iMod, m_desc, nullptr};
for (auto& keyVal : m_transToFunc) {
testTransition(iMod, &w, keyVal.first, iExpect, keyVal.second);
edm::Worker* worker = &wOther;
if (keyVal.first == Trans::kStreamBeginLuminosityBlock || keyVal.first == Trans::kStreamEndLuminosityBlock) {
worker = &wStreamLumi;
} else if (keyVal.first == Trans::kGlobalBeginLuminosityBlock ||
keyVal.first == Trans::kGlobalEndLuminosityBlock) {
worker = &wGlobalLumi;
}
testTransition(iMod, worker, keyVal.first, iExpect, keyVal.second);
}
});
}
Expand Down
Loading

0 comments on commit daed9f8

Please sign in to comment.