Skip to content

Commit

Permalink
Merge pull request #44624 from wddgit/streamBeginLumiExceptionBehavior
Browse files Browse the repository at this point in the history
Improve behavior after exception in begin/end stream lumi
  • Loading branch information
cmsbuild authored Apr 19, 2024
2 parents 28b1c52 + 26b519a commit c4ee22a
Show file tree
Hide file tree
Showing 14 changed files with 737 additions and 70 deletions.
77 changes: 42 additions & 35 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
#include <sstream>
#include <atomic>
#include <unordered_set>
#include <utility>

namespace edm {

Expand Down Expand Up @@ -397,47 +398,53 @@ namespace edm {
typename T::TransitionInfoType& transitionInfo,
ServiceToken const& token,
bool cleaningUpAfterException) {
auto group = iHolder.group();
auto const& principal = transitionInfo.principal();
T::setStreamContext(streamContext_, principal);

auto id = principal.id();
ServiceWeakToken weakToken = token;
auto doneTask = make_waiting_task(
[this, iHolder, id, cleaningUpAfterException, weakToken](std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), id, cleaningUpAfterException, weakToken](
std::exception_ptr const* iPtr) mutable {
std::exception_ptr excpt;
if (iPtr) {
excpt = *iPtr;
//add context information to the exception and print message
try {
convertException::wrap([&]() { std::rethrow_exception(excpt); });
} catch (cms::Exception& ex) {
//TODO: should add the transition type info
std::ostringstream ost;
if (ex.context().empty()) {
ost << "Processing " << T::transitionName() << " " << id;
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
}
iHolder.doneWaiting(excpt);
});
ServiceRegistry::Operate op(weakToken.lock());
addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
excpt = std::current_exception();
}

// We are already handling an earlier exception, so ignore it
// if this signal results in another exception being thrown.
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
actReg_->preStreamEarlyTerminationSignal_(streamContext_, TerminationOrigin::ExceptionFromThisContext);
} catch (...) {
}
}
// Caught exception is propagated via WaitingTaskHolder
CMS_SA_ALLOW try {
ServiceRegistry::Operate op(weakToken.lock());
T::postScheduleSignal(actReg_.get(), &streamContext_);
} catch (...) {
if (not excpt) {
excpt = std::current_exception();
}
}
iHolder.doneWaiting(excpt);
});

