Skip to content

Commit

Permalink
Added code to handle ConditionalTasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Dr15Jones committed Mar 22, 2022
1 parent a92cf9e commit 76938a5
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 5 deletions.
4 changes: 4 additions & 0 deletions DataFormats/Provenance/interface/ProductResolverIndexHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ namespace edm {
// If the TypeID for the wrapped type is already available,
// it is faster to call getContainedTypeFromWrapper directly.
TypeID getContainedType(TypeID const& typeID);

bool typeIsViewCompatible(TypeID const& requestedViewType,
TypeID const& wrappedtypeID,
std::string const& className);
} // namespace productholderindexhelper

class ProductResolverIndexHelper {
Expand Down
26 changes: 26 additions & 0 deletions DataFormats/Provenance/src/ProductResolverIndexHelper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,32 @@ namespace edm {
TypeID const wrappedTypeID = TypeID(wrappedType.typeInfo());
return getContainedTypeFromWrapper(wrappedTypeID, className);
}

bool typeIsViewCompatible(TypeID const& requestedViewType,
TypeID const& wrappedtypeID,
std::string const& className) {
auto elementType = getContainedTypeFromWrapper(wrappedtypeID, className);
if (elementType == TypeID(typeid(void)) or elementType == TypeID()) {
//the wrapped type is not a container
return false;
}
if (elementType == requestedViewType) {
return true;
}
//need to check for inheritance match
std::vector<std::string> missingDictionaries;
std::vector<TypeWithDict> baseTypes;
if (!public_base_classes(missingDictionaries, elementType, baseTypes)) {
return false;
}
for (auto const& base : baseTypes) {
if (TypeID(base.typeInfo()) == requestedViewType) {
return true;
}
}
return false;
}

} // namespace productholderindexhelper

