Skip to content

Commit

Permalink
Merge pull request #43816 from wddgit/streamSkipLumiConcurrentFilters
Browse files Browse the repository at this point in the history
In Concurrent*Filters, upgrade to allow a stream to skip a lumi
  • Loading branch information
cmsbuild authored Feb 18, 2024
2 parents b43bd96 + 86bd5d6 commit ff5ccb1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
28 changes: 17 additions & 11 deletions GeneratorInterface/Core/interface/ConcurrentGeneratorFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ namespace edm {
HAD hadronizer_;
std::unique_ptr<DEC> decayer_;
unsigned int nEventsInLumiBlock_;
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct GenLumiCache {
gen::GenStreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -157,13 +157,17 @@ namespace edm {
private:
void initLumi(gen::GenStreamCache<HAD, DEC>* cache, LuminosityBlock const& index, EventSetup const& es) const;
ParameterSet config_;

// The following six variables depend on the fact that the Framework does
// not execute global begin lumi transitions and global begin run transitions
// concurrently. Within a transition, modules might execute concurrently,
// but only one such transition will be active at a time.
mutable std::atomic<gen::GenStreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedInGlobalLumiAfterNewRun_{0};
mutable unsigned long long nGlobalBeginLumis_{0};
};

//------------------------------------------------------------------------
Expand Down Expand Up @@ -358,6 +362,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedInGlobalLumiAfterNewRun_ < nGlobalBeginRuns_) {
Expand All @@ -368,6 +374,7 @@ namespace edm {

auto lumiCache = std::make_shared<gen::GenLumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand Down Expand Up @@ -399,7 +406,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentGeneratorFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::GenLumiSummary* iSummary) const {
auto cache = this->streamCache(id);
Expand Down Expand Up @@ -437,13 +444,12 @@ namespace edm {

cache->nEventsInLumiBlock_ = 0;

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::GenStreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
// This exchange should succeed and the conditional block should be executed only
// for the first stream to try for each lumi.
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Expand Down
28 changes: 17 additions & 11 deletions GeneratorInterface/Core/interface/ConcurrentHadronizerFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ namespace edm {
std::unique_ptr<DEC> decayer_;
std::unique_ptr<HepMCFilterDriver> filter_;
unsigned long long nInitializedWithLHERunInfo_{0};
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct LumiCache {
gen::StreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginRuns_{0};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -166,13 +166,17 @@ namespace edm {
EDGetTokenT<LHEEventProduct> eventProductToken_;
unsigned int counterRunInfoProducts_;
unsigned int nAttempts_;
// The following six variables depend on the fact that the Framework does
// not execute global begin lumi transitions and global begin run transitions
// concurrently. Within a transition, modules might execute concurrently,
// but only one such transition will be active at a time.
mutable std::atomic<gen::StreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedWithLHERunInfo_{0};
mutable unsigned long long nGlobalBeginLumis_{0};

bool const hasFilter_;
};

Expand Down Expand Up @@ -515,6 +519,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedWithLHERunInfo_ < nGlobalBeginRuns_) {
Expand All @@ -527,6 +533,7 @@ namespace edm {
auto lumiCache = std::make_shared<gen::LumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginRuns_ = nGlobalBeginRuns_;
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand All @@ -541,7 +548,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentHadronizerFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::LumiSummary* iSummary) const {
const lhef::LHERunInfo* lheRunInfo = this->streamCache(id)->hadronizer_.getLHERunInfo().get();
Expand Down Expand Up @@ -594,13 +601,12 @@ namespace edm {
}
}

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::StreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
// This exchange should succeed and the conditional block should be executed only
// for the first stream to try for each lumi.
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Expand Down

0 comments on commit ff5ccb1

Please sign in to comment.