Skip to content

Commit

Permalink
Merge pull request #39245 from makortel/puttableProductResolverPutPro…
Browse files Browse the repository at this point in the history
…duct

Release dependent modules only after the worker has finished for scheduled modules
  • Loading branch information
cmsbuild authored Sep 23, 2022
2 parents 53e6af9 + e8a6e6a commit fbfd2d7
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 46 deletions.
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ namespace edm {

virtual bool hasAccumulator() const = 0;

// Used in PuttableProductResolver
edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; }

protected:
template <typename O>
friend class workerhelper::CallImpl;
Expand Down
46 changes: 10 additions & 36 deletions FWCore/Framework/src/ProductResolvers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,47 +407,21 @@ namespace edm {
}
}

//Need to try modifying prefetchRequested_ before adding to m_waitingTasks
bool expected = false;
bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
m_waitingTasks.add(waitTask);

if (worker_ and prefetchRequested) {
//using a waiting task to do a callback guarantees that
// the m_waitingTasks list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running

auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
if (nullptr != iException) {
m_waitingTasks.doneWaiting(*iException);
} else {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
});
worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
if (waitingTasks_) {
// using a waiting task to do a callback guarantees that the
// waitingTasks_ list (from the worker) will be released from
// waiting even if the module does not put this data product
// or the module has an exception while running
waitingTasks_->add(waitTask);
}
}
}

void PuttableProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
ProducedProductResolver::putProduct(std::move(edp));
bool expected = false;
if (prefetchRequested_.compare_exchange_strong(expected, true)) {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
}

void PuttableProductResolver::resetProductData_(bool deleteEarly) {
if (not deleteEarly) {
prefetchRequested_ = false;
m_waitingTasks.reset();
}
DataManagingProductResolver::resetProductData_(deleteEarly);
}

void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
if (worker) {
waitingTasks_ = &worker->waitingTaskList();
}
}

void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
Expand Down
14 changes: 7 additions & 7 deletions FWCore/Framework/src/ProductResolvers.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace edm {
class PuttableProductResolver : public ProducedProductResolver {
public:
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd)
: ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
: ProducedProductResolver(bd, ProductStatus::NotPut) {}

void setupUnscheduled(UnscheduledConfigurator const&) final;

Expand All @@ -218,12 +218,12 @@ namespace edm {
ModuleCallingContext const* mcc) const override;
bool unscheduledWasNotRun_() const override { return false; }

void putProduct(std::unique_ptr<WrapperBase> edp) const override;
void resetProductData_(bool deleteEarly) override;

CMS_THREAD_SAFE mutable WaitingTaskList m_waitingTasks;
Worker* worker_;
mutable std::atomic<bool> prefetchRequested_;
// The WaitingTaskList below is the one from the worker, if one
// corresponds to this ProductResolver. For the Source-like cases
// where there is no such Worker, the tasks depending on the data
// depending on this ProductResolver are assumed to be eligible to
// run immediately after their prefetch.
WaitingTaskList* waitingTasks_ = nullptr;
};

class UnscheduledProductResolver : public ProducedProductResolver {
Expand Down
9 changes: 6 additions & 3 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,12 @@ namespace edm {
ServiceRegistry::Operate guard(serviceToken);
Traits::preScheduleSignal(actReg_.get(), &streamContext_);

// Data dependencies need to be set up before marking empty
// (End)Paths complete in case something consumes the status of
// the empty (EndPath)
workerManager_.setupResolvers(ep);
workerManager_.setupOnDemandSystem(info);

HLTPathStatus hltPathStatus(hlt::Pass, 0);
for (int empty_trig_path : empty_trig_paths_) {
results_->at(empty_trig_path) = hltPathStatus;
Expand All @@ -912,9 +918,6 @@ namespace edm {
}
}

workerManager_.setupResolvers(ep);
workerManager_.setupOnDemandSystem(info);

++total_events_;

//use to give priorities on an error to ones from Paths
Expand Down

0 comments on commit fbfd2d7

Please sign in to comment.