auto task = make_functor_task(
[this, h = WaitingTaskHolder(*iHolder.group(), doneTask), info = transitionInfo, weakToken]() mutable {
auto task =
make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
auto token = weakToken.lock();
ServiceRegistry::Operate op(token);
// Caught exception is propagated via WaitingTaskHolder
Expand Down Expand Up @@ -465,7 +472,7 @@ namespace edm {
//Enqueueing will start another thread if there is only
// one thread in the job. Having stream == 0 use spawn
// avoids starting up another thread when there is only one stream.
iHolder.group()->run([task]() {
group->run([task]() {
TaskSentry s{task};
task->execute();
});
Expand Down
6 changes: 4 additions & 2 deletions FWCore/Framework/interface/WorkerInPath.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"

#include <utility>

namespace edm {

class PathContext;
Expand Down Expand Up @@ -116,7 +118,7 @@ namespace edm {

if constexpr (T::isEvent_) {
ParentContext parentContext(&placeInPathContext_);
worker_->doWorkAsync<T>(iTask, info, token, streamID, parentContext, context);
worker_->doWorkAsync<T>(std::move(iTask), info, token, streamID, parentContext, context);
} else {
ParentContext parentContext(context);

Expand All @@ -125,7 +127,7 @@ namespace edm {
// into the runs or lumis in stream transitions, so there can be
// no data dependencies which require prefetching. Prefetching is
// needed for global transitions, but they are run elsewhere.
worker_->doWorkNoPrefetchingAsync<T>(iTask, info, token, streamID, parentContext, context);
worker_->doWorkNoPrefetchingAsync<T>(std::move(iTask), info, token, streamID, parentContext, context);
}
}
} // namespace edm
Expand Down
100 changes: 82 additions & 18 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ namespace edm {
bool ranAcquireWithoutException_;
bool moduleValid_ = true;
bool shouldTryToContinue_ = false;
bool beginSucceeded_ = false;
};

namespace {
Expand All @@ -633,14 +634,33 @@ namespace edm {
ModuleSignalSentry(ActivityRegistry* a,
typename T::Context const* context,
ModuleCallingContext const* moduleCallingContext)
: a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
if (a_)
T::preModuleSignal(a_, context, moduleCallingContext_);
}
: a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}

~ModuleSignalSentry() {
if (a_)
T::postModuleSignal(a_, context_, moduleCallingContext_);
// This destructor does nothing unless we are unwinding the
// the stack from an earlier exception (a_ will be null if we are
// are not). We want to report the earlier exception and ignore any
// addition exceptions from the post module signal.
CMS_SA_ALLOW try {
if (a_) {
T::postModuleSignal(a_, context_, moduleCallingContext_);
}
} catch (...) {
}
}
void preModuleSignal() {
if (a_) {
T::preModuleSignal(a_, context_, moduleCallingContext_);
}
}
void postModuleSignal() {
if (a_) {
auto temp = a_;
// Setting a_ to null informs the destructor that the signal
// was already run and that it should do nothing.
a_ = nullptr;
T::postModuleSignal(temp, context_, moduleCallingContext_);
}
}

private:
Expand Down Expand Up @@ -690,7 +710,15 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBegin(info, mcc);
// If preModuleSignal() throws, implDoBegin() is not called, and the
// cpp destructor calls postModuleSignal (ignoring additional exceptions)
cpp.preModuleSignal();
// If implDoBegin() throws, the cpp destructor calls postModuleSignal
// (ignoring additional exceptions)
auto returnValue = iWorker->implDoBegin(info, mcc);
// If postModuleSignal() throws, the exception will propagate to the framework
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -715,7 +743,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamBegin(id, info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -740,7 +771,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEnd(info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -765,7 +799,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamEnd(id, info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -791,7 +828,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBegin(info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoBegin(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -816,7 +856,11 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamBegin(id, info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
cpp.postModuleSignal();
iWorker->beginSucceeded_ = true;
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -842,7 +886,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEnd(info, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoEnd(info, mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -866,8 +913,16 @@ namespace edm {
ActivityRegistry* actReg,
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoStreamEnd(id, info, mcc);
if (iWorker->beginSucceeded_) {
iWorker->beginSucceeded_ = false;

ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
cpp.postModuleSignal();
return returnValue;
}
return true;
}
static void esPrefetchAsync(Worker* worker,
WaitingTaskHolder waitingTask,
Expand All @@ -892,7 +947,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand All @@ -912,7 +970,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand All @@ -932,7 +993,10 @@ namespace edm {
ModuleCallingContext const* mcc,
Arg::Context const* context) {
ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
return iWorker->implDoEndProcessBlock(info.principal(), mcc);
cpp.preModuleSignal();
auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
cpp.postModuleSignal();
return returnValue;
}
static void esPrefetchAsync(
Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
Expand Down
3 changes: 0 additions & 3 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1773,9 +1773,6 @@ namespace edm {
streamQueues_[i].pause();

auto& event = principalCache_.eventPrincipal(i);
//We need to be sure that 'status' and its internal shared_ptr<LuminosityBlockPrincipal> are only
// held by the container as this lambda may not finish executing before all the tasks it
// spawns have already started to run.
auto eventSetupImpls = &status->eventSetupImpls();
auto lp = status->lumiPrincipal().get();
streamLumiStatus_[i] = std::move(status);
Expand Down
3 changes: 2 additions & 1 deletion FWCore/Integration/plugins/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
<use name="DataFormats/TestObjects"/>
</library>

<library file="AssociationMapProducer.cc,AssociationMapAnalyzer.cc,MissingDictionaryTestProducer.cc,ExistingDictionaryTestModules.cc, TableTestModules.cc, TestGlobalOutput.cc, TestLimitedOutput.cc, TestOneOutput.cc, PluginUsingProducer.cc,SwitchProducerProvenanceAnalyzer.cc,ProducerUsingCollector.cc,ExceptionThrowingProducer.cc,ProdigalAnalyzer.cc,IntSource.cc,ViewAnalyzer.cc,TestFindProduct.cc,ManyProductProducer.cc,TestParentage.cc,HierarchicalEDProducer.cc,ThrowingSource.cc, DelayedReaderThrowingSource.cc,TestHistoryKeeping.cc,PathAnalyzer.cc,SourceWithWaits.cc" name="FWCoreIntegrationSomeTestModules">
<library file="AssociationMapProducer.cc,AssociationMapAnalyzer.cc,MissingDictionaryTestProducer.cc,ExistingDictionaryTestModules.cc, TableTestModules.cc, TestGlobalOutput.cc, TestLimitedOutput.cc, TestOneOutput.cc, PluginUsingProducer.cc,SwitchProducerProvenanceAnalyzer.cc,ProducerUsingCollector.cc,ExceptionThrowingProducer.cc,TestServiceOne.cc,TestServiceTwo.cc,ProdigalAnalyzer.cc,IntSource.cc,ViewAnalyzer.cc,TestFindProduct.cc,ManyProductProducer.cc,TestParentage.cc,HierarchicalEDProducer.cc,ThrowingSource.cc, DelayedReaderThrowingSource.cc,TestHistoryKeeping.cc,PathAnalyzer.cc,SourceWithWaits.cc" name="FWCoreIntegrationSomeTestModules">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/ServiceRegistry"/>
<use name="FWCore/Sources"/>
<use name="DataFormats/Provenance"/>
<use name="FWCore/MessageLogger"/>
Expand Down
Loading

0 comments on commit c4ee22a

Please sign in to comment.