Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release dependent modules only after the worker has finished for scheduled modules #39245

Merged
merged 2 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ namespace edm {

virtual bool hasAccumulator() const = 0;

// Used in PuttableProductResolver
edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; }

protected:
template <typename O>
friend class workerhelper::CallImpl;
Expand Down
46 changes: 10 additions & 36 deletions FWCore/Framework/src/ProductResolvers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,47 +407,21 @@ namespace edm {
}
}

//Need to try modifying prefetchRequested_ before adding to m_waitingTasks
bool expected = false;
bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
m_waitingTasks.add(waitTask);

if (worker_ and prefetchRequested) {
//using a waiting task to do a callback guarantees that
// the m_waitingTasks list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running

auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
if (nullptr != iException) {
m_waitingTasks.doneWaiting(*iException);
} else {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
});
worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
if (waitingTasks_) {
// using a waiting task to do a callback guarantees that the
// waitingTasks_ list (from the worker) will be released from
// waiting even if the module does not put this data product
// or the module has an exception while running
waitingTasks_->add(waitTask);
}
}
}

void PuttableProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
ProducedProductResolver::putProduct(std::move(edp));
bool expected = false;
if (prefetchRequested_.compare_exchange_strong(expected, true)) {
m_waitingTasks.doneWaiting(std::exception_ptr());
makortel marked this conversation as resolved.
Show resolved Hide resolved
}
}

void PuttableProductResolver::resetProductData_(bool deleteEarly) {
if (not deleteEarly) {
prefetchRequested_ = false;
m_waitingTasks.reset();
}
DataManagingProductResolver::resetProductData_(deleteEarly);
}

void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
if (worker) {
waitingTasks_ = &worker->waitingTaskList();
}
}

void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
Expand Down
14 changes: 7 additions & 7 deletions FWCore/Framework/src/ProductResolvers.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace edm {
class PuttableProductResolver : public ProducedProductResolver {
public:
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd)
: ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
: ProducedProductResolver(bd, ProductStatus::NotPut) {}

void setupUnscheduled(UnscheduledConfigurator const&) final;

Expand All @@ -218,12 +218,12 @@ namespace edm {
ModuleCallingContext const* mcc) const override;
bool unscheduledWasNotRun_() const override { return false; }

void putProduct(std::unique_ptr<WrapperBase> edp) const override;
void resetProductData_(bool deleteEarly) override;

CMS_THREAD_SAFE mutable WaitingTaskList m_waitingTasks;
Worker* worker_;
mutable std::atomic<bool> prefetchRequested_;
// The WaitingTaskList below is the one from the worker, if one
// corresponds to this ProductResolver. For the Source-like cases
// where there is no such Worker, the tasks depending on the data
// depending on this ProductResolver are assumed to be eligible to
// run immediately after their prefetch.
WaitingTaskList* waitingTasks_ = nullptr;
};

class UnscheduledProductResolver : public ProducedProductResolver {
Expand Down
9 changes: 6 additions & 3 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,12 @@ namespace edm {
ServiceRegistry::Operate guard(serviceToken);
Traits::preScheduleSignal(actReg_.get(), &streamContext_);

// Data dependencies need to be set up before marking empty
// (End)Paths complete in case something consumes the status of
// the empty (EndPath)
workerManager_.setupResolvers(ep);
workerManager_.setupOnDemandSystem(info);

HLTPathStatus hltPathStatus(hlt::Pass, 0);
for (int empty_trig_path : empty_trig_paths_) {
results_->at(empty_trig_path) = hltPathStatus;
Expand All @@ -912,9 +918,6 @@ namespace edm {
}
}

workerManager_.setupResolvers(ep);
workerManager_.setupOnDemandSystem(info);

++total_events_;

//use to give priorities on an error to ones from Paths
Expand Down