Skip to content

Commit

Permalink
Faster ProductProvenanceRetriever
Browse files Browse the repository at this point in the history
At construction, set up the data structure that holds every BranchID that can later be passed to insertIntoSet. Each individual std::vector element can safely be written to by a different thread. Because the structure is reused, any later change to the ProductRegistry requires updating the structure.
  • Loading branch information
Dr15Jones committed Dec 3, 2019
1 parent bdaea93 commit a9e7916
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 35 deletions.
59 changes: 56 additions & 3 deletions DataFormats/Provenance/interface/ProductProvenanceRetriever.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ ProductProvenanceRetriever: Manages the per event/lumi/run per product provenanc
#include "DataFormats/Provenance/interface/ProductProvenance.h"
#include "DataFormats/Provenance/interface/ProcessHistoryID.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/Likely.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include "tbb/concurrent_unordered_set.h"
#include <vector>
#include <memory>
#include <set>
#include <atomic>
#include <string_view>

/*
ProductProvenanceRetriever
Expand All @@ -24,6 +27,7 @@ namespace edm {
class ProvenanceReaderBase;
class WaitingTask;
class ModuleCallingContext;
class ProductRegistry;

struct ProductProvenanceHasher {
size_t operator()(ProductProvenance const& tid) const { return tid.branchID().id(); }
Expand Down Expand Up @@ -52,6 +56,7 @@ namespace edm {
class ProductProvenanceRetriever {
public:
explicit ProductProvenanceRetriever(unsigned int iTransitionIndex);
ProductProvenanceRetriever(unsigned int iTransitionIndex, edm::ProductRegistry const&);
explicit ProductProvenanceRetriever(std::unique_ptr<ProvenanceReaderBase> reader);

ProductProvenanceRetriever& operator=(ProductProvenanceRetriever const&) = delete;
Expand All @@ -74,12 +79,60 @@ namespace edm {

void readProvenanceAsync(WaitingTask* task, ModuleCallingContext const* moduleCallingContext) const;

void update(edm::ProductRegistry const&);

class ProducedProvenanceInfo {
public:
ProducedProvenanceInfo(BranchID iBid) : provenance_{iBid}, isParentageSet_{false} {}
ProducedProvenanceInfo(ProducedProvenanceInfo&&) = default;
ProducedProvenanceInfo(ProducedProvenanceInfo const& iOther) : provenance_{iOther.provenance_.branchID()} {
bool isSet = iOther.isParentageSet_.load(std::memory_order_acquire);
if (isSet) {
provenance_.set(iOther.provenance_.parentageID());
}
isParentageSet_.store(isSet, std::memory_order_release);
}

ProducedProvenanceInfo& operator=(ProducedProvenanceInfo&&) = default;
ProducedProvenanceInfo& operator=(ProducedProvenanceInfo const& iOther) {
bool isSet = iOther.isParentageSet_.load(std::memory_order_acquire);
if (isSet) {
provenance_ = iOther.provenance_;
} else {
provenance_ = ProductProvenance(iOther.provenance_.branchID());
}
isParentageSet_.store(isSet, std::memory_order_release);
return *this;
}

ProductProvenance const* productProvenance() const noexcept {
if (LIKELY(isParentageSet())) {
return &provenance_;
}
return nullptr;
}
BranchID branchID() const noexcept { return provenance_.branchID(); }

bool isParentageSet() const noexcept { return isParentageSet_.load(std::memory_order_acquire); }

void threadsafe_set(ParentageID id) const {
provenance_.set(std::move(id));
isParentageSet_.store(true, std::memory_order_release);
}

void resetParentage() { isParentageSet_.store(false, std::memory_order_release); }

private:
CMS_THREAD_GUARD(isParentageSet_) mutable ProductProvenance provenance_;
mutable std::atomic<bool> isParentageSet_;
};

private:
void readProvenance() const;
void setTransitionIndex(unsigned int transitionIndex) { transitionIndex_ = transitionIndex; }
void setupEntryInfoSet(edm::ProductRegistry const&);

mutable tbb::concurrent_unordered_set<ProductProvenance, ProductProvenanceHasher, ProductProvenanceEqual>
entryInfoSet_;
std::vector<ProducedProvenanceInfo> entryInfoSet_;
mutable std::atomic<const std::set<ProductProvenance>*> readEntryInfoSet_;
edm::propagate_const<std::shared_ptr<ProductProvenanceRetriever>> nextRetriever_;
edm::propagate_const<ProductProvenanceRetriever const*> parentProcessRetriever_;
Expand Down
116 changes: 85 additions & 31 deletions DataFormats/Provenance/src/ProductProvenanceRetriever.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "DataFormats/Provenance/interface/ProductProvenanceRetriever.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "FWCore/Utilities/interface/EDMException.h"

#include <cassert>
Expand All @@ -18,6 +19,17 @@ namespace edm {
provenanceReader_(),
transitionIndex_(iTransitionIndex) {}

// Constructs a retriever for the given transition index and fills entryInfoSet_
// from iReg via setupEntryInfoSet(). Every other member is value-initialized,
// with parentProcessRetriever_ explicitly set to nullptr.
ProductProvenanceRetriever::ProductProvenanceRetriever(unsigned int iTransitionIndex,
edm::ProductRegistry const& iReg)
: entryInfoSet_(),
readEntryInfoSet_(),
nextRetriever_(),
parentProcessRetriever_(nullptr),
provenanceReader_(),
transitionIndex_(iTransitionIndex) {
// populate one entry per BranchID selected from the registry
setupEntryInfoSet(iReg);
}

ProductProvenanceRetriever::ProductProvenanceRetriever(std::unique_ptr<ProvenanceReaderBase> reader)
: entryInfoSet_(),
readEntryInfoSet_(),
Expand All @@ -41,6 +53,26 @@ namespace edm {
}
}

// Appends to entryInfoSet_ one ProducedProvenanceInfo for each distinct BranchID
// in iReg that belongs to an event-level product which is either produced or has
// its provenance set on read. Entries are appended in ascending BranchID order.
void ProductProvenanceRetriever::setupEntryInfoSet(edm::ProductRegistry const& iReg) {
  // std::set removes duplicates and iterates in sorted order
  std::set<BranchID> uniqueIDs;
  for (auto const& item : iReg.productList()) {
    auto const& desc = item.second;
    if (desc.branchType() != edm::InEvent) {
      continue;
    }
    if (not desc.produced() and not desc.isProvenanceSetOnRead()) {
      continue;
    }
    uniqueIDs.insert(desc.branchID());
  }

  entryInfoSet_.reserve(uniqueIDs.size());
  for (auto const& id : uniqueIDs) {
    entryInfoSet_.emplace_back(id);
  }
}

// Discards every current entry of entryInfoSet_ and rebuilds the entries from
// iReg. Any parentage recorded in the old entries is dropped along with them.
void ProductProvenanceRetriever::update(edm::ProductRegistry const& iReg) {
entryInfoSet_.clear();
setupEntryInfoSet(iReg);
}

void ProductProvenanceRetriever::readProvenanceAsync(WaitingTask* task,
ModuleCallingContext const* moduleCallingContext) const {
if (provenanceReader_ and nullptr == readEntryInfoSet_.load()) {
Expand All @@ -63,7 +95,7 @@ namespace edm {
readEntryInfoSet_ = nullptr;
}
}
entryInfoSet_ = iFrom.entryInfoSet_;
assert(iFrom.entryInfoSet_.empty());
provenanceReader_ = iFrom.provenanceReader_;

if (iFrom.nextRetriever_) {
Expand All @@ -77,7 +109,9 @@ namespace edm {
void ProductProvenanceRetriever::reset() {
delete readEntryInfoSet_.load();
readEntryInfoSet_ = nullptr;
entryInfoSet_.clear();
for (auto& e : entryInfoSet_) {
e.resetParentage();
}
if (nextRetriever_) {
nextRetriever_->reset();
}
Expand All @@ -89,7 +123,18 @@ namespace edm {
// provenance when someone tries to access it not when doing the insert
// doing the delay saves 20% of time when doing an analysis job
//readProvenance();
entryInfoSet_.insert(std::move(entryInfo));
auto itFound =
std::lower_bound(entryInfoSet_.begin(),
entryInfoSet_.end(),
entryInfo.branchID(),
[](auto const& iEntry, edm::BranchID const& iValue) { return iEntry.branchID() < iValue; });
if
UNLIKELY(itFound == entryInfoSet_.end() or itFound->branchID() != entryInfo.branchID()) {
throw edm::Exception(edm::errors::LogicError)
<< "ProductProvenanceRetriever::insertIntoSet passed a BranchID " << entryInfo.branchID().id()
<< " that has not been pre-registered";
}
itFound->threadsafe_set(entryInfo.moveParentageID());
}

void ProductProvenanceRetriever::mergeProvenanceRetrievers(std::shared_ptr<ProductProvenanceRetriever> other) {
Expand All @@ -101,42 +146,51 @@ namespace edm {
}

ProductProvenance const* ProductProvenanceRetriever::branchIDToProvenance(BranchID const& bid) const {
ProductProvenance ei(bid);
auto it = entryInfoSet_.find(ei);
if (it == entryInfoSet_.end()) {
if (parentProcessRetriever_) {
return parentProcessRetriever_->branchIDToProvenance(bid);
auto itFound = std::lower_bound(
entryInfoSet_.begin(), entryInfoSet_.end(), bid, [](auto const& iEntry, edm::BranchID const& iValue) {
return iEntry.branchID() < iValue;
});
if (itFound != entryInfoSet_.end() and itFound->branchID() == bid) {
if (auto p = itFound->productProvenance()) {
return p;
}
//check in source
readProvenance();
auto ptr = readEntryInfoSet_.load();
if (ptr) {
auto itRead = ptr->find(ei);
if (itRead != ptr->end()) {
return &*itRead;
}
}
if (nextRetriever_) {
return nextRetriever_->branchIDToProvenance(bid);
}
if (parentProcessRetriever_) {
return parentProcessRetriever_->branchIDToProvenance(bid);
}
//check in source
readProvenance();
auto ptr = readEntryInfoSet_.load();
if (ptr) {
ProductProvenance ei(bid);
auto itRead = ptr->find(ei);
if (itRead != ptr->end()) {
return &*itRead;
}
return nullptr;
}
return &*it;
if (nextRetriever_) {
return nextRetriever_->branchIDToProvenance(bid);
}
return nullptr;
}

ProductProvenance const* ProductProvenanceRetriever::branchIDToProvenanceForProducedOnly(BranchID const& bid) const {
ProductProvenance ei(bid);
auto it = entryInfoSet_.find(ei);
if (it == entryInfoSet_.end()) {
if (parentProcessRetriever_) {
return parentProcessRetriever_->branchIDToProvenanceForProducedOnly(bid);
}
if (nextRetriever_) {
return nextRetriever_->branchIDToProvenanceForProducedOnly(bid);
auto itFound = std::lower_bound(
entryInfoSet_.begin(), entryInfoSet_.end(), bid, [](auto const& iEntry, edm::BranchID const& iValue) {
return iEntry.branchID() < iValue;
});
if (itFound != entryInfoSet_.end() and itFound->branchID() == bid) {
if (auto p = itFound->productProvenance()) {
return p;
}
return nullptr;
}
return &*it;
if (parentProcessRetriever_) {
return parentProcessRetriever_->branchIDToProvenanceForProducedOnly(bid);
}
if (nextRetriever_) {
return nextRetriever_->branchIDToProvenanceForProducedOnly(bid);
}
return nullptr;
}

// Out-of-line definition of the (empty) ProvenanceReaderBase destructor.
ProvenanceReaderBase::~ProvenanceReaderBase() {}
Expand Down
1 change: 1 addition & 0 deletions FWCore/Framework/interface/EventPrincipal.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ namespace edm {
edm::ThinnedAssociation const* getThinnedAssociation(edm::BranchID const& branchID) const;

unsigned int transitionIndex_() const override;
void changedIndexes_() final;

std::shared_ptr<ProductProvenanceRetriever const> provRetrieverPtr() const {
return get_underlying_safe(provRetrieverPtr_);
Expand Down
3 changes: 3 additions & 0 deletions FWCore/Framework/interface/Principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ namespace edm {
}

private:
//called by adjustIndexesAfterProductRegistryAddition only if an index actually changed
virtual void changedIndexes_() {}

void addScheduledProduct(std::shared_ptr<BranchDescription const> bd);
void addSourceProduct(std::shared_ptr<BranchDescription const> bd);
void addInputProduct(std::shared_ptr<BranchDescription const> bd);
Expand Down
4 changes: 3 additions & 1 deletion FWCore/Framework/src/EventPrincipal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace edm {
: Base(reg, reg->productLookup(InEvent), pc, InEvent, historyAppender, isForPrimaryProcess),
aux_(),
luminosityBlockPrincipal_(nullptr),
provRetrieverPtr_(new ProductProvenanceRetriever(streamIndex)),
provRetrieverPtr_(new ProductProvenanceRetriever(streamIndex, *reg)),
eventSelectionIDs_(),
branchIDListHelper_(branchIDListHelper),
thinnedAssociationsHelper_(thinnedAssociationsHelper),
Expand Down Expand Up @@ -216,6 +216,8 @@ namespace edm {

// The transition index of an event principal is the value of its stream ID.
unsigned int EventPrincipal::transitionIndex_() const { return streamID_.value(); }

// Rebuilds the provenance retriever's entries from this principal's product registry.
void EventPrincipal::changedIndexes_() { provRetrieverPtr_->update(productRegistry()); }

static void throwProductDeletedException(ProductID const& pid,
edm::EventPrincipal::ConstProductResolverPtr const phb) {
ProductDeletedException exception;
Expand Down
5 changes: 5 additions & 0 deletions FWCore/Framework/src/Principal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ namespace edm {

void Principal::adjustIndexesAfterProductRegistryAddition() {
if (preg_->getNextIndexValue(branchType_) != productResolvers_.size()) {
bool changed = false;
productResolvers_.resize(preg_->getNextIndexValue(branchType_));
for (auto const& prod : preg_->productList()) {
BranchDescription const& bd = prod.second;
Expand All @@ -896,9 +897,13 @@ namespace edm {
assert(!bd.produced());
auto cbd = std::make_shared<BranchDescription const>(bd);
addInputProduct(cbd);
changed = true;
}
}
}
if (changed) {
changedIndexes_();
}
}
assert(preg_->getNextIndexValue(branchType_) == productResolvers_.size());
}
Expand Down

0 comments on commit a9e7916

Please sign in to comment.