ProductResolverIndexHelper::ProductResolverIndexHelper()
Expand Down
15 changes: 15 additions & 0 deletions FWCore/Framework/interface/StreamSchedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,21 @@ namespace edm {

void reportSkipped(EventPrincipal const& ep) const;

struct AliasInfo {
std::string friendlyClassName;
std::string instanceLabel;
std::string originalInstanceLabel;
std::string originalModuleLabel;
};
std::vector<Worker*> tryToPlaceConditionalModules(
Worker*,
std::unordered_set<std::string>& conditionalModules,
std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
std::multimap<std::string, AliasInfo> const& aliasMap,
ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration);
void fillWorkers(ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
Expand Down
224 changes: 219 additions & 5 deletions FWCore/Framework/src/StreamSchedule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "DataFormats/Provenance/interface/BranchIDListHelper.h"
#include "DataFormats/Provenance/interface/ProcessConfiguration.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
#include "FWCore/Framework/src/OutputModuleDescription.h"
#include "FWCore/Framework/interface/TriggerNamesService.h"
#include "FWCore/Framework/src/TriggerReport.h"
Expand All @@ -21,6 +22,7 @@
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ParameterSet/interface/Registry.h"
#include "FWCore/ServiceRegistry/interface/PathContext.h"
#include "FWCore/Reflection/interface/DictionaryTools.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/ExceptionCollector.h"
Expand All @@ -36,6 +38,7 @@
#include <list>
#include <map>
#include <exception>
#include <unordered_set>

namespace edm {

Expand Down Expand Up @@ -378,6 +381,154 @@ namespace edm {
}
}

static Worker* getWorker(std::string const& moduleLabel,
ParameterSet& proc_pset,
WorkerManager& workerManager,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration) {
bool isTracked;
ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
if (modpset == nullptr) {
return nullptr;
}
assert(isTracked);

return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
}

std::vector<Worker*> StreamSchedule::tryToPlaceConditionalModules(
Worker* worker,
std::unordered_set<std::string>& conditionalModules,
std::multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
std::multimap<std::string, AliasInfo> const& aliasMap,
ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
std::shared_ptr<ProcessConfiguration const> processConfiguration) {
std::vector<Worker*> returnValue;
auto const& consumesInfo = worker->consumesInfo();
auto moduleLabel = worker->description()->moduleLabel();
using namespace productholderindexhelper;
for (auto const& ci : consumesInfo) {
if (not ci.skipCurrentProcess() and
(ci.process().empty() or ci.process() == processConfiguration->processName())) {
auto productModuleLabel = ci.label();
if (productModuleLabel.empty()) {
for (auto const& branch : conditionalModuleBranches) {
if (ci.kindOfType() == edm::PRODUCT_TYPE) {
if (branch.second->unwrappedTypeID() != ci.type()) {
continue;
}
} else {
if (not typeIsViewCompatible(
ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) {
continue;
}
}

auto condWorker =
getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
assert(condWorker);

conditionalModules.erase(productModuleLabel);

auto dependents = tryToPlaceConditionalModules(condWorker,
conditionalModules,
conditionalModuleBranches,
aliasMap,
proc_pset,
preg,
prealloc,
processConfiguration);
returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
returnValue.push_back(condWorker);
}
} else {
//just a regular consumes
bool productFromConditionalModule = false;
auto itFound = conditionalModules.find(productModuleLabel);
if (itFound == conditionalModules.end()) {
//Check to see if this was an alias
auto findAlias = aliasMap.equal_range(productModuleLabel);
if (findAlias.first != findAlias.second) {
for (auto it = findAlias.first; it != findAlias.second; ++it) {
//this was previously filtered so only the conditional modules remain
productModuleLabel = it->second.originalModuleLabel;
if (it->second.friendlyClassName == "*" or
(ci.type().friendlyClassName() == it->second.friendlyClassName)) {
if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
productFromConditionalModule = true;
//need to check the rest of the data product info
break;
}
} else if (ci.kindOfType() == ELEMENT_TYPE) {
//consume is a View so need to do more intrusive search
if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) {
//find matching branches in module
auto branches = conditionalModuleBranches.equal_range(productModuleLabel);
for (auto itBranch = branches.first; itBranch != branches.second; ++it) {
if (it->second.originalInstanceLabel == "*" or
itBranch->second->productInstanceName() == it->second.originalInstanceLabel) {
if (typeIsViewCompatible(ci.type(),
TypeID(itBranch->second->wrappedType().typeInfo()),
itBranch->second->className())) {
productFromConditionalModule = true;
break;
}
}
}
}
}
}
}
itFound = conditionalModules.find(productModuleLabel);
} else {
//need to check the rest of the data product info
auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel);
for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) {
if (itBranch->second->productInstanceName() == ci.instance()) {
if (ci.kindOfType() == PRODUCT_TYPE) {
if (ci.type() == itBranch->second->unwrappedTypeID()) {
productFromConditionalModule = true;
break;
}
} else {
//this is a view
if (typeIsViewCompatible(ci.type(),
TypeID(itBranch->second->wrappedType().typeInfo()),
itBranch->second->className())) {
productFromConditionalModule = true;
break;
}
}
}
}
}
if (productFromConditionalModule) {
auto condWorker =
getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
assert(condWorker);

conditionalModules.erase(itFound);

auto dependents = tryToPlaceConditionalModules(condWorker,
conditionalModules,
conditionalModuleBranches,
aliasMap,
proc_pset,
preg,
prealloc,
processConfiguration);
returnValue.insert(returnValue.end(), dependents.begin(), dependents.end());
returnValue.push_back(condWorker);
}
}
}
}
return returnValue;
}

void StreamSchedule::fillWorkers(ParameterSet& proc_pset,
ProductRegistry& preg,
PreallocationConfiguration const* prealloc,
Expand All @@ -389,6 +540,61 @@ namespace edm {
vstring modnames = proc_pset.getParameter<vstring>(pathName);
PathWorkers tmpworkers;

//Pull out ConditionalTask modules
auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#");

std::unordered_set<std::string> conditionalmods;
//need to capture
std::multimap<std::string, AliasInfo> aliasMap;
std::multimap<std::string, edm::BranchDescription const*> conditionalModsBranches;
if (itCondBegin != modnames.end()) {
//the last entry should be ignored since it is required to be "@"
conditionalmods = std::unordered_set<std::string>(
std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1));

for (auto const& cond : conditionalmods) {
//force the creation of the conditional modules so alias check can work
(void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration);
}
//find aliases
{
auto aliases = proc_pset.getParameter<std::vector<std::string>>("@all_aliases");
std::string const star("*");
for (auto const& alias : aliases) {
auto info = proc_pset.getParameter<edm::ParameterSet>(alias);
auto aliasedToModuleLabels = info.getParameterNames();
for (auto const& mod : aliasedToModuleLabels) {
if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) {
auto aliasPSet = proc_pset.getParameter<edm::ParameterSet>(mod);
std::string type = star;
std::string instance = star;
std::string originalInstance = star;
if (aliasPSet.exists("type")) {
type = aliasPSet.getParameter<std::string>("type");
}
if (aliasPSet.exists("toProductInstance")) {
instance = aliasPSet.getParameter<std::string>("toProductInstance");
}
if (aliasPSet.exists("fromProductInstance")) {
originalInstance = aliasPSet.getParameter<std::string>("fromProductInstance");
}

aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod});
}
}
}
}
{
//find branches created by the conditional modules
for (auto const& prod : preg.productList()) {
if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) {
conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second);
}
}
}
}
modnames.erase(itCondBegin, modnames.end());

