Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In Concurrent*Filters, upgrade to allow a stream to skip a lumi #43816

Merged
merged 1 commit into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions GeneratorInterface/Core/interface/ConcurrentGeneratorFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ namespace edm {
HAD hadronizer_;
std::unique_ptr<DEC> decayer_;
unsigned int nEventsInLumiBlock_;
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct GenLumiCache {
gen::GenStreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -157,13 +157,17 @@ namespace edm {
private:
void initLumi(gen::GenStreamCache<HAD, DEC>* cache, LuminosityBlock const& index, EventSetup const& es) const;
ParameterSet config_;

// The following six variables depend on the fact that the Framework does
// not execute global begin lumi transitions and global begin run transitions
// concurrently. Within a transition, modules might execute concurrently,
// but only one such transition will be active at a time.
mutable std::atomic<gen::GenStreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedInGlobalLumiAfterNewRun_{0};
mutable unsigned long long nGlobalBeginLumis_{0};
};

//------------------------------------------------------------------------
Expand Down Expand Up @@ -358,6 +362,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedInGlobalLumiAfterNewRun_ < nGlobalBeginRuns_) {
Expand All @@ -368,6 +374,7 @@ namespace edm {

auto lumiCache = std::make_shared<gen::GenLumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand Down Expand Up @@ -399,7 +406,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentGeneratorFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::GenLumiSummary* iSummary) const {
auto cache = this->streamCache(id);
Expand Down Expand Up @@ -437,13 +444,12 @@ namespace edm {

cache->nEventsInLumiBlock_ = 0;

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::GenStreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
// This exchange should succeed and the conditional block should be executed only
// for the first stream to try for each lumi.
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure I understood, is the idea that the first streamEndLuminosityBlock (of a Lumi) to arrive here would change the streamEndRunComplete_ and useInLumi_?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment at this point in both files.

Expand Down
28 changes: 17 additions & 11 deletions GeneratorInterface/Core/interface/ConcurrentHadronizerFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ namespace edm {
std::unique_ptr<DEC> decayer_;
std::unique_ptr<HepMCFilterDriver> filter_;
unsigned long long nInitializedWithLHERunInfo_{0};
unsigned long long nStreamEndLumis_{0};
bool initialized_ = false;
};
template <typename HAD, typename DEC>
struct LumiCache {
gen::StreamCache<HAD, DEC>* useInLumi_{nullptr};
unsigned long long nGlobalBeginRuns_{0};
unsigned long long nGlobalBeginLumis_{0};
};
} // namespace gen

Expand Down Expand Up @@ -166,13 +166,17 @@ namespace edm {
EDGetTokenT<LHEEventProduct> eventProductToken_;
unsigned int counterRunInfoProducts_;
unsigned int nAttempts_;
// The following six variables depend on the fact that the Framework does
// not execute global begin lumi transitions and global begin run transitions
// concurrently. Within a transition, modules might execute concurrently,
// but only one such transition will be active at a time.
mutable std::atomic<gen::StreamCache<HAD, DEC>*> useInLumi_{nullptr};
mutable std::atomic<unsigned long long> greatestNStreamEndLumis_{0};
mutable std::atomic<unsigned long long> nextNGlobalBeginLumis_{1};
mutable std::atomic<bool> streamEndRunComplete_{true};
// The next two data members are thread safe and can be safely mutable because
// they are only modified/read in globalBeginRun and globalBeginLuminosityBlock.
mutable unsigned long long nGlobalBeginRuns_{0};
mutable unsigned long long nInitializedWithLHERunInfo_{0};
mutable unsigned long long nGlobalBeginLumis_{0};

bool const hasFilter_;
};

Expand Down Expand Up @@ -515,6 +519,8 @@ namespace edm {
while (useInLumi_.load() == nullptr) {
}

++nGlobalBeginLumis_;

// streamEndRun also uses the hadronizer in the stream cache
// so we also need to wait for it to finish if there is a new run
if (nInitializedWithLHERunInfo_ < nGlobalBeginRuns_) {
Expand All @@ -527,6 +533,7 @@ namespace edm {
auto lumiCache = std::make_shared<gen::LumiCache<HAD, DEC>>();
lumiCache->useInLumi_ = useInLumi_.load();
lumiCache->nGlobalBeginRuns_ = nGlobalBeginRuns_;
lumiCache->nGlobalBeginLumis_ = nGlobalBeginLumis_;
return lumiCache;
}

Expand All @@ -541,7 +548,7 @@ namespace edm {

template <class HAD, class DEC>
void ConcurrentHadronizerFilter<HAD, DEC>::streamEndLuminosityBlockSummary(StreamID id,
LuminosityBlock const&,
LuminosityBlock const& lumi,
EventSetup const&,
gen::LumiSummary* iSummary) const {
const lhef::LHERunInfo* lheRunInfo = this->streamCache(id)->hadronizer_.getLHERunInfo().get();
Expand Down Expand Up @@ -594,13 +601,12 @@ namespace edm {
}
}

// The next section of code depends on the Framework behavior that the stream
// lumi transitions are executed for all streams for every lumi even when
// there are no events for a stream to process.
gen::StreamCache<HAD, DEC>* streamCachePtr = this->streamCache(id);
unsigned long long expected = streamCachePtr->nStreamEndLumis_;
++streamCachePtr->nStreamEndLumis_;
if (greatestNStreamEndLumis_.compare_exchange_strong(expected, streamCachePtr->nStreamEndLumis_)) {
unsigned long long expected = this->luminosityBlockCache(lumi.index())->nGlobalBeginLumis_;
unsigned long long nextValue = expected + 1;
// This exchange should succeed and the conditional block should be executed only
// for the first stream to try for each lumi.
if (nextNGlobalBeginLumis_.compare_exchange_strong(expected, nextValue)) {
streamEndRunComplete_ = false;
useInLumi_ = streamCachePtr;
}
Expand Down