diff --git a/DataFormats/Provenance/interface/ProductProvenanceRetriever.h b/DataFormats/Provenance/interface/ProductProvenanceRetriever.h index 265cf7cbfae0f..877b1774e7dd4 100644 --- a/DataFormats/Provenance/interface/ProductProvenanceRetriever.h +++ b/DataFormats/Provenance/interface/ProductProvenanceRetriever.h @@ -10,11 +10,14 @@ ProductProvenanceRetriever: Manages the per event/lumi/run per product provenanc #include "DataFormats/Provenance/interface/ProductProvenance.h" #include "DataFormats/Provenance/interface/ProcessHistoryID.h" #include "FWCore/Utilities/interface/propagate_const.h" +#include "FWCore/Utilities/interface/Likely.h" +#include "FWCore/Utilities/interface/thread_safety_macros.h" -#include "tbb/concurrent_unordered_set.h" +#include #include #include #include +#include /* ProductProvenanceRetriever @@ -24,6 +27,7 @@ namespace edm { class ProvenanceReaderBase; class WaitingTask; class ModuleCallingContext; + class ProductRegistry; struct ProductProvenanceHasher { size_t operator()(ProductProvenance const& tid) const { return tid.branchID().id(); } @@ -52,6 +56,7 @@ namespace edm { class ProductProvenanceRetriever { public: explicit ProductProvenanceRetriever(unsigned int iTransitionIndex); + ProductProvenanceRetriever(unsigned int iTransitionIndex, edm::ProductRegistry const&); explicit ProductProvenanceRetriever(std::unique_ptr reader); ProductProvenanceRetriever& operator=(ProductProvenanceRetriever const&) = delete; @@ -74,12 +79,60 @@ namespace edm { void readProvenanceAsync(WaitingTask* task, ModuleCallingContext const* moduleCallingContext) const; + void update(edm::ProductRegistry const&); + + class ProducedProvenanceInfo { + public: + ProducedProvenanceInfo(BranchID iBid) : provenance_{iBid}, isParentageSet_{false} {} + ProducedProvenanceInfo(ProducedProvenanceInfo&&) = default; + ProducedProvenanceInfo(ProducedProvenanceInfo const& iOther) : provenance_{iOther.provenance_.branchID()} { + bool isSet = iOther.isParentageSet_.load(std::memory_order_acquire); + if (isSet) { + provenance_.set(iOther.provenance_.parentageID()); + } + isParentageSet_.store(isSet, std::memory_order_release); + } + + ProducedProvenanceInfo& operator=(ProducedProvenanceInfo&&) = default; + ProducedProvenanceInfo& operator=(ProducedProvenanceInfo const& iOther) { + bool isSet = iOther.isParentageSet_.load(std::memory_order_acquire); + if (isSet) { + provenance_ = iOther.provenance_; + } else { + provenance_ = ProductProvenance(iOther.provenance_.branchID()); + } + isParentageSet_.store(isSet, std::memory_order_release); + return *this; + } + + ProductProvenance const* productProvenance() const noexcept { + if (LIKELY(isParentageSet())) { + return &provenance_; + } + return nullptr; + } + BranchID branchID() const noexcept { return provenance_.branchID(); } + + bool isParentageSet() const noexcept { return isParentageSet_.load(std::memory_order_acquire); } + + void threadsafe_set(ParentageID id) const { + provenance_.set(std::move(id)); + isParentageSet_.store(true, std::memory_order_release); + } + + void resetParentage() { isParentageSet_.store(false, std::memory_order_release); } + + private: + CMS_THREAD_GUARD(isParentageSet_) mutable ProductProvenance provenance_; + mutable std::atomic isParentageSet_; + }; + private: void readProvenance() const; void setTransitionIndex(unsigned int transitionIndex) { transitionIndex_ = transitionIndex; } + void setupEntryInfoSet(edm::ProductRegistry const&); - mutable tbb::concurrent_unordered_set - entryInfoSet_; + std::vector entryInfoSet_; mutable std::atomic*> readEntryInfoSet_; edm::propagate_const> nextRetriever_; edm::propagate_const parentProcessRetriever_; diff --git a/DataFormats/Provenance/src/ProductProvenanceRetriever.cc b/DataFormats/Provenance/src/ProductProvenanceRetriever.cc index 08427f891f4db..c05a4bab16329 100644 --- a/DataFormats/Provenance/src/ProductProvenanceRetriever.cc +++ b/DataFormats/Provenance/src/ProductProvenanceRetriever.cc @@ -1,4 +1,5 @@ #include "DataFormats/Provenance/interface/ProductProvenanceRetriever.h" +#include "DataFormats/Provenance/interface/ProductRegistry.h" #include "FWCore/Utilities/interface/EDMException.h" #include @@ -18,6 +19,17 @@ namespace edm { provenanceReader_(), transitionIndex_(iTransitionIndex) {} + ProductProvenanceRetriever::ProductProvenanceRetriever(unsigned int iTransitionIndex, + edm::ProductRegistry const& iReg) + : entryInfoSet_(), + readEntryInfoSet_(), + nextRetriever_(), + parentProcessRetriever_(nullptr), + provenanceReader_(), + transitionIndex_(iTransitionIndex) { + setupEntryInfoSet(iReg); + } + ProductProvenanceRetriever::ProductProvenanceRetriever(std::unique_ptr reader) : entryInfoSet_(), readEntryInfoSet_(), @@ -41,6 +53,26 @@ namespace edm { } } + void ProductProvenanceRetriever::setupEntryInfoSet(edm::ProductRegistry const& iReg) { + std::set ids; + for (auto const& p : iReg.productList()) { + if (p.second.branchType() == edm::InEvent) { + if (p.second.produced() or p.second.isProvenanceSetOnRead()) { + ids.insert(p.second.branchID()); + } + } + } + entryInfoSet_.reserve(ids.size()); + for (auto const& b : ids) { + entryInfoSet_.emplace_back(b); + } + } + + void ProductProvenanceRetriever::update(edm::ProductRegistry const& iReg) { + entryInfoSet_.clear(); + setupEntryInfoSet(iReg); + } + void ProductProvenanceRetriever::readProvenanceAsync(WaitingTask* task, ModuleCallingContext const* moduleCallingContext) const { if (provenanceReader_ and nullptr == readEntryInfoSet_.load()) { @@ -63,7 +95,7 @@ namespace edm { readEntryInfoSet_ = nullptr; } } - entryInfoSet_ = iFrom.entryInfoSet_; + assert(iFrom.entryInfoSet_.empty()); provenanceReader_ = iFrom.provenanceReader_; if (iFrom.nextRetriever_) { @@ -77,7 +109,9 @@ namespace edm { void ProductProvenanceRetriever::reset() { delete readEntryInfoSet_.load(); readEntryInfoSet_ = nullptr; - entryInfoSet_.clear(); + for (auto& e : entryInfoSet_) { + e.resetParentage(); + } if (nextRetriever_) { nextRetriever_->reset(); } @@ -89,7 +123,18 @@ namespace edm { // provenance when someone tries to access it not when doing the insert // doing the delay saves 20% of time when doing an analysis job //readProvenance(); - entryInfoSet_.insert(std::move(entryInfo)); + auto itFound = + std::lower_bound(entryInfoSet_.begin(), + entryInfoSet_.end(), + entryInfo.branchID(), + [](auto const& iEntry, edm::BranchID const& iValue) { return iEntry.branchID() < iValue; }); + if + UNLIKELY(itFound == entryInfoSet_.end() or itFound->branchID() != entryInfo.branchID()) { + throw edm::Exception(edm::errors::LogicError) + << "ProductProvenanceRetriever::insertIntoSet passed a BranchID " << entryInfo.branchID().id() + << " that has not been pre-registered"; + } + itFound->threadsafe_set(entryInfo.moveParentageID()); } void ProductProvenanceRetriever::mergeProvenanceRetrievers(std::shared_ptr other) { @@ -101,42 +146,51 @@ namespace edm { } ProductProvenance const* ProductProvenanceRetriever::branchIDToProvenance(BranchID const& bid) const { - ProductProvenance ei(bid); - auto it = entryInfoSet_.find(ei); - if (it == entryInfoSet_.end()) { - if (parentProcessRetriever_) { - return parentProcessRetriever_->branchIDToProvenance(bid); + auto itFound = std::lower_bound( + entryInfoSet_.begin(), entryInfoSet_.end(), bid, [](auto const& iEntry, edm::BranchID const& iValue) { + return iEntry.branchID() < iValue; + }); + if (itFound != entryInfoSet_.end() and itFound->branchID() == bid) { + if (auto p = itFound->productProvenance()) { + return p; } - //check in source - readProvenance(); - auto ptr = readEntryInfoSet_.load(); - if (ptr) { - auto itRead = ptr->find(ei); - if (itRead != ptr->end()) { - return &*itRead; - } - } - if (nextRetriever_) { - return nextRetriever_->branchIDToProvenance(bid); + } + if (parentProcessRetriever_) { + return parentProcessRetriever_->branchIDToProvenance(bid); + } + //check in source + readProvenance(); + auto ptr = readEntryInfoSet_.load(); + if (ptr) { + ProductProvenance ei(bid); + auto itRead = ptr->find(ei); + if (itRead != ptr->end()) { + return &*itRead; } - return nullptr; } - return &*it; + if (nextRetriever_) { + return nextRetriever_->branchIDToProvenance(bid); + } + return nullptr; } ProductProvenance const* ProductProvenanceRetriever::branchIDToProvenanceForProducedOnly(BranchID const& bid) const { - ProductProvenance ei(bid); - auto it = entryInfoSet_.find(ei); - if (it == entryInfoSet_.end()) { - if (parentProcessRetriever_) { - return parentProcessRetriever_->branchIDToProvenanceForProducedOnly(bid); - } - if (nextRetriever_) { - return nextRetriever_->branchIDToProvenanceForProducedOnly(bid); + auto itFound = std::lower_bound( + entryInfoSet_.begin(), entryInfoSet_.end(), bid, [](auto const& iEntry, edm::BranchID const& iValue) { + return iEntry.branchID() < iValue; + }); + if (itFound != entryInfoSet_.end() and itFound->branchID() == bid) { + if (auto p = itFound->productProvenance()) { + return p; } - return nullptr; } - return &*it; + if (parentProcessRetriever_) { + return parentProcessRetriever_->branchIDToProvenanceForProducedOnly(bid); + } + if (nextRetriever_) { + return nextRetriever_->branchIDToProvenanceForProducedOnly(bid); + } + return nullptr; } ProvenanceReaderBase::~ProvenanceReaderBase() {} diff --git a/FWCore/Framework/interface/EventPrincipal.h b/FWCore/Framework/interface/EventPrincipal.h index b2838e4c9a9da..12acbc7f3b8d6 100644 --- a/FWCore/Framework/interface/EventPrincipal.h +++ b/FWCore/Framework/interface/EventPrincipal.h @@ -152,6 +152,7 @@ namespace edm { edm::ThinnedAssociation const* getThinnedAssociation(edm::BranchID const& branchID) const; unsigned int transitionIndex_() const override; + void changedIndexes_() final; std::shared_ptr provRetrieverPtr() const { return get_underlying_safe(provRetrieverPtr_); diff --git a/FWCore/Framework/interface/Principal.h b/FWCore/Framework/interface/Principal.h index eae9e2c0ca751..0888bb7e3b816 100644 --- a/FWCore/Framework/interface/Principal.h +++ b/FWCore/Framework/interface/Principal.h @@ -218,6 +218,9 @@ namespace edm { } private: + //called by adjustIndexesAfterProductRegistryAddition only if an index actually changed + virtual void changedIndexes_() {} + void addScheduledProduct(std::shared_ptr bd); void addSourceProduct(std::shared_ptr bd); void addInputProduct(std::shared_ptr bd); diff --git a/FWCore/Framework/src/EventPrincipal.cc b/FWCore/Framework/src/EventPrincipal.cc index 97cacc7709d51..ca27d26429a7e 100644 --- a/FWCore/Framework/src/EventPrincipal.cc +++ b/FWCore/Framework/src/EventPrincipal.cc @@ -37,7 +37,7 @@ namespace edm { : Base(reg, reg->productLookup(InEvent), pc, InEvent, historyAppender, isForPrimaryProcess), aux_(), luminosityBlockPrincipal_(nullptr), - provRetrieverPtr_(new ProductProvenanceRetriever(streamIndex)), + provRetrieverPtr_(new ProductProvenanceRetriever(streamIndex, *reg)), eventSelectionIDs_(), branchIDListHelper_(branchIDListHelper), thinnedAssociationsHelper_(thinnedAssociationsHelper), @@ -216,6 +216,8 @@ namespace edm { unsigned int EventPrincipal::transitionIndex_() const { return streamID_.value(); } + void EventPrincipal::changedIndexes_() { provRetrieverPtr_->update(productRegistry()); } + static void throwProductDeletedException(ProductID const& pid, edm::EventPrincipal::ConstProductResolverPtr const phb) { ProductDeletedException exception; diff --git a/FWCore/Framework/src/Principal.cc b/FWCore/Framework/src/Principal.cc index 3318003f396a5..a5b33fc5833f5 100644 --- a/FWCore/Framework/src/Principal.cc +++ b/FWCore/Framework/src/Principal.cc @@ -885,6 +885,7 @@ namespace edm { void Principal::adjustIndexesAfterProductRegistryAddition() { if (preg_->getNextIndexValue(branchType_) != productResolvers_.size()) { + bool changed = false; productResolvers_.resize(preg_->getNextIndexValue(branchType_)); for (auto const& prod : preg_->productList()) { BranchDescription const& bd = prod.second; @@ -896,9 +897,13 @@ namespace edm { assert(!bd.produced()); auto cbd = std::make_shared(bd); addInputProduct(cbd); + changed = true; } } } + if (changed) { + changedIndexes_(); + } } assert(preg_->getNextIndexValue(branchType_) == productResolvers_.size()); }