unsigned int placeInPath = 0;
for (auto const& name : modnames) {
//Modules except EDFilters are set to run concurrently by default
Expand All @@ -409,9 +615,8 @@ namespace edm {
moduleLabel.erase(0, 1);
}

bool isTracked;
ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
if (modpset == nullptr) {
Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration);
if (worker == nullptr) {
std::string pathType("endpath");
if (!search_all(endPathNames, pathName)) {
pathType = std::string("path");
Expand All @@ -420,9 +625,7 @@ namespace edm {
<< "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName
<< "\"\n please check spelling or remove that label from the path.";
}
assert(isTracked);

Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel);
if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) {
// We have a filter on an end path, and the filter is not explicitly ignored.
// See if the filter is allowed.
Expand All @@ -442,6 +645,17 @@ namespace edm {
if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) {
runConcurrently = false;
}

//TODO: call consumesInfo and see if need any modules from conditionalmods
// call module's typeLabelList function to see what it produces
// consume many has blank module label so need to check type -> what about Views?
auto condModules = tryToPlaceConditionalModules(
worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration);
for (auto condMod : condModules) {
tmpworkers.emplace_back(condMod, WorkerInPath::Ignore, placeInPath, true);
++placeInPath;
}

tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently);
++placeInPath;
}
Expand Down
8 changes: 8 additions & 0 deletions FWCore/Framework/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,12 @@
<test name="testFWCoreFrameworkBadScheduleException5" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_bad_schedule_exception_message_cfg.py 5; grep -v 'Fatal Exception' test_bad_schedule_5.log | diff -q ${LOCALTOP}/src/FWCore/Framework/test/unit_test_outputs/test_bad_schedule_5.log -"/>
<test name="testFWCoreFrameworkBadScheduleException6" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_bad_schedule_exception_message_cfg.py 6; grep -v 'Fatal Exception' test_bad_schedule_6.log | diff -q ${LOCALTOP}/src/FWCore/Framework/test/unit_test_outputs/test_bad_schedule_6.log -"/>
<test name="testFWCoreFrameworkBadScheduleException7" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_bad_schedule_exception_message_cfg.py 7; grep -v 'Fatal Exception' test_bad_schedule_7.log | diff -q ${LOCALTOP}/src/FWCore/Framework/test/unit_test_outputs/test_bad_schedule_7.log -"/>
<test name="testFWCoreFrameworkConditionalTask" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py --"/>
<test name="testFWCoreFrameworkConditionalTask_filterSucceeds" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --filterSucceeds"/>
<test name="testFWCoreFrameworkConditionalTask_reverseDependencies" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --reverseDependencies"/>
<test name="testFWCoreFrameworkConditionalTask_testAlias" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --testAlias"/>
<test name="testFWCoreFrameworkConditionalTask_testAlias_aliasWithStar" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --testAlias --aliasWithStar"/>
<test name="testFWCoreFrameworkConditionalTask_testView" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --testView"/>
<test name="testFWCoreFrameworkConditionalTask_testView_testAlias" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --testView --testAlias"/>
<test name="testFWCoreFrameworkConditionalTask_testView_testAlias_aliasWithStar" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_conditionaltasks_cfg.py -- --testView --testAlias --aliasWithStar"/>
<test name="testFWCoreFrameworkModuleSynchLumiBoundary" command="run_module_synch_lumiboundary.sh"/>
Loading

0 comments on commit 76938a5

Please sign in to comment.