Skip to content

Commit

Permalink
Use Worker's WaitingTaskList directly in PuttableProductResolver
Browse files Browse the repository at this point in the history
This change was triggered by a case where a PuttableProductResolver
was filled by a Worker that produced many products, and one of the
products (A) had a Ref to another product (B), and the product B was
not consumed by any module (it was only accessed through the Ref).
Since the putProduct() released the WaitingTaskList of the Resolver,
that lead to the consumer of A to run, and that consumer dereferenced
the Ref to see that B was not there.

Besides scheduled modules another cases where PuttableProductResolver
is used are Sources inheriting PuttableSourceBase, and TestProcessor.
In these cases the products are put into the Resolvers (or left as
non-produced) before launching the prefetching of the unscheduled
system. Therefore in these use cases the consuming modules do not need
to wait for the Resolver to be filled.

After several fix attempts it seemed easiest to just use the Worker's
WaitingTaskList directly in PuttableProductResolver. This approach
fulfills the requirements of both Worker and Source(-like) use cases,
and even simplifies the code.
  • Loading branch information
makortel committed Sep 21, 2022
1 parent 291b1f4 commit f88c583
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 43 deletions.
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/maker/Worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ namespace edm {

virtual bool hasAccumulator() const = 0;

// Used in PuttableProductResolver
edm::WaitingTaskList* waitingTaskList() { return &waitingTasks_; }

protected:
template <typename O>
friend class workerhelper::CallImpl;
Expand Down
46 changes: 10 additions & 36 deletions FWCore/Framework/src/ProductResolvers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,47 +407,21 @@ namespace edm {
}
}

//Need to try modifying prefetchRequested_ before adding to m_waitingTasks
bool expected = false;
bool prefetchRequested = prefetchRequested_.compare_exchange_strong(expected, true);
m_waitingTasks.add(waitTask);

if (worker_ and prefetchRequested) {
//using a waiting task to do a callback guarantees that
// the m_waitingTasks list will be released from waiting even
// if the module does not put this data product or the
// module has an exception while running

auto waiting = make_waiting_task([this](std::exception_ptr const* iException) {
if (nullptr != iException) {
m_waitingTasks.doneWaiting(*iException);
} else {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
});
worker_->callWhenDoneAsync(WaitingTaskHolder(*waitTask.group(), waiting));
if (waitingTasks_) {
// using a waiting task to do a callback guarantees that the
// waitingTasks_ list (from the worker) will be released from
// waiting even if the module does not put this data product
// or the module has an exception while running
waitingTasks_->add(waitTask);
}
}
}

void PuttableProductResolver::putProduct(std::unique_ptr<WrapperBase> edp) const {
ProducedProductResolver::putProduct(std::move(edp));
bool expected = false;
if (prefetchRequested_.compare_exchange_strong(expected, true)) {
m_waitingTasks.doneWaiting(std::exception_ptr());
}
}

void PuttableProductResolver::resetProductData_(bool deleteEarly) {
if (not deleteEarly) {
prefetchRequested_ = false;
m_waitingTasks.reset();
}
DataManagingProductResolver::resetProductData_(deleteEarly);
}

void PuttableProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
worker_ = iConfigure.findWorker(branchDescription().moduleLabel());
auto worker = iConfigure.findWorker(branchDescription().moduleLabel());
if (worker) {
waitingTasks_ = worker->waitingTaskList();
}
}

void UnscheduledProductResolver::setupUnscheduled(UnscheduledConfigurator const& iConfigure) {
Expand Down
14 changes: 7 additions & 7 deletions FWCore/Framework/src/ProductResolvers.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace edm {
class PuttableProductResolver : public ProducedProductResolver {
public:
explicit PuttableProductResolver(std::shared_ptr<BranchDescription const> bd)
: ProducedProductResolver(bd, ProductStatus::NotPut), worker_(nullptr), prefetchRequested_(false) {}
: ProducedProductResolver(bd, ProductStatus::NotPut) {}

void setupUnscheduled(UnscheduledConfigurator const&) final;

Expand All @@ -218,12 +218,12 @@ namespace edm {
ModuleCallingContext const* mcc) const override;
bool unscheduledWasNotRun_() const override { return false; }

void putProduct(std::unique_ptr<WrapperBase> edp) const override;
void resetProductData_(bool deleteEarly) override;

CMS_THREAD_SAFE mutable WaitingTaskList m_waitingTasks;
Worker* worker_;
mutable std::atomic<bool> prefetchRequested_;
// The WaitingTaskList below is the one of the worker, if one
// corresponds to this ProductResolver. For the Source-like cases
// where there is no such Worker, the tasks depending on the data
// depending on this ProductResolver are assumed to be eligible to
// run immediately after their prefetch.
WaitingTaskList* waitingTasks_ = nullptr;
};

class UnscheduledProductResolver : public ProducedProductResolver {
Expand Down

0 comments on commit f88c583

Please sign in to comment.