diff --git a/FWCore/Framework/interface/maker/Worker.h b/FWCore/Framework/interface/maker/Worker.h index c36fe2409b207..65ab71f8c0af2 100644 --- a/FWCore/Framework/interface/maker/Worker.h +++ b/FWCore/Framework/interface/maker/Worker.h @@ -250,6 +250,9 @@ namespace edm { virtual bool hasAccumulator() const = 0; + // Used in PuttableProductResolver + edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; } + protected: template friend class workerhelper::CallImpl; diff --git a/FWCore/Framework/src/ProductResolvers.cc b/FWCore/Framework/src/ProductResolvers.cc index 426c97a0ceaa6..28e1a003040eb 100644 --- a/FWCore/Framework/src/ProductResolvers.cc +++ b/FWCore/Framework/src/ProductResolvers.cc @@ -407,47 +407,21 @@ namespace edm { } } - //Need to try modifying prefetchRequested_ before adding to m_waitingTasks - bool expected = false; - bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true); - m_waitingTasks.add(waitTask); - - if (worker_ and prefetchRequested) { - //using a waiting task to do a callback guarantees that - // the m_waitingTasks list will be released from waiting even - // if the module does not put this data product or the - // module has an exception while running - - auto waiting = make_waiting_task([this](std::exception_ptr const* iException) { - if (nullptr != iException) { - m_waitingTasks.doneWaiting(*iException); - } else { - m_waitingTasks.doneWaiting(std::exception_ptr()); - } - }); - worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting)); + if (waitingTasks_) { + // using a waiting task to do a callback guarantees that the + // waitingTasks_ list (from the worker) will be released from + // waiting even if the module does not put this data product + // or the module has an exception while running + waitingTasks_->add(waitTask); } } } - void PuttableProductResolver::putProduct(std::unique_ptr edp) const { - ProducedProductResolver::putProduct(std::move(edp)); - bool expected = false; - if (prefetchRequested_.compare_exchange_strong(expected, true)) { - m_waitingTasks.doneWaiting(std::exception_ptr()); - } - } - - void PuttableProductResolver::resetProductData_(bool deleteEarly) { - if (not deleteEarly) { - prefetchRequested_ = false; - m_waitingTasks.reset(); - } - DataManagingProductResolver::resetProductData_(deleteEarly); - } - void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) { - worker_ = iConfigure.findWorker(branchDescription().moduleLabel()); + auto worker = iConfigure.findWorker(branchDescription().moduleLabel()); + if (worker) { + waitingTasks_ = &worker->waitingTaskList(); + } } void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) { diff --git a/FWCore/Framework/src/ProductResolvers.h b/FWCore/Framework/src/ProductResolvers.h index 8ca0faccff8d7..9cd119784787c 100644 --- a/FWCore/Framework/src/ProductResolvers.h +++ b/FWCore/Framework/src/ProductResolvers.h @@ -201,7 +201,7 @@ namespace edm { class PuttableProductResolver : public ProducedProductResolver { public: explicit PuttableProductResolver(std::shared_ptr bd) - : ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {} + : ProducedProductResolver(bd, ProductStatus::NotPut) {} void setupUnscheduled(UnscheduledConfigurator const&) final; @@ -218,12 +218,12 @@ namespace edm { ModuleCallingContext const* mcc) const override; bool unscheduledWasNotRun_() const override { return false; } - void putProduct(std::unique_ptr edp) const override; - void resetProductData_(bool deleteEarly) override; - - CMS_THREAD_SAFE mutable WaitingTaskList m_waitingTasks; - Worker* worker_; - mutable std::atomic prefetchRequested_; + // The WaitingTaskList below is the one from the worker, if one + // corresponds to this ProductResolver. For the Source-like cases + // where there is no such Worker, the tasks depending on the data + // depending on this ProductResolver are assumed to be eligible to + // run immediately after their prefetch. + WaitingTaskList* waitingTasks_ = nullptr; }; class UnscheduledProductResolver : public ProducedProductResolver { diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 20338b0791a30..cd0614afadb0e 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -890,6 +890,12 @@ namespace edm { ServiceRegistry::Operate guard(serviceToken); Traits::preScheduleSignal(actReg_.get(), &streamContext_); + // Data dependencies need to be set up before marking empty + // (End)Paths complete in case something consumes the status of + // the empty (EndPath) + workerManager_.setupResolvers(ep); + workerManager_.setupOnDemandSystem(info); + HLTPathStatus hltPathStatus(hlt::Pass, 0); for (int empty_trig_path : empty_trig_paths_) { results_->at(empty_trig_path) = hltPathStatus; @@ -912,9 +918,6 @@ namespace edm { } } - workerManager_.setupResolvers(ep); - workerManager_.setupOnDemandSystem(info); - ++total_events_; //use to give priorities on an error to ones from Paths