From 622cdc87cb4dbee3aa7cf182165570875b707186 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 9 Mar 2022 10:31:10 -0600 Subject: [PATCH 01/10] Improve testing modules - added fillDescriptions - fixed a bug when filter not supposed to produce a data product - allow testing for missing data products --- FWCore/Framework/test/stubs/ToyAnalyzers.cc | 21 +++++++++++++++---- .../Framework/test/stubs/ToyIntProducers.cc | 15 ++++++++++++- FWCore/Framework/test/stubs/ToyModules.cc | 4 +++- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/FWCore/Framework/test/stubs/ToyAnalyzers.cc b/FWCore/Framework/test/stubs/ToyAnalyzers.cc index 3eb4624b6abdb..7a594376770d9 100644 --- a/FWCore/Framework/test/stubs/ToyAnalyzers.cc +++ b/FWCore/Framework/test/stubs/ToyAnalyzers.cc @@ -50,29 +50,42 @@ namespace edmtest { public: IntTestAnalyzer(edm::ParameterSet const& iPSet) : value_(iPSet.getUntrackedParameter("valueMustMatch")), - token_(consumes(iPSet.getUntrackedParameter("moduleLabel"))) {} + token_(consumes(iPSet.getUntrackedParameter("moduleLabel"))), + missing_(iPSet.getUntrackedParameter("valueMustBeMissing")) {} static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { edm::ParameterSetDescription desc; desc.addUntracked("valueMustMatch"); desc.addUntracked("moduleLabel"); + desc.addUntracked("valueMustBeMissing", false); descriptions.addDefault(desc); } void analyze(edm::StreamID, edm::Event const& iEvent, edm::EventSetup const&) const { - auto const& prod = iEvent.get(token_); - if (prod.value != value_) { + auto const& prod = iEvent.getHandle(token_); + if (missing_) { + if (prod.isValid()) { + edm::ProductLabels labels; + labelsForToken(token_, labels); + throw cms::Exception("ValueNotMissing") + << "The value for \"" << labels.module << ":" << labels.productInstance << ":" << labels.process + << "\" is being produced, which is not expected."; + } + return; + } + if (prod->value != value_) { edm::ProductLabels labels; labelsForToken(token_, labels); throw cms::Exception("ValueMismatch") << "The value for \"" << labels.module << ":" << labels.productInstance << ":" << labels.process << "\" is " - << prod.value << " but it was supposed to be " << value_; + << prod->value << " but it was supposed to be " << value_; } } private: int const value_; edm::EDGetTokenT const token_; + bool const missing_; }; //-------------------------------------------------------------------- diff --git a/FWCore/Framework/test/stubs/ToyIntProducers.cc b/FWCore/Framework/test/stubs/ToyIntProducers.cc index 12b8602403e53..c5934bf12b7b8 100644 --- a/FWCore/Framework/test/stubs/ToyIntProducers.cc +++ b/FWCore/Framework/test/stubs/ToyIntProducers.cc @@ -119,6 +119,12 @@ namespace edmtest { : token_{produces()}, value_(p.getParameter("ivalue")) {} void produce(edm::Event& e, edm::EventSetup const& c) override; + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.add("ivalue"); + descriptions.addDefault(desc); + } + private: edm::EDPutTokenT token_; int value_; @@ -373,7 +379,7 @@ namespace edmtest { explicit AddIntsProducer(edm::ParameterSet const& p) : putToken_{produces()}, otherPutToken_{produces("other")}, - onlyGetOnEvent_(p.getUntrackedParameter("onlyGetOnEvent", 0u)) { + onlyGetOnEvent_(p.getUntrackedParameter("onlyGetOnEvent")) { auto const& labels = p.getParameter>("labels"); for (auto const& label : labels) { tokens_.emplace_back(consumes(label)); @@ -381,6 +387,13 @@ namespace edmtest { } void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + desc.addUntracked("onlyGetOnEvent", 0u); + desc.add>("labels"); + descriptions.addDefault(desc); + } + private: std::vector> tokens_; const edm::EDPutTokenT putToken_; diff --git a/FWCore/Framework/test/stubs/ToyModules.cc b/FWCore/Framework/test/stubs/ToyModules.cc index 1b5837106a0e9..59b42641cde55 100644 --- a/FWCore/Framework/test/stubs/ToyModules.cc +++ b/FWCore/Framework/test/stubs/ToyModules.cc @@ -378,7 +378,9 @@ namespace edmtest { if (product.value < threshold_) { return false; } - iEvent.emplace(putToken_, product); + if (shouldProduce_) { + iEvent.emplace(putToken_, product); + } return true; } From a92cf9e7b19040911bf09fab6b28e17c341a33c6 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 10 Mar 2022 10:20:18 -0600 Subject: [PATCH 02/10] Added ConditionalTask object --- FWCore/ParameterSet/python/Config.py | 483 ++++++++++++++++- FWCore/ParameterSet/python/SequenceTypes.py | 501 +++++++++++++++--- .../ParameterSet/python/SequenceVisitors.py | 7 +- 3 files changed, 893 insertions(+), 98 deletions(-) diff --git a/FWCore/ParameterSet/python/Config.py b/FWCore/ParameterSet/python/Config.py index 0533e1bc96af9..7247ee9f527fc 100644 --- a/FWCore/ParameterSet/python/Config.py +++ b/FWCore/ParameterSet/python/Config.py @@ -123,6 +123,7 @@ def __init__(self,name,*Mods): self.__dict__['_Process__finalpaths'] = DictTypes.SortedKeysDict() # of definition self.__dict__['_Process__sequences'] = {} self.__dict__['_Process__tasks'] = {} + self.__dict__['_Process__conditionaltasks'] = {} self.__dict__['_Process__services'] = {} self.__dict__['_Process__essources'] = {} self.__dict__['_Process__esproducers'] = {} @@ -304,6 +305,10 @@ def tasks_(self): """returns a dict of the tasks that have been added to the Process""" return DictTypes.FixedKeysDict(self.__tasks) tasks = property(tasks_,doc="dictionary containing the tasks for the process") + def conditionaltasks_(self): + """returns a dict of the conditionaltasks that have been added to the Process""" + return DictTypes.FixedKeysDict(self.__conditionaltasks) + conditionaltasks = property(conditionaltasks_,doc="dictionary containing the conditionatasks for the process") def schedule_(self): """returns the schedule that has been added to the Process or None if none have been added""" return self.__schedule @@ -410,6 +415,9 @@ def __setattr__(self,name,value): +"an instance of "+str(type(value))+" will not work - requested label is "+name) if not isinstance(value,_Labelable) and not isinstance(value,Source) and not isinstance(value,Looper) and not isinstance(value,Schedule): if name == value.type_(): + if hasattr(self,name) and (getattr(self,name)!=value): + self._replaceInTasks(name, value) + self._replaceInConditionalTasks(name, value) # Only Services get handled here self.add_(value) return @@ -446,6 +454,7 @@ def __setattr__(self,name,value): if newValue._isTaskComponent(): if not self.__InExtendCall: self._replaceInTasks(name, newValue) + self._replaceInConditionalTasks(name, newValue) self._replaceInSchedule(name, newValue) else: if not isinstance(newValue, Task): @@ -464,8 +473,10 @@ def __setattr__(self,name,value): if s is not None: raise ValueError(msg1+s.label_()+msg2) - if isinstance(newValue, _Sequenceable) or newValue._isTaskComponent(): + if isinstance(newValue, _Sequenceable) or newValue._isTaskComponent() or isinstance(newValue, ConditionalTask): if not self.__InExtendCall: + if isinstance(newValue, ConditionalTask): + self._replaceInConditionalTasks(name, newValue) self._replaceInSequences(name, newValue) else: #should check to see if used in sequence before complaining @@ -566,7 +577,7 @@ def __delattr__(self,name): self._delHelper(name) obj = getattr(self,name) if not obj is None: - if not isinstance(obj, Sequence) and not isinstance(obj, Task): + if not isinstance(obj, Sequence) and not isinstance(obj, Task) and not isinstance(obj,ConditionalTask): # For modules, ES modules and services we can also remove # the deleted object from Sequences, Paths, EndPaths, and # Tasks. Note that for Sequences and Tasks that cannot be done @@ -578,6 +589,7 @@ def __delattr__(self,name): # has been checked that the deleted Sequence is not used). if obj._isTaskComponent(): self._replaceInTasks(name, None) + self._replaceInConditionalTasks(name, None) self._replaceInSchedule(name, None) if isinstance(obj, _Sequenceable) or obj._isTaskComponent(): self._replaceInSequences(name, None) @@ -605,7 +617,6 @@ def add_(self,value): if not isinstance(value,_Unlabelable): raise TypeError #clone the item - #clone the item if self.__isStrict: newValue =value.copy() value.setIsFrozen() @@ -685,6 +696,9 @@ def _placeESSource(self,name,mod): def _placeTask(self,name,task): self._validateTask(task, name) self._place(name, task, self.__tasks) + def _placeConditionalTask(self,name,task): + self._validateConditionalTask(task, name) + self._place(name, task, self.__conditionaltasks) def _placeAlias(self,name,mod): self._place(name, mod, self.__aliases) def _placePSet(self,name,mod): @@ -740,7 +754,7 @@ def extend(self,other,items=()): self.__setattr__(name,item) elif isinstance(item,_ModuleSequenceType): seqs[name]=item - elif isinstance(item,Task): + elif isinstance(item,Task) or isinstance(item, ConditionalTask): tasksToAttach[name] = item elif isinstance(item,_Labelable): self.__setattr__(name,item) @@ -913,8 +927,8 @@ def _validateSequence(self, sequence, label): l = set() visitor = NodeNameVisitor(l) sequence.visit(visitor) - except: - raise RuntimeError("An entry in sequence "+label + ' has no label') + except Exception as e: + raise RuntimeError("An entry in sequence {} has no label\n Seen entries: {}\n Error: {}".format(label, l, e)) def _validateTask(self, task, label): # See if every module and service has been inserted into the process @@ -924,6 +938,14 @@ def _validateTask(self, task, label): task.visit(visitor) except: raise RuntimeError("An entry in task " + label + ' has not been attached to the process') + def _validateConditionalTask(self, task, label): + # See if every module and service has been inserted into the process + try: + l = set() + visitor = NodeNameVisitor(l) + task.visit(visitor) + except: + raise RuntimeError("An entry in task " + label + ' has not been attached to the process') def _itemsInDependencyOrder(self, processDictionaryOfItems): # The items can be Sequences or Tasks and the input @@ -939,6 +961,8 @@ def _itemsInDependencyOrder(self, processDictionaryOfItems): containedItems = [] if isinstance(item, Task): v = TaskVisitor(containedItems) + elif isinstance(item, ConditionalTask): + v = ConditionalTaskVisitor(containedItems) else: v = SequenceVisitor(containedItems) try: @@ -947,6 +971,9 @@ def _itemsInDependencyOrder(self, processDictionaryOfItems): if isinstance(item, Task): raise RuntimeError("Failed in a Task visitor. Probably " \ "a circular dependency discovered in Task with label " + label) + elif isinstance(item, ConditionalTask): + raise RuntimeError("Failed in a ConditionalTask visitor. Probably " \ + "a circular dependency discovered in ConditionalTask with label " + label) else: raise RuntimeError("Failed in a Sequence visitor. Probably a " \ "circular dependency discovered in Sequence with label " + label) @@ -963,6 +990,10 @@ def _itemsInDependencyOrder(self, processDictionaryOfItems): raise RuntimeError("Task has a label, but using its label to get an attribute" \ " from the process yields a different object or None\n"+ "label = " + containedItem.label_()) + if isinstance(item, ConditionalTask): + raise RuntimeError("ConditionalTask has a label, but using its label to get an attribute" \ + " from the process yields a different object or None\n"+ + "label = " + containedItem.label_()) else: raise RuntimeError("Sequence has a label, but using its label to get an attribute" \ " from the process yields a different object or None\n"+ @@ -1017,6 +1048,7 @@ def dumpPython(self, options=PrintOptions()): result+=self._dumpPythonList(self.es_sources_(), options) result+=self._dumpPython(self.es_prefers_(), options) result+=self._dumpPythonList(self._itemsInDependencyOrder(self.tasks), options) + result+=self._dumpPythonList(self._itemsInDependencyOrder(self.conditionaltasks), options) result+=self._dumpPythonList(self._itemsInDependencyOrder(self.sequences), options) result+=self._dumpPythonList(self.paths_(), options) result+=self._dumpPythonList(self.endpaths_(), options) @@ -1122,6 +1154,10 @@ def _replaceInTasks(self, label, new): old = getattr(self,label) for task in self.tasks.values(): task.replace(old, new) + def _replaceInConditionalTasks(self, label, new): + old = getattr(self,label) + for task in self.conditionaltasks.values(): + task.replace(old, new) def _replaceInSchedule(self, label, new): if self.schedule_() == None: return @@ -1245,14 +1281,23 @@ def _insertPaths(self, processPSet, nodeVisitor): endpathValidator = EndPathValidator() decoratedList = [] lister = DecoratedNodeNameVisitor(decoratedList) - pathCompositeVisitor = CompositeVisitor(pathValidator, nodeVisitor, lister) + condTaskModules = [] + constTaskVistor = ModuleNodeOnConditionalTaskVisitor(condTaskModules) + pathCompositeVisitor = CompositeVisitor(pathValidator, nodeVisitor, lister, constTaskVistor) endpathCompositeVisitor = CompositeVisitor(endpathValidator, nodeVisitor, lister) for triggername in triggerPaths: iPath = self.paths_()[triggername] iPath.resolve(self.__dict__) pathValidator.setLabel(triggername) lister.initialize() + condTaskModules[:] = [] iPath.visit(pathCompositeVisitor) + if condTaskModules: + decoratedList.append("#") + l = list({x.label_() for x in condTaskModules}) + l.sort() + decoratedList.extend(l) + decoratedList.append("@") iPath.insertInto(processPSet, triggername, decoratedList) for endpathname in endpaths: if endpathname is not endPathWithFinalPathModulesName: @@ -1785,6 +1830,8 @@ def _toReplaceWith(toObj,fromObj): toObj._tasks = fromObj._tasks elif isinstance(fromObj,Task): toObj._collection = fromObj._collection + elif isinstance(fromObj,ConditionalTask): + toObj._collection = fromObj._collection elif isinstance(fromObj,_Parameterizable): #clear old items just incase fromObj is not a complete superset of toObj for p in toObj.parameterNames_(): @@ -2261,6 +2308,14 @@ def __init__(self,*arg,**args): self.assertRaises(ValueError, p.extend, FromArg(a = EDAlias())) self.assertRaises(ValueError, p.__setattr__, "a", EDAlias()) + p = Process('test') + p.a = EDProducer("MyProducer") + p.t = ConditionalTask(p.a) + p.p = Path(p.t) + self.assertRaises(ValueError, p.extend, FromArg(a = EDProducer("YourProducer"))) + self.assertRaises(ValueError, p.extend, FromArg(a = EDAlias())) + self.assertRaises(ValueError, p.__setattr__, "a", EDAlias()) + p = Process('test') p.a = EDProducer("MyProducer") p.s = Sequence(p.a) @@ -2497,6 +2552,41 @@ def testProcessDumpPython(self): process.p = cms.Path(process.a) process.p2 = cms.Path(process.r, process.task1, process.task2) process.schedule = cms.Schedule(*[ process.p2, process.p ], tasks=[process.task3, process.task4, process.task5])""") + # include some conditional tasks + p = Process("test") + p.a = EDAnalyzer("MyAnalyzer") + p.b = EDProducer("bProducer") + p.c = EDProducer("cProducer") + p.d = EDProducer("dProducer") + p.e = EDProducer("eProducer") + p.f = EDProducer("fProducer") + p.g = EDProducer("gProducer") + p.task5 = Task() + p.task3 = Task() + p.task2 = ConditionalTask(p.c, p.task3) + p.task1 = ConditionalTask(p.task5) + p.p = Path(p.a) + s = Sequence(p.a) + p.r = Sequence(s) + p.p2 = Path(p.r, p.task1, p.task2) + p.schedule = Schedule(p.p2,p.p,tasks=[p.task5]) + d=p.dumpPython() + self.assertEqual(_lineDiff(d,Process("test").dumpPython()), +"""process.b = cms.EDProducer("bProducer") +process.c = cms.EDProducer("cProducer") +process.d = cms.EDProducer("dProducer") +process.e = cms.EDProducer("eProducer") +process.f = cms.EDProducer("fProducer") +process.g = cms.EDProducer("gProducer") +process.a = cms.EDAnalyzer("MyAnalyzer") +process.task5 = cms.Task() +process.task3 = cms.Task() +process.task2 = cms.ConditionalTask(process.c, process.task3) +process.task1 = cms.ConditionalTask(process.task5) +process.r = cms.Sequence((process.a)) +process.p = cms.Path(process.a) +process.p2 = cms.Path(process.r, process.task1, process.task2) +process.schedule = cms.Schedule(*[ process.p2, process.p ], tasks=[process.task5])""") # only tasks p = Process("test") p.d = EDProducer("dProducer") @@ -2565,12 +2655,13 @@ def testGlobalReplace(self): t4 = Task(p.d) t5 = Task(p.d) t6 = Task(p.d) + p.ct1 = ConditionalTask(p.d) s = Sequence(p.a*p.b) - p.s4 = Sequence(p.a*p.b) + p.s4 = Sequence(p.a*p.b, p.ct1) s.associate(t2) p.s4.associate(t2) p.p = Path(p.c+s+p.a) - p.p2 = Path(p.c+p.s4+p.a) + p.p2 = Path(p.c+p.s4+p.a, p.ct1) p.e3 = EndPath(p.c+s+p.a) new = EDAnalyzer("NewAnalyzer") new2 = EDProducer("NewProducer") @@ -2587,14 +2678,14 @@ def testGlobalReplace(self): visitor_p2 = NodeVisitor() p.p2.visit(visitor_p2) self.assertTrue(visitor_p2.modules == set([new,new2,p.b,p.c])) - self.assertEqual(p.p2.dumpPython()[:-1], "cms.Path(process.c+process.s4+process.a)") + self.assertEqual(p.p2.dumpPython()[:-1], "cms.Path(process.c+process.s4+process.a, process.ct1)") visitor3 = NodeVisitor() p.e3.visit(visitor3) self.assertTrue(visitor3.modules == set([new,new2,p.b,p.c])) visitor4 = NodeVisitor() p.s4.visit(visitor4) self.assertTrue(visitor4.modules == set([new,new2,p.b])) - self.assertEqual(p.s4.dumpPython()[:-1],"cms.Sequence(process.a+process.b, cms.Task(process.d))") + self.assertEqual(p.s4.dumpPython()[:-1],"cms.Sequence(process.a+process.b, cms.Task(process.d), process.ct1)") visitor5 = NodeVisitor() p.t1.visit(visitor5) self.assertTrue(visitor5.modules == set([new2])) @@ -2602,6 +2693,14 @@ def testGlobalReplace(self): listOfTasks = list(p.schedule._tasks) listOfTasks[0].visit(visitor6) self.assertTrue(visitor6.modules == set([new2])) + visitor7 = NodeVisitor() + p.ct1.visit(visitor7) + self.assertTrue(visitor7.modules == set([new2])) + visitor8 = NodeVisitor() + listOfConditionalTasks = list(p.conditionaltasks.values()) + listOfConditionalTasks[0].visit(visitor8) + self.assertTrue(visitor8.modules == set([new2])) + p.d2 = EDProducer("YourProducer") p.schedule = Schedule(p.p, p.p2, p.e3, tasks=[p.t1]) @@ -2663,7 +2762,7 @@ def testTask(self): edproducer9 = EDProducer("b9") edfilter = EDFilter("c") service = Service("d") - service3 = Service("d") + service3 = Service("d", v = untracked.uint32(3)) essource = ESSource("e") esproducer = ESProducer("f") testTask2 = Task() @@ -2848,6 +2947,193 @@ def testTask(self): process.path200.replace(process.g,process.e) self.assertEqual(process.path200.dumpPython(), "cms.EndPath(process.c, cms.Task(process.e))\n") + def testConditionalTask(self): + + # create some objects to use in tests + edanalyzer = EDAnalyzer("a") + edproducer = EDProducer("b") + edproducer2 = EDProducer("b2") + edproducer3 = EDProducer("b3") + edproducer4 = EDProducer("b4") + edproducer8 = EDProducer("b8") + edproducer9 = EDProducer("b9") + edfilter = EDFilter("c") + service = Service("d") + service3 = Service("d", v = untracked.uint32(3)) + essource = ESSource("e") + esproducer = ESProducer("f") + testTask2 = Task() + testCTask2 = ConditionalTask() + + # test adding things to Tasks + testTask1 = ConditionalTask(edproducer, edfilter) + self.assertRaises(RuntimeError, testTask1.add, edanalyzer) + testTask1.add(essource, service) + testTask1.add(essource, esproducer) + testTask1.add(testTask2) + testTask1.add(testCTask2) + coll = testTask1._collection + self.assertTrue(edproducer in coll) + self.assertTrue(edfilter in coll) + self.assertTrue(service in coll) + self.assertTrue(essource in coll) + self.assertTrue(esproducer in coll) + self.assertTrue(testTask2 in coll) + self.assertTrue(testCTask2 in coll) + self.assertTrue(len(coll) == 7) + self.assertTrue(len(testTask2._collection) == 0) + + taskContents = [] + for i in testTask1: + taskContents.append(i) + self.assertEqual(taskContents, [edproducer, edfilter, essource, service, esproducer, testTask2, testCTask2]) + + # test attaching Task to Process + process = Process("test") + + process.mproducer = edproducer + process.mproducer2 = edproducer2 + process.mfilter = edfilter + process.messource = essource + process.mesproducer = esproducer + process.d = service + + testTask3 = ConditionalTask(edproducer, edproducer2) + testTask1.add(testTask3) + process.myTask1 = testTask1 + + # test the validation that occurs when attaching a Task to a Process + # first a case that passes, then one the fails on an EDProducer + # then one that fails on a service + l = set() + visitor = NodeNameVisitor(l) + testTask1.visit(visitor) + self.assertEqual(l, set(['mesproducer', 'mproducer', 'mproducer2', 'mfilter', 'd', 'messource'])) + l2 = testTask1.moduleNames + self.assertEqual(l, set(['mesproducer', 'mproducer', 'mproducer2', 'mfilter', 'd', 'messource'])) + + testTask4 = ConditionalTask(edproducer3) + l.clear() + self.assertRaises(RuntimeError, testTask4.visit, visitor) + try: + process.myTask4 = testTask4 + self.assertTrue(False) + except RuntimeError: + pass + + testTask5 = ConditionalTask(service3) + l.clear() + self.assertRaises(RuntimeError, testTask5.visit, visitor) + try: + process.myTask5 = testTask5 + self.assertTrue(False) + except RuntimeError: + pass + + process.d = service3 + process.myTask5 = testTask5 + + # test placement into the Process and the tasks property + expectedDict = { 'myTask1' : testTask1, 'myTask5' : testTask5 } + expectedFixedDict = DictTypes.FixedKeysDict(expectedDict); + self.assertEqual(process.conditionaltasks, expectedFixedDict) + self.assertEqual(process.conditionaltasks['myTask1'], testTask1) + self.assertEqual(process.myTask1, testTask1) + + # test replacing an EDProducer in a ConditionalTask when calling __settattr__ + # for the EDProducer on the Process. + process.mproducer2 = edproducer4 + process.d = service + l = list() + visitor1 = ModuleNodeVisitor(l) + testTask1.visit(visitor1) + l.sort(key=lambda mod: mod.__str__()) + expectedList = sorted([edproducer,essource,esproducer,service,edfilter,edproducer,edproducer4],key=lambda mod: mod.__str__()) + self.assertEqual(expectedList, l) + process.myTask6 = ConditionalTask() + process.myTask7 = ConditionalTask() + process.mproducer8 = edproducer8 + process.myTask8 = ConditionalTask(process.mproducer8) + process.myTask6.add(process.myTask7) + process.myTask7.add(process.myTask8) + process.myTask1.add(process.myTask6) + process.myTask8.add(process.myTask5) + self.assertEqual(process.myTask8.dumpPython(), "cms.ConditionalTask(process.mproducer8, process.myTask5)\n") + + testDict = process._itemsInDependencyOrder(process.conditionaltasks) + expectedLabels = ["myTask5", "myTask8", "myTask7", "myTask6", "myTask1"] + expectedTasks = [process.myTask5, process.myTask8, process.myTask7, process.myTask6, process.myTask1] + index = 0 + for testLabel, testTask in testDict.items(): + self.assertEqual(testLabel, expectedLabels[index]) + self.assertEqual(testTask, expectedTasks[index]) + index += 1 + + pythonDump = testTask1.dumpPython(PrintOptions()) + + + expectedPythonDump = 'cms.ConditionalTask(process.d, process.mesproducer, process.messource, process.mfilter, process.mproducer, process.mproducer2, process.myTask6)\n' + self.assertEqual(pythonDump, expectedPythonDump) + + process.myTask5 = ConditionalTask() + self.assertEqual(process.myTask8.dumpPython(), "cms.ConditionalTask(process.mproducer8, process.myTask5)\n") + process.myTask100 = ConditionalTask() + process.mproducer9 = edproducer9 + sequence1 = Sequence(process.mproducer8, process.myTask1, process.myTask5, testTask2, testTask3) + sequence2 = Sequence(process.mproducer8 + process.mproducer9) + process.sequence3 = Sequence((process.mproducer8 + process.mfilter)) + sequence4 = Sequence() + process.path1 = Path(process.mproducer+process.mproducer8+sequence1+sequence2+process.sequence3+sequence4) + process.path1.associate(process.myTask1, process.myTask5, testTask2, testTask3) + process.path11 = Path(process.mproducer+process.mproducer8+sequence1+sequence2+process.sequence3+ sequence4,process.myTask1, process.myTask5, testTask2, testTask3, process.myTask100) + process.path2 = Path(process.mproducer) + process.path3 = Path(process.mproducer9+process.mproducer8,testTask2) + + self.assertEqual(process.path1.dumpPython(PrintOptions()), 'cms.Path(process.mproducer+process.mproducer8+cms.Sequence(process.mproducer8, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask5)+(process.mproducer8+process.mproducer9)+process.sequence3, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask5)\n') + + self.assertEqual(process.path11.dumpPython(PrintOptions()), 'cms.Path(process.mproducer+process.mproducer8+cms.Sequence(process.mproducer8, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask5)+(process.mproducer8+process.mproducer9)+process.sequence3, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask100, process.myTask5)\n') + + # test NodeNameVisitor and moduleNames + l = set() + nameVisitor = NodeNameVisitor(l) + process.path1.visit(nameVisitor) + self.assertTrue(l == set(['mproducer', 'd', 'mesproducer', None, 'mproducer9', 'mproducer8', 'messource', 'mproducer2', 'mfilter'])) + self.assertTrue(process.path1.moduleNames() == set(['mproducer', 'd', 'mesproducer', None, 'mproducer9', 'mproducer8', 'messource', 'mproducer2', 'mfilter'])) + + # test copy + process.mproducer10 = EDProducer("b10") + process.path21 = process.path11.copy() + process.path21.replace(process.mproducer, process.mproducer10) + + self.assertEqual(process.path11.dumpPython(PrintOptions()), 'cms.Path(process.mproducer+process.mproducer8+cms.Sequence(process.mproducer8, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask5)+(process.mproducer8+process.mproducer9)+process.sequence3, cms.ConditionalTask(process.None, process.mproducer), cms.Task(), process.myTask1, process.myTask100, process.myTask5)\n') + + # Some peculiarities of the way things work show up here. dumpPython sorts tasks and + # removes duplication at the level of strings. The Task and Sequence objects themselves + # remove duplicate tasks in their contents if the instances are the same (exact same python + # object id which is not the same as the string representation being the same). + # Also note that the mutating visitor replaces sequences and tasks that have + # modified contents with their modified contents, it does not modify the sequence + # or task itself. + self.assertEqual(process.path21.dumpPython(PrintOptions()), 'cms.Path(process.mproducer10+process.mproducer8+process.mproducer8+(process.mproducer8+process.mproducer9)+process.sequence3, cms.ConditionalTask(process.None, process.mproducer10), cms.ConditionalTask(process.d, process.mesproducer, process.messource, process.mfilter, process.mproducer10, process.mproducer2, process.mproducer8, process.myTask5), cms.Task(), process.myTask100, process.myTask5)\n') + + process.path22 = process.path21.copyAndExclude([process.d, process.mesproducer, process.mfilter]) + self.assertEqual(process.path22.dumpPython(PrintOptions()), 'cms.Path(process.mproducer10+process.mproducer8+process.mproducer8+(process.mproducer8+process.mproducer9)+process.mproducer8, cms.ConditionalTask(process.None, process.mproducer10), cms.ConditionalTask(process.messource, process.mproducer10, process.mproducer2, process.mproducer8, process.myTask5), cms.Task(), process.myTask100, process.myTask5)\n') + + process.path23 = process.path22.copyAndExclude([process.messource, process.mproducer10]) + self.assertEqual(process.path23.dumpPython(PrintOptions()), 'cms.Path(process.mproducer8+process.mproducer8+(process.mproducer8+process.mproducer9)+process.mproducer8, cms.ConditionalTask(process.None), cms.ConditionalTask(process.mproducer2, process.mproducer8, process.myTask5), cms.Task(), process.myTask100, process.myTask5)\n') + + process = Process("Test") + + process.b = EDProducer("b") + process.b2 = EDProducer("b2") + process.b3 = EDProducer("b3") + process.p = Path(process.b, ConditionalTask(process.b3, process.b2)) + p = TestMakePSet() + process.fillProcessDesc(p) + self.assertEqual(p.values["@all_modules"], (True, ['b', 'b2', 'b3'])) + self.assertEqual(p.values["@paths"], (True, ['p'])) + self.assertEqual(p.values["p"], (True, ['b','#','b2','b3','@'])) + def testPath(self): p = Process("test") @@ -2873,21 +3159,32 @@ def testPath(self): self.assertRaises(TypeError,Path,p.es) t = Path() - self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path()\n') + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path()\n') t = Path(p.a) - self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a)\n') + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(process.a)\n') t = Path(Task()) - self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(cms.Task())\n') + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(cms.Task())\n') t = Path(p.a, Task()) - self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a, cms.Task())\n') + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(process.a, cms.Task())\n') p.prod = EDProducer("prodName") p.t1 = Task(p.prod) t = Path(p.a, p.t1, Task(), p.t1) - self.assertTrue(t.dumpPython(PrintOptions()) == 'cms.Path(process.a, cms.Task(), process.t1)\n') + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(process.a, cms.Task(), process.t1)\n') + + t = Path(ConditionalTask()) + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(cms.ConditionalTask())\n') + + t = Path(p.a, ConditionalTask()) + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(process.a, cms.ConditionalTask())\n') + + p.prod = EDProducer("prodName") + p.t1 = ConditionalTask(p.prod) + t = Path(p.a, p.t1, Task(), p.t1) + self.assertEqual(t.dumpPython(PrintOptions()), 'cms.Path(process.a, cms.Task(), process.t1)\n') def testFinalPath(self): p = Process("test") @@ -2919,7 +3216,11 @@ def testFinalPath(self): p.prod = EDProducer("prodName") p.t1 = Task(p.prod) self.assertRaises(TypeError, FinalPath, p.a, p.t1, Task(), p.t1) - + + p.prod = EDProducer("prodName") + p.t1 = ConditionalTask(p.prod) + self.assertRaises(TypeError, FinalPath, p.a, p.t1, ConditionalTask(), p.t1) + p.t = FinalPath(p.a) p.a = OutputModule("ReplacedOutputModule") self.assertEqual(p.t.dumpPython(PrintOptions()), 'cms.FinalPath(process.a)\n') @@ -2959,7 +3260,8 @@ def testContains(self): seq1 = Sequence(e) task1 = Task(g) - path = Path(a * c * seq1, task1) + ctask1 = ConditionalTask(h) + path = Path(a * c * seq1, task1, ctask1) self.assertTrue(path.contains(a)) self.assertFalse(path.contains(b)) @@ -2968,6 +3270,7 @@ def testContains(self): self.assertTrue(path.contains(e)) self.assertFalse(path.contains(f)) self.assertTrue(path.contains(g)) + self.assertTrue(path.contains(h)) endpath = EndPath(h * i) self.assertFalse(endpath.contains(b)) @@ -2998,6 +3301,14 @@ def testContains(self): self.assertTrue(sch.contains(l)) self.assertTrue(sch.contains(m)) + ctask2 = ConditionalTask(l, task1) + ctask = ConditionalTask(j, k, ctask2) + self.assertFalse(ctask.contains(b)) + self.assertTrue(ctask.contains(j)) + self.assertTrue(ctask.contains(k)) + self.assertTrue(ctask.contains(l)) + self.assertTrue(ctask.contains(g)) + def testSchedule(self): p = Process("test") p.a = EDAnalyzer("MyAnalyzer") @@ -3387,11 +3698,15 @@ def testPrune(self): p.e = EDProducer("MyProducer") p.f = EDProducer("YourProducer") p.g = EDProducer("TheirProducer") + p.h = EDProducer("OnesProducer") p.s = Sequence(p.d) p.t1 = Task(p.e) p.t2 = Task(p.f) p.t3 = Task(p.g, p.t1) - p.path1 = Path(p.a, p.t3) + p.ct1 = ConditionalTask(p.h) + p.ct2 = ConditionalTask(p.f) + p.ct3 = ConditionalTask(p.ct1) + p.path1 = Path(p.a, p.t3, p.ct3) p.path2 = Path(p.b) self.assertTrue(p.schedule is None) pths = p.paths @@ -3410,6 +3725,7 @@ def testPrune(self): self.assertTrue(hasattr(p, 'e')) self.assertTrue(not hasattr(p, 'f')) self.assertTrue(hasattr(p, 'g')) + self.assertTrue(hasattr(p, 'h')) self.assertTrue(not hasattr(p, 's')) self.assertTrue(hasattr(p, 't1')) self.assertTrue(not hasattr(p, 't2')) @@ -3431,14 +3747,21 @@ def testPrune(self): p.g = EDProducer("YourProducer") p.h = EDProducer("TheirProducer") p.i = EDProducer("OurProducer") + p.j = EDProducer("OurProducer") + p.k = EDProducer("OurProducer") + p.l = EDProducer("OurProducer") p.t1 = Task(p.f) p.t2 = Task(p.g) p.t3 = Task(p.h) p.t4 = Task(p.i) - p.s = Sequence(p.d, p.t1) - p.s2 = Sequence(p.b, p.t2) + p.ct1 = Task(p.f) + p.ct2 = Task(p.j) + p.ct3 = Task(p.k) + p.ct4 = Task(p.l) + p.s = Sequence(p.d, p.t1, p.ct1) + p.s2 = Sequence(p.b, p.t2, p.ct2) p.s3 = Sequence(p.e) - p.path1 = Path(p.a, p.t3) + p.path1 = Path(p.a, p.t3, p.ct3) p.path2 = Path(p.b) p.path3 = Path(p.b+p.s2) p.path4 = Path(p.b+p.s3) @@ -3458,10 +3781,17 @@ def testPrune(self): self.assertTrue(hasattr(p, 'g')) self.assertTrue(hasattr(p, 'h')) self.assertTrue(hasattr(p, 'i')) + self.assertTrue(hasattr(p, 'j')) + self.assertTrue(hasattr(p, 'k')) + self.assertTrue(not hasattr(p, 'l')) self.assertTrue(not hasattr(p, 't1')) self.assertTrue(hasattr(p, 't2')) self.assertTrue(hasattr(p, 't3')) self.assertTrue(hasattr(p, 't4')) + self.assertTrue(not hasattr(p, 'ct1')) + self.assertTrue(hasattr(p, 'ct2')) + self.assertTrue(hasattr(p, 'ct3')) + self.assertTrue(not hasattr(p, 'ct4')) self.assertTrue(not hasattr(p, 's')) self.assertTrue(hasattr(p, 's2')) self.assertTrue(not hasattr(p, 's3')) @@ -3501,6 +3831,17 @@ def testPrune(self): self.assertTrue(hasattr(p, 'b')) self.assertTrue(hasattr(p, 's')) self.assertTrue(hasattr(p, 'pth')) + #test ConditionalTaskPlaceholder + p = Process("test") + p.a = EDProducer("MyProducer") + p.b = EDProducer("YourProducer") + p.s = ConditionalTask(ConditionalTaskPlaceholder("a"),p.b) + p.pth = Path(p.s) + p.prune() + self.assertTrue(hasattr(p, 'a')) + self.assertTrue(hasattr(p, 'b')) + self.assertTrue(hasattr(p, 's')) + self.assertTrue(hasattr(p, 'pth')) #test unresolved SequencePlaceholder p = Process("test") p.b = EDProducer("YourAnalyzer") @@ -3579,6 +3920,65 @@ def testTaskPlaceholder(self): process.path1 = cms.Path(process.b, process.t2, process.t3) process.endpath1 = cms.EndPath(process.b, process.t5) process.schedule = cms.Schedule(*[ process.path1, process.endpath1 ], tasks=[process.t7, process.t8])""") + def testConditionalTaskPlaceholder(self): + p = Process("test") + p.a = EDProducer("ma") + p.b = EDAnalyzer("mb") + p.t1 = ConditionalTask(ConditionalTaskPlaceholder("c")) + p.t2 = ConditionalTask(p.a, ConditionalTaskPlaceholder("d"), p.t1) + p.t3 = ConditionalTask(ConditionalTaskPlaceholder("e")) + p.path1 = Path(p.b, p.t2, p.t3) + p.t5 = ConditionalTask(p.a, ConditionalTaskPlaceholder("g"), ConditionalTaskPlaceholder("t4")) + p.t4 = ConditionalTask(ConditionalTaskPlaceholder("f")) + p.path2 = Path(p.b, p.t5) + p.schedule = Schedule(p.path1, p.path2) + p.c = EDProducer("mc") + p.d = EDProducer("md") + p.e = EDProducer("me") + p.f = EDProducer("mf") + p.g = EDProducer("mg") + p.h = EDProducer("mh") + p.i = EDProducer("mi") + p.j = EDProducer("mj") + self.assertEqual(_lineDiff(p.dumpPython(),Process('test').dumpPython()), +"""process.a = cms.EDProducer("ma") +process.c = cms.EDProducer("mc") +process.d = cms.EDProducer("md") +process.e = cms.EDProducer("me") +process.f = cms.EDProducer("mf") +process.g = cms.EDProducer("mg") +process.h = cms.EDProducer("mh") +process.i = cms.EDProducer("mi") +process.j = cms.EDProducer("mj") +process.b = cms.EDAnalyzer("mb") +process.t1 = cms.ConditionalTask(cms.ConditionalTaskPlaceholder("c")) +process.t2 = cms.ConditionalTask(cms.ConditionalTaskPlaceholder("d"), process.a, process.t1) +process.t3 = cms.ConditionalTask(cms.ConditionalTaskPlaceholder("e")) +process.t5 = cms.ConditionalTask(cms.ConditionalTaskPlaceholder("g"), cms.ConditionalTaskPlaceholder("t4"), process.a) +process.t4 = cms.ConditionalTask(cms.ConditionalTaskPlaceholder("f")) +process.path1 = cms.Path(process.b, process.t2, process.t3) +process.path2 = cms.Path(process.b, process.t5) +process.schedule = cms.Schedule(*[ process.path1, process.path2 ])""") + p.resolve() + self.assertEqual(_lineDiff(p.dumpPython(),Process('test').dumpPython()), +"""process.a = cms.EDProducer("ma") +process.c = cms.EDProducer("mc") +process.d = cms.EDProducer("md") +process.e = cms.EDProducer("me") +process.f = cms.EDProducer("mf") +process.g = cms.EDProducer("mg") +process.h = cms.EDProducer("mh") +process.i = cms.EDProducer("mi") +process.j = cms.EDProducer("mj") +process.b = cms.EDAnalyzer("mb") +process.t1 = cms.ConditionalTask(process.c) +process.t2 = cms.ConditionalTask(process.a, process.d, process.t1) +process.t3 = cms.ConditionalTask(process.e) +process.t4 = cms.ConditionalTask(process.f) +process.t5 = cms.ConditionalTask(process.a, process.g, process.t4) +process.path1 = cms.Path(process.b, process.t2, process.t3) +process.path2 = cms.Path(process.b, process.t5) +process.schedule = cms.Schedule(*[ process.path1, process.path2 ])""") def testDelete(self): p = Process("test") @@ -3594,12 +3994,17 @@ def testDelete(self): t2 = Task(p.g, p.h) t3 = Task(p.g, p.h) p.t4 = Task(p.h) + p.ct1 = ConditionalTask(p.g, p.h) + ct2 = ConditionalTask(p.g, p.h) + ct3 = ConditionalTask(p.g, p.h) + p.ct4 = ConditionalTask(p.h) p.s = Sequence(p.d+p.e) - p.path1 = Path(p.a+p.f+p.s,t2) + p.path1 = Path(p.a+p.f+p.s,t2,ct2) p.path2 = Path(p.a) + p.path3 = Path(ct3, p.ct4) p.endpath2 = EndPath(p.b) p.endpath1 = EndPath(p.b+p.f) - p.schedule = Schedule(p.path2, p.endpath2, tasks=[t3, p.t4]) + p.schedule = Schedule(p.path2, p.path3, p.endpath2, tasks=[t3, p.t4]) self.assertTrue(hasattr(p, 'f')) self.assertTrue(hasattr(p, 'g')) del p.e @@ -3608,13 +4013,17 @@ def testDelete(self): self.assertFalse(hasattr(p, 'f')) self.assertFalse(hasattr(p, 'g')) self.assertEqual(p.t1.dumpPython(), 'cms.Task(process.h)\n') + self.assertEqual(p.ct1.dumpPython(), 'cms.ConditionalTask(process.h)\n') self.assertEqual(p.s.dumpPython(), 'cms.Sequence(process.d)\n') - self.assertEqual(p.path1.dumpPython(), 'cms.Path(process.a+process.s, cms.Task(process.h))\n') + self.assertEqual(p.path1.dumpPython(), 'cms.Path(process.a+process.s, cms.ConditionalTask(process.h), cms.Task(process.h))\n') self.assertEqual(p.endpath1.dumpPython(), 'cms.EndPath(process.b)\n') + self.assertEqual(p.path3.dumpPython(), 'cms.Path(cms.ConditionalTask(process.h), process.ct4)\n') del p.s - self.assertEqual(p.path1.dumpPython(), 'cms.Path(process.a+(process.d), cms.Task(process.h))\n') - self.assertEqual(p.schedule_().dumpPython(), 'cms.Schedule(*[ process.path2, process.endpath2 ], tasks=[cms.Task(process.h), process.t4])\n') + self.assertEqual(p.path1.dumpPython(), 'cms.Path(process.a+(process.d), cms.ConditionalTask(process.h), cms.Task(process.h))\n') + self.assertEqual(p.schedule_().dumpPython(), 'cms.Schedule(*[ process.path2, process.path3, process.endpath2 ], tasks=[cms.Task(process.h), process.t4])\n') del p.path2 + self.assertEqual(p.schedule_().dumpPython(), 'cms.Schedule(*[ process.path3, process.endpath2 ], tasks=[cms.Task(process.h), process.t4])\n') + del p.path3 self.assertEqual(p.schedule_().dumpPython(), 'cms.Schedule(*[ process.endpath2 ], tasks=[cms.Task(process.h), process.t4])\n') del p.endpath2 self.assertEqual(p.schedule_().dumpPython(), 'cms.Schedule(tasks=[cms.Task(process.h), process.t4])\n') @@ -3837,6 +4246,7 @@ def __init__(self): p.a =EDAnalyzer("MyAnalyzer", fred = int32(1)) m1.toReplaceWith(p.a, EDAnalyzer("YourAnalyzer", wilma = int32(3))) self.assertRaises(TypeError, lambda: m1.toReplaceWith(p.a, EDProducer("YourProducer"))) + #Task p.b =EDAnalyzer("BAn") p.c =EDProducer("c") p.d =EDProducer("d") @@ -3851,6 +4261,23 @@ def __init__(self): p.e =EDProducer("e") m1.toReplaceWith(p.td, Task(p.e)) self.assertTrue(p.td._collection == OrderedSet([p.e])) + #ConditionalTask + p.b =EDAnalyzer("BAn") + p.c =EDProducer("c") + p.d =EDProducer("d") + del p.tc + del p.td + p.tc = ConditionalTask(p.c) + p.td = ConditionalTask(p.d) + p.s = Sequence(p.a, p.tc) + m1.toReplaceWith(p.s, Sequence(p.a+p.b, p.td)) + self.assertEqual(p.a.wilma.value(),3) + self.assertEqual(p.a.type_(),"YourAnalyzer") + self.assertEqual(hasattr(p,"fred"),False) + self.assertTrue(p.s.dumpPython() == "cms.Sequence(process.a+process.b, process.td)\n") + p.e =EDProducer("e") + m1.toReplaceWith(p.td, ConditionalTask(p.e)) + self.assertTrue(p.td._collection == OrderedSet([p.e])) #check toReplaceWith doesn't activate not chosen m1 = Modifier() p = Process("test") diff --git a/FWCore/ParameterSet/python/SequenceTypes.py b/FWCore/ParameterSet/python/SequenceTypes.py index 55204fbbc2dd5..b32d91ac1ef42 100644 --- a/FWCore/ParameterSet/python/SequenceTypes.py +++ b/FWCore/ParameterSet/python/SequenceTypes.py @@ -238,13 +238,19 @@ def findDirectDependencies(element, collection,sortByType=True): continue t = 'sequences' # cms.Task - elif isinstance(item, Task): + elif isinstance(item, _Task): if not item.hasLabel_(): dependencies += item.directDependencies(sortByType) continue t = 'tasks' + # cms.ConditionalTask + elif isinstance(item, ConditionalTask): + if not item.hasLabel_(): + dependencies += item.directDependencies(sortByType) + continue + t = 'conditionaltasks' # SequencePlaceholder and TaskPlaceholder do not add an explicit dependency - elif isinstance(item, (SequencePlaceholder, TaskPlaceholder)): + elif isinstance(item, (SequencePlaceholder, TaskPlaceholder, ConditionalTaskPlaceholder)): continue # unsupported elements else: @@ -262,7 +268,7 @@ class _ModuleSequenceType(_ConfigureComponent, _Labelable): def __init__(self,*arg, **argv): self.__dict__["_isFrozen"] = False self._seq = None - if (len(arg) > 1 and not isinstance(arg[1], Task)) or (len(arg) > 0 and not isinstance(arg[0],_Sequenceable) and not isinstance(arg[0],Task)): + if (len(arg) > 1 and not isinstance(arg[1], _TaskBase)) or (len(arg) > 0 and not isinstance(arg[0],_Sequenceable) and not isinstance(arg[0],_TaskBase)): typename = format_typename(self) msg = format_outerframe(2) msg += "The %s constructor takes zero or one sequenceable argument followed by zero or more arguments of type Task. But the following types are given:\n" %typename @@ -287,7 +293,7 @@ def __init__(self,*arg, **argv): self.associate(*tasks) def associate(self,*tasks): for task in tasks: - if not isinstance(task, Task): + if not isinstance(task, _TaskBase): raise TypeError("associate only works with objects of type Task") self._tasks.add(task) def isFrozen(self): @@ -444,7 +450,7 @@ def replace(self, original, replacement): # where objects that contain other objects are involved. See the comments # for the _MutatingSequenceVisitor. - if isinstance(original,Task) != isinstance(replacement,Task): + if (isinstance(original,Task) != isinstance(replacement,Task)) or (isinstance(original,ConditionalTask) != isinstance(replacement,ConditionalTask)): raise TypeError("replace only works if both arguments are Tasks or neither") v = _CopyAndReplaceSequenceVisitor(original,replacement) self.visit(v) @@ -857,6 +863,18 @@ def enter(self,visitee): def leave(self,visitee): pass +# Fills a list of all ConditionalTasks visited +# Can visit a ConditionalTask, Sequence, Path, or EndPath +class ConditionalTaskVisitor(object): + def __init__(self,d): + self.deps = d + def enter(self,visitee): + if isinstance(visitee,ConditionalTask): + self.deps.append(visitee) + pass + def leave(self,visitee): + pass + # Fills a list of all modules visited. # Can visit a Sequence, Path, EndPath, or Task # For purposes of this visitor, a module is considered @@ -897,6 +915,23 @@ def leave(self,visitee): if isinstance(visitee, Task): self._levelInTasks -= 1 +class ModuleNodeOnConditionalTaskVisitor(object): + def __init__(self,l): + self.l = l + self._levelInTasks = 0 + def enter(self,visitee): + if isinstance(visitee, ConditionalTask): + self._levelInTasks += 1 + if self._levelInTasks == 0: + return + if visitee.isLeaf(): + self.l.append(visitee) + pass + def leave(self,visitee): + if self._levelInTasks > 0: + if isinstance(visitee, ConditionalTask): + self._levelInTasks -= 1 + # Should not be used on Tasks. # Similar to ModuleNodeVisitor with the following # differences. It only lists the modules that were @@ -961,7 +996,7 @@ def enter(self,visitee): if visitee._inProcess: self.l.add(visitee.type_()) else: - raise RuntimeError("Service not attached to process") + raise RuntimeError("Service not attached to process: {}".format(visitee.dumpPython())) def leave(self,visitee): pass @@ -973,14 +1008,25 @@ def __init__(self, type): self._type = type self.l = [] self.taskLeaves = [] + self.taskLeavesInConditionalTasks = [] + self.presentTaskLeaves = self.taskLeaves self._levelInTasks = 0 + self.conditionaltaskLeaves = [] + self._levelInConditionalTasks = 0 + def enter(self,visitee): if isinstance(visitee, Task): self._levelInTasks += 1 return + if isinstance(visitee, ConditionalTask): + self.presentTaskLeaves = self.taskLeavesInConditionalTasks + self._levelInConditionalTasks += 1 + return if visitee.isLeaf(): if self._levelInTasks > 0: - self.taskLeaves.append(visitee) + self.presentTaskLeaves.append(visitee) + elif self._levelInConditionalTasks > 0: + self.conditionaltaskLeaves.append(visitee) else: self.l.append(visitee) def leave(self, visitee): @@ -988,18 +1034,31 @@ def leave(self, visitee): if isinstance(visitee, Task): self._levelInTasks -= 1 return + if self._levelInConditionalTasks > 0: + if isinstance(visitee, ConditionalTask): + self._levelInConditionalTasks -= 1 + if 0 == self._levelInConditionalTasks: + self.presentTaskLeaves = self.taskLeaves + return if isinstance(visitee,_UnarySequenceOperator): self.l[-1] = visitee def result(self): - tsk = Task(*self.taskLeaves) + tsks = [] + if self.taskLeaves: + tsks.append(Task(*self.taskLeaves)) + if self.conditionaltaskLeaves: + ct = ConditionalTask(*self.conditionaltaskLeaves) + if self.taskLeavesInConditionalTasks: + ct.append(*self.taskLeavesInConditionalTasks) + tsks.append(ct) if len(self.l) > 0: # why doesn't (sum(self.l) work? seq = self.l[0] for el in self.l[1:]: seq += el - return self._type(seq, tsk) + return self._type(seq, *tsks) else: - return self._type(tsk) + return self._type(*tsks) def resultString(self): sep = '' returnValue = '' @@ -1031,7 +1090,7 @@ def initialize(self): self._levelInTasks = 0 def enter(self,visitee): - if isinstance(visitee, Task): + if isinstance(visitee, _TaskBase): self._levelInTasks += 1 if self._levelInTasks > 0: return @@ -1055,7 +1114,7 @@ def enter(self,visitee): def leave(self,visitee): # Ignore if this visitee is inside a Task if self._levelInTasks > 0: - if isinstance(visitee, Task): + if isinstance(visitee, _TaskBase): self._levelInTasks -= 1 return if isinstance(visitee,_BooleanLogicExpression): @@ -1335,19 +1394,19 @@ def leave(self,visitee): node.__init__(contents[0][0]) self.__stack[-2][-1] = [node, True, None] - elif isinstance(visitee, Task): + elif isinstance(visitee, _TaskBase): nonNull = [] for c in contents: if c[0] is not None: nonNull.append(c[0]) - self.__stack[-2][-1] = [Task(*nonNull), True, None] + self.__stack[-2][-1] = [visitee._makeInstance(*nonNull), True, None] elif isinstance(visitee, Sequence): seq = _SequenceCollection() tasks = list() for c in contents: if c[0] is None: continue - if isinstance(c[0], Task): + if isinstance(c[0], _TaskBase): tasks.append(c[0]) else: seq = seq + c[0] @@ -1364,7 +1423,7 @@ def leave(self,visitee): def result(self, visitedContainer): - if isinstance(visitedContainer, Task): + if isinstance(visitedContainer, _TaskBase): result = list() for n in (x[0] for x in self.__stack[0]): if n is not None: @@ -1376,7 +1435,7 @@ def result(self, visitedContainer): for c in self.__stack[0]: if c[0] is None: continue - if isinstance(c[0], Task): + if isinstance(c[0], _TaskBase): tasks.append(c[0]) else: seq = seq + c[0] @@ -1435,46 +1494,31 @@ def __call__(self,test): def didReplace(self): return self._didApply() -class Task(_ConfigureComponent, _Labelable) : - """Holds EDProducers, EDFilters, ESProducers, ESSources, Services, and Tasks. - A Task can be associated with Sequences, Paths, EndPaths and the Schedule. - An EDProducer or EDFilter will be enabled to run unscheduled if it is on - a task associated with the Schedule or any scheduled Path or EndPath (directly - or indirectly through Sequences) and not be on any scheduled Path or EndPath. - ESSources, ESProducers, and Services will be enabled to run if they are on - a Task associated with the Schedule or a scheduled Path or EndPath. In other - cases, they will be enabled to run if and only if they are not on a Task attached - to the process. - """ - +class _TaskBase(_ConfigureComponent, _Labelable) : def __init__(self, *items): self._collection = OrderedSet() self.add(*items) def __setattr__(self,name,value): if not name.startswith("_"): - raise AttributeError("You cannot set parameters for Task objects.") + raise AttributeError("You cannot set parameters for {} objects.".format(self._taskType())) else: self.__dict__[name] = value def add(self, *items): for item in items: - if not isinstance(item, _ConfigureComponent) or not item._isTaskComponent(): - if not isinstance(item, TaskPlaceholder): - raise RuntimeError("Adding an entry of type '" + type(item).__name__ + "'to a Task.\n" - "It is illegal to add this type to a Task.") + if not self._allowedInTask(item): + raise RuntimeError("Adding an entry of type '{0}' to a {1}.\n" + "It is illegal to add this type to a {1}.".format(type(item).__name__, self._taskType())) self._collection.add(item) - def _place(self, name, proc): - proc._placeTask(name,self) - def fillContents(self, taskContents, options=PrintOptions()): # only dump the label, if possible if self.hasLabel_(): taskContents.add(_Labelable.dumpSequencePython(self, options)) else: for i in self._collection: - if isinstance(i, Task): + if isinstance(i, _TaskBase): i.fillContents(taskContents, options) else: taskContents.add(i.dumpSequencePython(options)) @@ -1487,7 +1531,7 @@ def dumpPythonNoNewline(self, options=PrintOptions()): """Returns a string which is the python representation of the object""" taskContents = set() for i in self._collection: - if isinstance(i, Task): + if isinstance(i, _TaskBase): i.fillContents(taskContents, options) else: taskContents.add(i.dumpSequencePython(options)) @@ -1499,14 +1543,14 @@ def dumpPythonNoNewline(self, options=PrintOptions()): iFirst = False s += item if len(taskContents) > 255: - return "cms.Task(*[" + s + "])" - return "cms.Task(" + s + ")" + return "cms.{}(*[" + s + "])".format(self._taskType()) + return "cms.{}({})".format(self._taskType(),s) def directDependencies(self,sortByType=True): return findDirectDependencies(self, self._collection,sortByType=sortByType) def _isTaskComponent(self): - return True + return False def isLeaf(self): return False @@ -1519,7 +1563,7 @@ def visit(self,visitor): visitor.leave(i) def _errorstr(self): - return "Task(...)" + return "{}(...)".format(self.taskType_()) def __iter__(self): for key in self._collection: @@ -1551,7 +1595,7 @@ def contains(self, mod): self.visit(visitor) return visitor.result() def copy(self): - return Task(*self._collection) + return self._makeInstance(*self._collection) def copyAndExclude(self,listOfModulesToExclude): """Returns a copy of the sequence which excludes those module in 'listOfModulesToExclude'""" # You can exclude instances of these types EDProducer, EDFilter, ESSource, ESProducer, @@ -1564,7 +1608,7 @@ def copyAndExclude(self,listOfModulesToExclude): raise TypeError("copyAndExclude can only exclude objects that can be placed on a Task") v = _CopyAndExcludeSequenceVisitor(listOfModulesToExclude) self.visit(v) - return Task(*v.result(self)) + return self._makeInstance(*v.result(self)) def copyAndAdd(self, *modulesToAdd): """Returns a copy of the Task adding modules/tasks""" t = self.copy() @@ -1578,7 +1622,7 @@ def expandAndClone(self): l = [] v = ModuleNodeVisitor(l) self.visit(v) - return Task(*l) + return self._makeInstance(*l) def replace(self, original, replacement): """Finds all instances of 'original' and substitutes 'replacement' for them. Returns 'True' if a replacement occurs.""" @@ -1589,10 +1633,10 @@ def replace(self, original, replacement): # where objects that contain other objects are involved. See the comments # for the _MutatingSequenceVisitor. - if not original._isTaskComponent() or (not replacement is None and not replacement._isTaskComponent()): - raise TypeError("The Task replace function only works with objects that can be placed on a Task\n" + \ - " replace was called with original type = " + str(type(original)) + "\n" + \ - " and replacement type = " + str(type(replacement)) + "\n") + if not self._allowedInTask(original) or (not replacement is None and not self._allowedInTask(replacement)): + raise TypeError("The Task replace function only works with objects that can be placed on a {}\n".format(self._taskType()) + \ + " replace was called with original type = {}\n".format(str(type(original))) + \ + " and replacement type = {}\n".format(str(type(replacement)))) else: v = _CopyAndReplaceSequenceVisitor(original,replacement) self.visit(v) @@ -1614,7 +1658,7 @@ def remove(self, something): # Works very similar to copyAndExclude, there are 2 differences. This changes # the object itself instead of making a copy and second it only removes # the first instance of the argument instead of all of them. - if not something._isTaskComponent(): + if not self._allowedInTask(something): raise TypeError("remove only works with objects that can be placed on a Task") v = _CopyAndRemoveFirstSequenceVisitor(something) self.visit(v) @@ -1626,18 +1670,18 @@ def remove(self, something): def resolve(self, processDict,keepIfCannotResolve=False): temp = OrderedSet() for i in self._collection: - if isinstance(i, Task) or isinstance(i, TaskPlaceholder): + if self._mustResolve(i): temp.add(i.resolve(processDict,keepIfCannotResolve)) else: temp.add(i) self._collection = temp return self - -class TaskPlaceholder(object): + +class _TaskBasePlaceholder(object): def __init__(self, name): self._name = name def _isTaskComponent(self): - return True + return False def isLeaf(self): return False def visit(self,visitor): @@ -1645,32 +1689,117 @@ def visit(self,visitor): def __str__(self): return self._name def insertInto(self, parameterSet, myname): - raise RuntimeError("The TaskPlaceholder "+self._name - +" was never overridden") + raise RuntimeError("The {} {} was never overridden".format(self._typeName(), self._name)) def resolve(self, processDict,keepIfCannotResolve=False): if not self._name in processDict: if keepIfCannotResolve: return self - raise RuntimeError("The TaskPlaceholder "+self._name+ " cannot be resolved.\n Known keys are:"+str(processDict.keys())) + raise RuntimeError("The {} {} cannot be resolved.\n Known keys are: {}".format(self._typeName(), self._name,str(processDict.keys()))) o = processDict[self._name] - if not o._isTaskComponent(): - raise RuntimeError("The TaskPlaceholder "+self._name+ " refers to an object type which is not allowed to be on a task: "+str(type(o))) - if isinstance(o, Task): + if not self._allowedInTask(o): + raise RuntimeError("The {} {} refers to an object type which is not allowed to be on a task: {}".format(self._typeName(), self._name, str(type(o)))) + if isinstance(o, self._taskClass()): return o.resolve(processDict) return o def copy(self): - returnValue =TaskPlaceholder.__new__(type(self)) - returnValue.__init__(self._name) - return returnValue + return self._makeInstance(self._name) def dumpSequencePython(self, options=PrintOptions()): - return 'cms.TaskPlaceholder("%s")'%self._name + return 'cms.{}("{}")'.format(self._typeName(), self._name) def dumpPython(self, options=PrintOptions()): - result = 'cms.TaskPlaceholder(\"' + result = 'cms.{}(\"'.format(self._typeName()) if options.isCfg: result += 'process.' result += self._name+'\")\n' return result +class Task(_TaskBase) : + """Holds EDProducers, EDFilters, ESProducers, ESSources, Services, and Tasks. + A Task can be associated with Sequences, Paths, EndPaths and the Schedule. + An EDProducer or EDFilter will be enabled to run unscheduled if it is on + a task associated with the Schedule or any scheduled Path or EndPath (directly + or indirectly through Sequences) and not be on any scheduled Path or EndPath. + ESSources, ESProducers, and Services will be enabled to run if they are on + a Task associated with the Schedule or a scheduled Path or EndPath. In other + cases, they will be enabled to run if and only if they are not on a Task attached + to the process. + """ + @staticmethod + def _taskType(): + return "Task" + def _place(self, name, proc): + proc._placeTask(name,self) + def _isTaskComponent(self): + return True + @staticmethod + def _makeInstance(*items): + return Task(*items) + @staticmethod + def _allowedInTask(item ): + return (isinstance(item, _ConfigureComponent) and item._isTaskComponent()) or isinstance(item, TaskPlaceholder) + @staticmethod + def _mustResolve(item): + return isinstance(item, Task) or isinstance(item, TaskPlaceholder) + +class TaskPlaceholder(_TaskBasePlaceholder): + def _isTaskComponent(self): + return True + @staticmethod + def _typeName(): + return "TaskPlaceholder" + @staticmethod + def _makeInstance(name): + return TaskPlaceholder(name) + @staticmethod + def _allowedInTask(obj): + return Task._allowedInTask(obj) + @staticmethod + def _taskClass(): + return Task + + +class ConditionalTask(_TaskBase) : + """Holds EDProducers, EDFilters, ESProducers, ESSources, Services, Tasks and ConditionalTasks. + A ConditionalTask can be associated with Sequences, Paths, and EndPaths. + An EDProducer or EDFilter will be added to a Path or EndPath based on which other + modules on the Path consumes its data products. If that ConditionalTask assigned module + is placed after an EDFilter, the module will only run if the EDFilter passes. If no module + on the Path needs the module's data products, the module will be removed from the job. + """ + @staticmethod + def _taskType(): + return "ConditionalTask" + def _place(self, name, proc): + proc._placeConditionalTask(name,self) + def _isTaskComponent(self): + return False + @staticmethod + def _makeInstance(*items): + return ConditionalTask(*items) + @staticmethod + def _allowedInTask(item): + return isinstance(item, ConditionalTask) or isinstance(item, ConditionalTaskPlaceholder) or Task._allowedInTask(item) + @staticmethod + def _mustResolve(item): + return Task._mustResolve(item) or isinstance(item, ConditionalTask) or isinstance(item, ConditionalTaskPlaceholder) + + +class ConditionalTaskPlaceholder(_TaskBasePlaceholder): + def _isTaskComponent(self): + return False + @staticmethod + def _typeName(): + return "ConditionalTaskPlaceholder" + @staticmethod + def _makeInstance(name): + return ConditionalTaskPlaceholder(name) + @staticmethod + def _allowedInTask(obj): + return Task._allowedInTask(obj) or ConditionalTask._allowedInTask(obj) + @staticmethod + def _taskClass(): + return ConditionalTask + + if __name__=="__main__": import unittest class DummyModule(_Labelable, _SequenceLeaf, _ConfigureComponent): @@ -1817,6 +1946,20 @@ def testDumpPython(self): s = Sequence(e, t4) p12 = Path(a+b+s+c,t1) self.assertEqual(p12.dumpPython(),"cms.Path(process.a+process.b+cms.Sequence(process.e, cms.Task(process.d, process.f))+process.c, cms.Task(process.a))\n") + ct1 = ConditionalTask(a) + ct2 = ConditionalTask(c, b) + ct3 = ConditionalTask() + p13 = Path((a+b)*c, ct1) + self.assertEqual(p13.dumpPython(),"cms.Path(process.a+process.b+process.c, cms.ConditionalTask(process.a))\n") + p14 = Path((a+b)*c, ct2, ct1) + self.assertEqual(p14.dumpPython(),"cms.Path(process.a+process.b+process.c, cms.ConditionalTask(process.a), cms.ConditionalTask(process.b, process.c))\n") + p15 = Path(ct1, ct2, ct3) + self.assertEqual(p15.dumpPython(),"cms.Path(cms.ConditionalTask(), cms.ConditionalTask(process.a), cms.ConditionalTask(process.b, process.c))\n") + ct4 = ConditionalTask(d, Task(f)) + s = Sequence(e, ct4) + p16 = Path(a+b+s+c,ct1) + self.assertEqual(p16.dumpPython(),"cms.Path(process.a+process.b+cms.Sequence(process.e, cms.ConditionalTask(process.d, process.f))+process.c, cms.ConditionalTask(process.a))\n") + l = list() namesVisitor = DecoratedNodeNameVisitor(l) p.visit(namesVisitor) @@ -1843,6 +1986,9 @@ def testDumpPython(self): p12.visit(namesVisitor) self.assertEqual(l, ['a', 'b', 'e', 'c']) l[:] = [] + p16.visit(namesVisitor) + self.assertEqual(l, ['a', 'b', 'e', 'c']) + l[:] = [] moduleVisitor = ModuleNodeVisitor(l) p8.visit(moduleVisitor) names = [m.label_() for m in l] @@ -1851,6 +1997,8 @@ def testDumpPython(self): self.assertEqual(tph.dumpPython(), 'cms.TaskPlaceholder("process.a")\n') sph = SequencePlaceholder('a') self.assertEqual(sph.dumpPython(), 'cms.SequencePlaceholder("process.a")\n') + ctph = ConditionalTaskPlaceholder('a') + self.assertEqual(ctph.dumpPython(), 'cms.ConditionalTaskPlaceholder("process.a")\n') def testDumpConfig(self): a = DummyModule("a") @@ -1913,6 +2061,19 @@ def leave(self,visitee): e=DummyModule("e") f=DummyModule("f") g=DummyModule("g") + ct1 = ConditionalTask(d) + ct2 = ConditionalTask(e, ct1) + ct3 = ConditionalTask(f, g, ct2) + s=Sequence(plusAB, ct3, ct2) + multSC = s*c + p=Path(multSC, ct1, ct2) + l = [] + v = ModuleNodeVisitor(l) + p.visit(v) + expected = [a,b,f,g,e,d,e,d,c,d,e,d] + self.assertEqual(expected,l) + + t1 = Task(d) t2 = Task(e, t1) t3 = Task(f, g, t2) @@ -1924,6 +2085,7 @@ def leave(self,visitee): v = ModuleNodeVisitor(l) p.visit(v) expected = [a,b,f,g,e,d,e,d,c,d,e,d] + self.assertEqual(expected,l) l[:] = [] v = ModuleNodeOnTaskVisitor(l) @@ -2050,6 +2212,7 @@ def testReplace(self): m8 = DummyModule("m8") m9 = DummyModule("m9") + #Task t6 = Task(m6) t7 = Task(m7) t89 = Task(m8, m9) @@ -2105,7 +2268,65 @@ def testReplace(self): t3 = Task(m5) t2.replace(m2,t3) self.assertTrue(t2.dumpPython() == "cms.Task(process.m1, process.m3, process.m5)\n") - + + #ConditionalTask + ct6 = ConditionalTask(m6) + ct7 = ConditionalTask(m7) + ct89 = ConditionalTask(m8, m9) + + cs1 = Sequence(m1+m2, ct6) + cs2 = Sequence(m3+m4, ct7) + cs3 = Sequence(cs1+cs2, ct89) + cs3.replace(m3,m5) + l[:] = [] + cs3.visit(namesVisitor) + self.assertEqual(l,['m1','m2','m5','m4']) + + cs3.replace(m8,m1) + self.assertEqual(cs3.dumpPython(), "cms.Sequence(cms.Sequence(process.m1+process.m2, cms.ConditionalTask(process.m6))+process.m5+process.m4, cms.ConditionalTask(process.m1, process.m9), cms.ConditionalTask(process.m7))\n") + + cs3.replace(m1,m7) + self.assertEqual(cs3.dumpPython(), "cms.Sequence(process.m7+process.m2+process.m5+process.m4, cms.ConditionalTask(process.m6), cms.ConditionalTask(process.m7), cms.ConditionalTask(process.m7, process.m9))\n") + result = cs3.replace(ct7, ct89) + self.assertEqual(cs3.dumpPython(), "cms.Sequence(process.m7+process.m2+process.m5+process.m4, cms.ConditionalTask(process.m6), cms.ConditionalTask(process.m7, process.m9), cms.ConditionalTask(process.m8, process.m9))\n") + self.assertTrue(result) + result = cs3.replace(ct7, ct89) + self.assertFalse(result) + + ct1 = ConditionalTask() + ct1.replace(m1,m2) + self.assertEqual(ct1.dumpPython(), "cms.ConditionalTask()\n") + + ct1 = ConditionalTask(m1) + ct1.replace(m1,m2) + self.assertEqual(ct1.dumpPython(), "cms.ConditionalTask(process.m2)\n") + + ct1 = ConditionalTask(m1,m2, m2) + ct1.replace(m2,m3) + self.assertEqual(ct1.dumpPython(), "cms.ConditionalTask(process.m1, process.m3)\n") + + ct1 = ConditionalTask(m1,m2) + ct2 = ConditionalTask(m1,m3,ct1) + ct2.replace(m1,m4) + self.assertEqual(ct2.dumpPython(), "cms.ConditionalTask(process.m2, process.m3, process.m4)\n") + + ct1 = ConditionalTask(m2) + ct2 = ConditionalTask(m1,m3,ct1) + ct2.replace(m1,m4) + self.assertEqual(ct2.dumpPython(), "cms.ConditionalTask(process.m2, process.m3, process.m4)\n") + + ct1 = ConditionalTask(m2) + ct2 = ConditionalTask(m1,m3,ct1) + ct2.replace(ct1,m4) + self.assertEqual(ct2.dumpPython(), "cms.ConditionalTask(process.m1, process.m3, process.m4)\n") + + ct1 = ConditionalTask(m2) + ct2 = ConditionalTask(m1,m3,ct1) + ct3 = ConditionalTask(m5) + ct2.replace(m2,ct3) + self.assertEqual(ct2.dumpPython(), "cms.ConditionalTask(process.m1, process.m3, process.m5)\n") + + #FinalPath fp = FinalPath() fp.replace(m1,m2) self.assertEqual(fp.dumpPython(), "cms.FinalPath()\n") @@ -2131,7 +2352,7 @@ def testReplaceIfHeldDirectly(self): s3._replaceIfHeldDirectly(~m1, m2) self.assertEqual(s3.dumpPython()[:-1], "cms.Sequence(process.m2+(process.m1+process.m2))") - + #Task m6 = DummyModule("m6") m7 = DummyModule("m7") m8 = DummyModule("m8") @@ -2153,6 +2374,23 @@ def testReplaceIfHeldDirectly(self): s1._replaceIfHeldDirectly(t6,t7) self.assertEqual(s1.dumpPython()[:-1],"cms.Sequence(cms.Task(process.m7))") + #ConditionalTask + ct6 = ConditionalTask(m6) + ct7 = ConditionalTask(m7) + ct89 = ConditionalTask(m8, m9) + + s1 = Sequence(m1+m2, ct6) + s2 = Sequence(m3+m4, ct7) + s3 = Sequence(s1+s2, ct89) + s3._replaceIfHeldDirectly(m3,m5) + self.assertEqual(s3.dumpPython()[:-1], "cms.Sequence(cms.Sequence(process.m1+process.m2, cms.ConditionalTask(process.m6))+cms.Sequence(process.m3+process.m4, cms.ConditionalTask(process.m7)), cms.ConditionalTask(process.m8, process.m9))") + s2._replaceIfHeldDirectly(m3,m5) + self.assertEqual(s2.dumpPython()[:-1],"cms.Sequence(process.m5+process.m4, cms.ConditionalTask(process.m7))") + self.assertEqual(s3.dumpPython()[:-1], "cms.Sequence(cms.Sequence(process.m1+process.m2, cms.ConditionalTask(process.m6))+cms.Sequence(process.m5+process.m4, cms.ConditionalTask(process.m7)), cms.ConditionalTask(process.m8, process.m9))") + + s1 = Sequence(ct6) + s1._replaceIfHeldDirectly(ct6,ct7) + self.assertEqual(s1.dumpPython()[:-1],"cms.Sequence(cms.ConditionalTask(process.m7))") def testIndex(self): m1 = DummyModule("a") m2 = DummyModule("b") @@ -2199,6 +2437,7 @@ def testExpandAndClone(self): p2.visit(namesVisitor) self.assertEqual(l, ['m1', '!m2', 'm1', 'm2', '-m2', '!m1', 'm1', 'm2']) + #Task m6 = DummyModule("m6") m7 = DummyModule("m7") m8 = DummyModule("m8") @@ -2208,7 +2447,7 @@ def testExpandAndClone(self): l[:] = [] p2.visit(namesVisitor) self.assertEqual(l, ['m1', '!m2', 'm1', 'm2', '-m2', '!m1', 'm1', 'm2']) - self.assertTrue(p2.dumpPython() == "cms.Path(process.m1+~process.m2+process.m1+process.m2+cms.ignore(process.m2)+~process.m1+process.m1+process.m2, cms.Task(process.m6))\n") + self.assertEqual(p2.dumpPython(), "cms.Path(process.m1+~process.m2+process.m1+process.m2+cms.ignore(process.m2)+~process.m1+process.m1+process.m2, cms.Task(process.m6))\n") s2 = Sequence(m1*m2, Task(m9)) s3 = Sequence(~m1*s2) @@ -2236,6 +2475,44 @@ def testExpandAndClone(self): t4 = Task() t5 = t4.expandAndClone() self.assertTrue(t5.dumpPython() == "cms.Task()\n") + #ConditionalTask + s1 = Sequence(m1*~m2*m1*m2*ignore(m2)) + s2 = Sequence(m1*m2) + s3 = Sequence(~m1*s2) + p = Path(s1+s3, ConditionalTask(m6)) + p2 = p.expandAndClone() + l[:] = [] + p2.visit(namesVisitor) + self.assertEqual(l, ['m1', '!m2', 'm1', 'm2', '-m2', '!m1', 'm1', 'm2']) + self.assertEqual(p2.dumpPython(), "cms.Path(process.m1+~process.m2+process.m1+process.m2+cms.ignore(process.m2)+~process.m1+process.m1+process.m2, cms.ConditionalTask(process.m6))\n") + + s2 = Sequence(m1*m2, ConditionalTask(m9)) + s3 = Sequence(~m1*s2) + ct8 = ConditionalTask(m8) + ct8.setLabel("ct8") + p = Path(s1+s3, ConditionalTask(m6, ConditionalTask(m7, ct8))) + p2 = p.expandAndClone() + l[:] = [] + p2.visit(namesVisitor) + self.assertEqual(l, ['m1', '!m2', 'm1', 'm2', '-m2', '!m1', 'm1', 'm2']) + self.assertEqual(p2.dumpPython(), "cms.Path(process.m1+~process.m2+process.m1+process.m2+cms.ignore(process.m2)+~process.m1+process.m1+process.m2, cms.ConditionalTask(process.m6, process.m7, process.m8, process.m9))\n") + + t1 = ConditionalTask(m1,m2,m3) + s1 = Sequence(t1) + s2 = s1.expandAndClone() + l[:] = [] + s2.visit(namesVisitor) + self.assertEqual(l, []) + self.assertEqual(s2.dumpPython(), "cms.Sequence(cms.ConditionalTask(process.m1, process.m2, process.m3))\n") + + t1 = ConditionalTask(m1,m2) + t2 = ConditionalTask(m1,m3,t1) + t3 = t2.expandAndClone() + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.m1, process.m2, process.m3)\n") + t4 = ConditionalTask() + t5 = t4.expandAndClone() + self.assertTrue(t5.dumpPython() == "cms.ConditionalTask()\n") + def testAdd(self): m1 = DummyModule("m1") m2 = DummyModule("m2") @@ -2320,6 +2597,7 @@ def testRemove(self): s4.remove(m1) l[:]=[]; s4.visit(namesVisitor); self.assertEqual(l,[]) self.assertEqual(s4.dumpPython(), "cms.Sequence()\n") + #Task s1 = Sequence(m1+m2, Task(m3), Task(m4)) s1.remove(m4) self.assertEqual(s1.dumpPython(), "cms.Sequence(process.m1+process.m2, cms.Task(process.m3))\n") @@ -2339,6 +2617,27 @@ def testRemove(self): self.assertTrue(t3.dumpPython() == "cms.Task(process.m2)\n") t3.remove(m2) self.assertTrue(t3.dumpPython() == "cms.Task()\n") + #ConditionalTask + s1 = Sequence(m1+m2, ConditionalTask(m3), ConditionalTask(m4)) + s1.remove(m4) + self.assertEqual(s1.dumpPython(), "cms.Sequence(process.m1+process.m2, cms.ConditionalTask(process.m3))\n") + s1 = Sequence(m1+m2+Sequence(ConditionalTask(m3,m4), ConditionalTask(m3), ConditionalTask(m4))) + s1.remove(m4) + self.assertEqual(s1.dumpPython(), "cms.Sequence(process.m1+process.m2, cms.ConditionalTask(process.m3), cms.ConditionalTask(process.m4))\n") + t1 = ConditionalTask(m1) + t1.setLabel("t1") + t2 = ConditionalTask(m2,t1) + t2.setLabel("t2") + t3 = ConditionalTask(t1,t2,m1) + t3.remove(m1) + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.m1, process.t2)\n") + t3.remove(m1) + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.m1, process.m2)\n") + t3.remove(m1) + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.m2)\n") + t3.remove(m2) + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask()\n") + #FinalPath fp = FinalPath(m1+m2) fp.remove(m1) self.assertEqual(fp.dumpPython(), "cms.FinalPath(process.m2)\n") @@ -2445,6 +2744,7 @@ def testCopyAndExclude(self): self.assertEqual(s.copyAndExclude([d]).dumpPython(),"cms.Sequence(process.a+process.b+process.c)\n") self.assertEqual(s.copyAndExclude([a,b,c,d]).dumpPython(),"cms.Sequence()\n") + #Task e = DummyModule("e") f = DummyModule("f") g = DummyModule("g") @@ -2481,6 +2781,40 @@ def testCopyAndExclude(self): self.assertTrue(t3.dumpPython() == "cms.Task(process.f, process.g, process.t11)\n") t4 = t2.copyAndExclude([e,f,g,h,a]) self.assertTrue(t4.dumpPython() == "cms.Task()\n") + #ConditionalTask + t1 = ConditionalTask(h) + s = Sequence(a+b+c+~d, ConditionalTask(e,f,ConditionalTask(g,t1))) + self.assertEqual(s.copyAndExclude([a,h]).dumpPython(),"cms.Sequence(process.b+process.c+~process.d, cms.ConditionalTask(process.e, process.f, process.g))\n") + self.assertEqual(s.copyAndExclude([a,h]).dumpPython(),"cms.Sequence(process.b+process.c+~process.d, cms.ConditionalTask(process.e, process.f, process.g))\n") + self.assertEqual(s.copyAndExclude([a,e,h]).dumpPython(),"cms.Sequence(process.b+process.c+~process.d, cms.ConditionalTask(process.f, process.g))\n") + self.assertEqual(s.copyAndExclude([a,e,f,g,h]).dumpPython(),"cms.Sequence(process.b+process.c+~process.d)\n") + self.assertEqual(s.copyAndExclude([a,b,c,d]).dumpPython(),"cms.Sequence(cms.ConditionalTask(process.e, process.f, process.g, process.h))\n") + self.assertEqual(s.copyAndExclude([t1]).dumpPython(),"cms.Sequence(process.a+process.b+process.c+~process.d, cms.ConditionalTask(process.e, process.f, process.g))\n") + taskList = [] + taskVisitor = ConditionalTaskVisitor(taskList) + s.visit(taskVisitor) + self.assertEqual(len(taskList),3) + s2 = s.copyAndExclude([g,h]) + taskList[:] = [] + s2.visit(taskVisitor) + self.assertEqual(len(taskList),1) + t2 = ConditionalTask(t1) + taskList[:] = [] + t2.visit(taskVisitor) + self.assertEqual(taskList[0],t1) + s3 = Sequence(s) + self.assertEqual(s3.copyAndExclude([a,h]).dumpPython(),"cms.Sequence(process.b+process.c+~process.d, cms.ConditionalTask(process.e, process.f, process.g))\n") + s4 = Sequence(s) + self.assertEqual(s4.copyAndExclude([a,b,c,d,e,f,g,h]).dumpPython(),"cms.Sequence()\n") + t1 = ConditionalTask(e,f) + t11 = ConditionalTask(a) + t11.setLabel("t11") + t2 = ConditionalTask(g,t1,h,t11) + t3 = t2.copyAndExclude([e,h]) + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.f, process.g, process.t11)\n") + t4 = t2.copyAndExclude([e,f,g,h,a]) + self.assertEqual(t4.dumpPython(), "cms.ConditionalTask()\n") + def testSequenceTypeChecks(self): m1 = DummyModule("m1") m2 = DummyModule("m2") @@ -2506,6 +2840,7 @@ def testCopy(self): p1 += e self.assertEqual(p1.dumpPython(),"cms.Path(process.a+process.b+process.c+process.e)\n") self.assertEqual(p2.dumpPython(),"cms.Path(process.a+process.b+process.c)\n") + #Task t1 = Task(a, b) t2 = t1.copy() self.assertTrue(t1.dumpPython() == t2.dumpPython()) @@ -2514,12 +2849,23 @@ def testCopy(self): self.assertTrue(id(t1Contents[0]) == id(t2Contents[0])) self.assertTrue(id(t1Contents[1]) == id(t2Contents[1])) self.assertTrue(id(t1._collection) != id(t2._collection)) + #ConditionalTask + t1 = ConditionalTask(a, b) + t2 = t1.copy() + self.assertTrue(t1.dumpPython() == t2.dumpPython()) + t1Contents = list(t1._collection) + t2Contents = list(t2._collection) + self.assertTrue(id(t1Contents[0]) == id(t2Contents[0])) + self.assertTrue(id(t1Contents[1]) == id(t2Contents[1])) + self.assertTrue(id(t1._collection) != id(t2._collection)) + def testCopyAndAdd(self): a = DummyModule("a") b = DummyModule("b") c = DummyModule("c") d = DummyModule("d") e = DummyModule("e") + #Task t1 = Task(a, b, c) self.assertEqual(t1.dumpPython(), "cms.Task(process.a, process.b, process.c)\n") t2 = t1.copyAndAdd(d, e) @@ -2535,6 +2881,23 @@ def testCopyAndAdd(self): self.assertEqual(t5.dumpPython(), "cms.Task(process.a, process.c, process.d, process.e)\n") t6 = t4.copyAndAdd(Task(b)) self.assertEqual(t6.dumpPython(), "cms.Task(process.a, process.b, process.c, process.d)\n") + #ConditionalTask + t1 = ConditionalTask(a, b, c) + self.assertEqual(t1.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c)\n") + t2 = t1.copyAndAdd(d, e) + self.assertEqual(t1.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c)\n") + self.assertEqual(t2.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c, process.d, process.e)\n") + t3 = t2.copyAndExclude([b]) + self.assertEqual(t1.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c)\n") + self.assertEqual(t2.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c, process.d, process.e)\n") + self.assertEqual(t3.dumpPython(), "cms.ConditionalTask(process.a, process.c, process.d, process.e)\n") + t4 = t1.copyAndExclude([b]).copyAndAdd(d) + self.assertEqual(t4.dumpPython(), "cms.ConditionalTask(process.a, process.c, process.d)\n") + t5 = t2.copyAndExclude([b]).copyAndAdd(d) + self.assertEqual(t5.dumpPython(), "cms.ConditionalTask(process.a, process.c, process.d, process.e)\n") + t6 = t4.copyAndAdd(Task(b)) + self.assertEqual(t6.dumpPython(), "cms.ConditionalTask(process.a, process.b, process.c, process.d)\n") + def testInsertInto(self): from FWCore.ParameterSet.Types import vstring class TestPSet(object): diff --git a/FWCore/ParameterSet/python/SequenceVisitors.py b/FWCore/ParameterSet/python/SequenceVisitors.py index 933345bc0ef96..679d44614bcf7 100644 --- a/FWCore/ParameterSet/python/SequenceVisitors.py +++ b/FWCore/ParameterSet/python/SequenceVisitors.py @@ -120,19 +120,24 @@ def leave(self,visitee): class CompositeVisitor(object): """ Combines 3 different visitor classes in 1 so we only have to visit all the paths and endpaths once""" - def __init__(self, validator, node, decorated): + def __init__(self, validator, node, decorated, optional=None): self._validator = validator self._node = node self._decorated = decorated + self._optional = optional def enter(self, visitee): self._validator.enter(visitee) self._node.enter(visitee) self._decorated.enter(visitee) + if self._optional: + self._optional.enter(visitee) def leave(self, visitee): self._validator.leave(visitee) # The node visitor leave function does nothing #self._node.leave(visitee) self._decorated.leave(visitee) + if self._optional: + self._optional.leave(visitee) class ModuleNamesFromGlobalsVisitor(object): """Fill a list with the names of Event module types in a sequence. The names are determined From a571e1cdb1ddc240cca98b54ed287299648e6eff Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 10 Mar 2022 10:37:06 -0600 Subject: [PATCH 03/10] Added code to handle ConditionalTasks --- .../interface/ProductResolverIndexHelper.h | 4 + .../src/ProductResolverIndexHelper.cc | 26 +++ FWCore/Framework/interface/StreamSchedule.h | 16 ++ FWCore/Framework/src/StreamSchedule.cc | 220 +++++++++++++++++- FWCore/Framework/test/BuildFile.xml | 8 + .../test/test_conditionaltasks_cfg.py | 82 +++++++ 6 files changed, 351 insertions(+), 5 deletions(-) create mode 100644 FWCore/Framework/test/test_conditionaltasks_cfg.py diff --git a/DataFormats/Provenance/interface/ProductResolverIndexHelper.h b/DataFormats/Provenance/interface/ProductResolverIndexHelper.h index 73d20841bfa00..1b957e6bc85d9 100644 --- a/DataFormats/Provenance/interface/ProductResolverIndexHelper.h +++ b/DataFormats/Provenance/interface/ProductResolverIndexHelper.h @@ -84,6 +84,10 @@ namespace edm { // If the TypeID for the wrapped type is already available, // it is faster to call getContainedTypeFromWrapper directly. TypeID getContainedType(TypeID const& typeID); + + bool typeIsViewCompatible(TypeID const& requestedViewType, + TypeID const& wrappedtypeID, + std::string const& className); } // namespace productholderindexhelper class ProductResolverIndexHelper { diff --git a/DataFormats/Provenance/src/ProductResolverIndexHelper.cc b/DataFormats/Provenance/src/ProductResolverIndexHelper.cc index d47946fe1f85c..82221e5fa8a6b 100644 --- a/DataFormats/Provenance/src/ProductResolverIndexHelper.cc +++ b/DataFormats/Provenance/src/ProductResolverIndexHelper.cc @@ -70,6 +70,32 @@ namespace edm { TypeID const wrappedTypeID = TypeID(wrappedType.typeInfo()); return getContainedTypeFromWrapper(wrappedTypeID, className); } + + bool typeIsViewCompatible(TypeID const& requestedViewType, + TypeID const& wrappedtypeID, + std::string const& className) { + auto elementType = getContainedTypeFromWrapper(wrappedtypeID, className); + if (elementType == TypeID(typeid(void)) or elementType == TypeID()) { + //the wrapped type is not a container + return false; + } + if (elementType == requestedViewType) { + return true; + } + //need to check for inheritance match + std::vector missingDictionaries; + std::vector baseTypes; + if (!public_base_classes(missingDictionaries, elementType, baseTypes)) { + return false; + } + for (auto const& base : baseTypes) { + if (TypeID(base.typeInfo()) == requestedViewType) { + return true; + } + } + return false; + } + } // namespace productholderindexhelper ProductResolverIndexHelper::ProductResolverIndexHelper() diff --git a/FWCore/Framework/interface/StreamSchedule.h b/FWCore/Framework/interface/StreamSchedule.h index 640c6c0431313..4b8e743bc63e4 100644 --- a/FWCore/Framework/interface/StreamSchedule.h +++ b/FWCore/Framework/interface/StreamSchedule.h @@ -95,6 +95,7 @@ #include #include #include +#include namespace edm { @@ -288,6 +289,21 @@ namespace edm { void reportSkipped(EventPrincipal const& ep) const; + struct AliasInfo { + std::string friendlyClassName; + std::string instanceLabel; + std::string originalInstanceLabel; + std::string originalModuleLabel; + }; + std::vector tryToPlaceConditionalModules( + Worker*, + std::unordered_set& conditionalModules, + std::multimap const& conditionalModuleBranches, + std::multimap const& aliasMap, + ParameterSet& proc_pset, + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + std::shared_ptr processConfiguration); void fillWorkers(ParameterSet& proc_pset, ProductRegistry& preg, PreallocationConfiguration const* prealloc, diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 342bdff7f242e..f7d969d93fb24 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -3,6 +3,7 @@ #include "DataFormats/Provenance/interface/BranchIDListHelper.h" #include "DataFormats/Provenance/interface/ProcessConfiguration.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" +#include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h" #include "FWCore/Framework/src/OutputModuleDescription.h" #include "FWCore/Framework/interface/TriggerNamesService.h" #include "FWCore/Framework/src/TriggerReport.h" @@ -378,6 +379,155 @@ namespace edm { } } + static Worker* getWorker(std::string const& moduleLabel, + ParameterSet& proc_pset, + WorkerManager& workerManager, + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + std::shared_ptr processConfiguration) { + bool isTracked; + ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked); + if (modpset == nullptr) { + return nullptr; + } + assert(isTracked); + + return workerManager.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel); + } + + std::vector StreamSchedule::tryToPlaceConditionalModules( + Worker* worker, + std::unordered_set& conditionalModules, + std::multimap const& conditionalModuleBranches, + std::multimap const& aliasMap, + ParameterSet& proc_pset, + ProductRegistry& preg, + PreallocationConfiguration const* prealloc, + std::shared_ptr processConfiguration) { + std::vector returnValue; + auto const& consumesInfo = worker->consumesInfo(); + auto moduleLabel = worker->description()->moduleLabel(); + using namespace productholderindexhelper; + for (auto const& ci : consumesInfo) { + if (not ci.skipCurrentProcess() and + (ci.process().empty() or ci.process() == processConfiguration->processName())) { + auto productModuleLabel = ci.label(); + if (productModuleLabel.empty()) { + //this is a consumesMany request + for (auto const& branch : conditionalModuleBranches) { + if (ci.kindOfType() == edm::PRODUCT_TYPE) { + if (branch.second->unwrappedTypeID() != ci.type()) { + continue; + } + } else { + if (not typeIsViewCompatible( + ci.type(), TypeID(branch.second->wrappedType().typeInfo()), branch.second->className())) { + continue; + } + } + + auto condWorker = + getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration); + assert(condWorker); + + conditionalModules.erase(productModuleLabel); + + auto dependents = tryToPlaceConditionalModules(condWorker, + conditionalModules, + conditionalModuleBranches, + aliasMap, + proc_pset, + preg, + prealloc, + processConfiguration); + returnValue.insert(returnValue.end(), dependents.begin(), dependents.end()); + returnValue.push_back(condWorker); + } + } else { + //just a regular consumes + bool productFromConditionalModule = false; + auto itFound = conditionalModules.find(productModuleLabel); + if (itFound == conditionalModules.end()) { + //Check to see if this was an alias + auto findAlias = aliasMap.equal_range(productModuleLabel); + if (findAlias.first != findAlias.second) { + for (auto it = findAlias.first; it != findAlias.second; ++it) { + //this was previously filtered so only the conditional modules remain + productModuleLabel = it->second.originalModuleLabel; + if (it->second.friendlyClassName == "*" or + (ci.type().friendlyClassName() == it->second.friendlyClassName)) { + if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) { + productFromConditionalModule = true; + //need to check the rest of the data product info + break; + } + } else if (ci.kindOfType() == ELEMENT_TYPE) { + //consume is a View so need to do more intrusive search + if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) { + //find matching branches in module + auto branches = conditionalModuleBranches.equal_range(productModuleLabel); + for (auto itBranch = branches.first; itBranch != branches.second; ++it) { + if (it->second.originalInstanceLabel == "*" or + itBranch->second->productInstanceName() == it->second.originalInstanceLabel) { + if (typeIsViewCompatible(ci.type(), + TypeID(itBranch->second->wrappedType().typeInfo()), + itBranch->second->className())) { + productFromConditionalModule = true; + break; + } + } + } + } + } + } + } + itFound = conditionalModules.find(productModuleLabel); + } else { + //need to check the rest of the data product info + auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel); + for (auto itBranch = findBranches.first; itBranch != findBranches.second; ++itBranch) { + if (itBranch->second->productInstanceName() == ci.instance()) { + if (ci.kindOfType() == PRODUCT_TYPE) { + if (ci.type() == itBranch->second->unwrappedTypeID()) { + productFromConditionalModule = true; + break; + } + } else { + //this is a view + if (typeIsViewCompatible(ci.type(), + TypeID(itBranch->second->wrappedType().typeInfo()), + itBranch->second->className())) { + productFromConditionalModule = true; + break; + } + } + } + } + } + if (productFromConditionalModule) { + auto condWorker = + getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration); + assert(condWorker); + + conditionalModules.erase(itFound); + + auto dependents = tryToPlaceConditionalModules(condWorker, + conditionalModules, + conditionalModuleBranches, + aliasMap, + proc_pset, + preg, + prealloc, + processConfiguration); + returnValue.insert(returnValue.end(), dependents.begin(), dependents.end()); + returnValue.push_back(condWorker); + } + } + } + } + return returnValue; + } + void StreamSchedule::fillWorkers(ParameterSet& proc_pset, ProductRegistry& preg, PreallocationConfiguration const* prealloc, @@ -389,6 +539,61 @@ namespace edm { vstring modnames = proc_pset.getParameter(pathName); PathWorkers tmpworkers; + //Pull out ConditionalTask modules + auto itCondBegin = std::find(modnames.begin(), modnames.end(), "#"); + + std::unordered_set conditionalmods; + //An EDAlias may be redirecting to a module on a ConditionalTask + std::multimap aliasMap; + std::multimap conditionalModsBranches; + if (itCondBegin != modnames.end()) { + //the last entry should be ignored since it is required to be "@" + conditionalmods = std::unordered_set( + std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1)); + + for (auto const& cond : conditionalmods) { + //force the creation of the conditional modules so alias check can work + (void)getWorker(cond, proc_pset, workerManager_, preg, prealloc, processConfiguration); + } + //find aliases + { + auto aliases = proc_pset.getParameter>("@all_aliases"); + std::string const star("*"); + for (auto const& alias : aliases) { + auto info = proc_pset.getParameter(alias); + auto aliasedToModuleLabels = info.getParameterNames(); + for (auto const& mod : aliasedToModuleLabels) { + if (not mod.empty() and mod[0] != '@' and conditionalmods.find(mod) != conditionalmods.end()) { + auto aliasPSet = proc_pset.getParameter(mod); + std::string type = star; + std::string instance = star; + std::string originalInstance = star; + if (aliasPSet.exists("type")) { + type = aliasPSet.getParameter("type"); + } + if (aliasPSet.exists("toProductInstance")) { + instance = aliasPSet.getParameter("toProductInstance"); + } + if (aliasPSet.exists("fromProductInstance")) { + originalInstance = aliasPSet.getParameter("fromProductInstance"); + } + + aliasMap.emplace(alias, AliasInfo{type, instance, originalInstance, mod}); + } + } + } + } + { + //find branches created by the conditional modules + for (auto const& prod : preg.productList()) { + if (conditionalmods.find(prod.first.moduleLabel()) != conditionalmods.end()) { + conditionalModsBranches.emplace(prod.first.moduleLabel(), &prod.second); + } + } + } + } + modnames.erase(itCondBegin, modnames.end()); + unsigned int placeInPath = 0; for (auto const& name : modnames) { //Modules except EDFilters are set to run concurrently by default @@ -409,9 +614,8 @@ namespace edm { moduleLabel.erase(0, 1); } - bool isTracked; - ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked); - if (modpset == nullptr) { + Worker* worker = getWorker(moduleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration); + if (worker == nullptr) { std::string pathType("endpath"); if (!search_all(endPathNames, pathName)) { pathType = std::string("path"); @@ -420,9 +624,7 @@ namespace edm { << "The unknown module label \"" << moduleLabel << "\" appears in " << pathType << " \"" << pathName << "\"\n please check spelling or remove that label from the path."; } - assert(isTracked); - Worker* worker = workerManager_.getWorker(*modpset, preg, prealloc, processConfiguration, moduleLabel); if (ignoreFilters && filterAction != WorkerInPath::Ignore && worker->moduleType() == Worker::kFilter) { // We have a filter on an end path, and the filter is not explicitly ignored. // See if the filter is allowed. @@ -442,6 +644,14 @@ namespace edm { if (runConcurrently && worker->moduleType() == Worker::kFilter and filterAction != WorkerInPath::Ignore) { runConcurrently = false; } + + auto condModules = tryToPlaceConditionalModules( + worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration); + for (auto condMod : condModules) { + tmpworkers.emplace_back(condMod, WorkerInPath::Ignore, placeInPath, true); + ++placeInPath; + } + tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently); ++placeInPath; } diff --git a/FWCore/Framework/test/BuildFile.xml b/FWCore/Framework/test/BuildFile.xml index fbdfec87b1812..a14dbe7358c35 100644 --- a/FWCore/Framework/test/BuildFile.xml +++ b/FWCore/Framework/test/BuildFile.xml @@ -404,4 +404,12 @@ + + + + + + + + diff --git a/FWCore/Framework/test/test_conditionaltasks_cfg.py b/FWCore/Framework/test/test_conditionaltasks_cfg.py new file mode 100644 index 0000000000000..a1e92eb31e05f --- /dev/null +++ b/FWCore/Framework/test/test_conditionaltasks_cfg.py @@ -0,0 +1,82 @@ +import FWCore.ParameterSet.Config as cms + +import argparse +import sys + +parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ConditionalTasks.') + +parser.add_argument("--filterSucceeds", help="Have filter succeed", action="store_true") +parser.add_argument("--reverseDependencies", help="Switch the order of dependencies", action="store_true") +parser.add_argument("--testAlias", help="Get data from an alias", action="store_true") +parser.add_argument("--testView", help="Get data via a view", action="store_true") +parser.add_argument("--aliasWithStar", help="when using testAlias use '*' as type", action="store_true") + +argv = sys.argv[:] +if '--' in argv: + argv.remove("--") +args, unknown = parser.parse_known_args(argv) + +process = cms.Process("Test") + +process.source = cms.Source("EmptySource") + +process.maxEvents.input = 1 + +process.a = cms.EDProducer("IntProducer", ivalue = cms.int32(1)) +process.b = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag(cms.InputTag("a"))) + +process.f1 = cms.EDFilter("IntProductFilter", label = cms.InputTag("b")) + +process.c = cms.EDProducer("IntProducer", ivalue = cms.int32(2)) +process.d = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag(cms.InputTag("c"))) +process.e = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag(cms.InputTag("d"))) + +process.prodOnPath = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag(cms.InputTag("d"), cms.InputTag("e"))) + +if args.filterSucceeds: + threshold = 1 +else: + threshold = 3 + +process.f2 = cms.EDFilter("IntProductFilter", label = cms.InputTag("e"), threshold = cms.int32(threshold)) + +if args.reverseDependencies: + process.d.labels[0]=cms.InputTag("e") + process.e.labels[0]=cms.InputTag("c") + process.f2.label = cms.InputTag("d") + +if args.testView: + process.f3 = cms.EDAnalyzer("SimpleViewAnalyzer", + label = cms.untracked.InputTag("f"), + sizeMustMatch = cms.untracked.uint32(10), + checkSize = cms.untracked.bool(False) + ) + process.f = cms.EDProducer("OVSimpleProducer", size = cms.int32(10)) + producttype = "edmtestSimplesOwned" +else: + process.f= cms.EDProducer("IntProducer", ivalue = cms.int32(3)) + process.f3 = cms.EDFilter("IntProductFilter", label = cms.InputTag("f")) + producttype = "edmtestIntProduct" + +if args.testAlias: + if args.aliasWithStar: + producttype = "*" + + process.f3.label = "aliasToF" + process.aliasToF = cms.EDAlias( + f = cms.VPSet( + cms.PSet( + type = cms.string(producttype), + ) + ) + ) + + +process.p = cms.Path(process.f1+process.prodOnPath+process.f2+process.f3, cms.ConditionalTask(process.a, process.b, process.c, process.d, process.e, process.f)) + +process.tst = cms.EDAnalyzer("IntTestAnalyzer", moduleLabel = cms.untracked.InputTag("f"), valueMustMatch = cms.untracked.int32(3), + valueMustBeMissing = cms.untracked.bool(not args.filterSucceeds)) + +process.endp = cms.EndPath(process.tst) + +#process.add_(cms.Service("Tracer")) From 7a58508da70536e62250e7ba7f4a2eb0ce6180b3 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 23 Mar 2022 12:36:40 -0500 Subject: [PATCH 04/10] Improvements based on review comments --- FWCore/ParameterSet/python/Config.py | 10 +++++----- FWCore/ParameterSet/python/SequenceTypes.py | 16 +++++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/FWCore/ParameterSet/python/Config.py b/FWCore/ParameterSet/python/Config.py index 7247ee9f527fc..a7e36652c3893 100644 --- a/FWCore/ParameterSet/python/Config.py +++ b/FWCore/ParameterSet/python/Config.py @@ -1282,8 +1282,8 @@ def _insertPaths(self, processPSet, nodeVisitor): decoratedList = [] lister = DecoratedNodeNameVisitor(decoratedList) condTaskModules = [] - constTaskVistor = ModuleNodeOnConditionalTaskVisitor(condTaskModules) - pathCompositeVisitor = CompositeVisitor(pathValidator, nodeVisitor, lister, constTaskVistor) + condTaskVistor = ModuleNodeOnConditionalTaskVisitor(condTaskModules) + pathCompositeVisitor = CompositeVisitor(pathValidator, nodeVisitor, lister, condTaskVistor) endpathCompositeVisitor = CompositeVisitor(endpathValidator, nodeVisitor, lister) for triggername in triggerPaths: iPath = self.paths_()[triggername] @@ -3002,15 +3002,15 @@ def testConditionalTask(self): testTask1.add(testTask3) process.myTask1 = testTask1 - # test the validation that occurs when attaching a Task to a Process + # test the validation that occurs when attaching a ConditionalTask to a Process # first a case that passes, then one the fails on an EDProducer # then one that fails on a service l = set() visitor = NodeNameVisitor(l) testTask1.visit(visitor) self.assertEqual(l, set(['mesproducer', 'mproducer', 'mproducer2', 'mfilter', 'd', 'messource'])) - l2 = testTask1.moduleNames - self.assertEqual(l, set(['mesproducer', 'mproducer', 'mproducer2', 'mfilter', 'd', 'messource'])) + l2 = testTask1.moduleNames() + self.assertEqual(l2, set(['mesproducer', 'mproducer', 'mproducer2', 'mfilter', 'd', 'messource'])) testTask4 = ConditionalTask(edproducer3) l.clear() diff --git a/FWCore/ParameterSet/python/SequenceTypes.py b/FWCore/ParameterSet/python/SequenceTypes.py index b32d91ac1ef42..e465491600b4a 100644 --- a/FWCore/ParameterSet/python/SequenceTypes.py +++ b/FWCore/ParameterSet/python/SequenceTypes.py @@ -238,7 +238,7 @@ def findDirectDependencies(element, collection,sortByType=True): continue t = 'sequences' # cms.Task - elif isinstance(item, _Task): + elif isinstance(item, Task): if not item.hasLabel_(): dependencies += item.directDependencies(sortByType) continue @@ -450,8 +450,10 @@ def replace(self, original, replacement): # where objects that contain other objects are involved. See the comments # for the _MutatingSequenceVisitor. - if (isinstance(original,Task) != isinstance(replacement,Task)) or (isinstance(original,ConditionalTask) != isinstance(replacement,ConditionalTask)): + if (isinstance(original,Task) != isinstance(replacement,Task)): raise TypeError("replace only works if both arguments are Tasks or neither") + if (isinstance(original,ConditionalTask) != isinstance(replacement,ConditionalTask)): + raise TypeError("replace only works if both arguments are ConditionalTasks or neither") v = _CopyAndReplaceSequenceVisitor(original,replacement) self.visit(v) if v.didReplace(): @@ -1634,7 +1636,7 @@ def replace(self, original, replacement): # for the _MutatingSequenceVisitor. if not self._allowedInTask(original) or (not replacement is None and not self._allowedInTask(replacement)): - raise TypeError("The Task replace function only works with objects that can be placed on a {}\n".format(self._taskType()) + \ + raise TypeError("The {0} replace function only works with objects that can be placed on a {0}\n".format(self._taskType()) + \ " replace was called with original type = {}\n".format(str(type(original))) + \ " and replacement type = {}\n".format(str(type(replacement)))) else: @@ -1714,7 +1716,7 @@ def dumpPython(self, options=PrintOptions()): class Task(_TaskBase) : """Holds EDProducers, EDFilters, ESProducers, ESSources, Services, and Tasks. - A Task can be associated with Sequences, Paths, EndPaths and the Schedule. + A Task can be associated with Sequences, Paths, EndPaths, ConditionalTasks and the Schedule. An EDProducer or EDFilter will be enabled to run unscheduled if it is on a task associated with the Schedule or any scheduled Path or EndPath (directly or indirectly through Sequences) and not be on any scheduled Path or EndPath. @@ -1763,7 +1765,7 @@ class ConditionalTask(_TaskBase) : An EDProducer or EDFilter will be added to a Path or EndPath based on which other modules on the Path consumes its data products. If that ConditionalTask assigned module is placed after an EDFilter, the module will only run if the EDFilter passes. If no module - on the Path needs the module's data products, the module will be removed from the job. + on the Path needs the module's data products, the module will be treated as if it were on a Task. """ @staticmethod def _taskType(): @@ -1898,6 +1900,10 @@ def testBoolean(self): p6.visit(namesVisitor) self.assertEqual(l,['!&','a','b','@']) + def testTaskConstructor(self): + a = DummyModule("a") + self.assertRaises(RuntimeError, lambda : Task(ConditionalTask(a)) ) + def testDumpPython(self): a = DummyModule("a") b = DummyModule('b') From ca06fe21b4c694b123785614b56794a206dc08f5 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 28 Mar 2022 14:22:14 -0500 Subject: [PATCH 05/10] Added ConditionalTask test using consumesMany --- FWCore/Framework/test/BuildFile.xml | 1 + .../Framework/test/stubs/ToyIntProducers.cc | 32 +++++++++++++++++++ .../test/test_conditionaltasks_cfg.py | 7 +++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/FWCore/Framework/test/BuildFile.xml b/FWCore/Framework/test/BuildFile.xml index a14dbe7358c35..e4fff434e081f 100644 --- a/FWCore/Framework/test/BuildFile.xml +++ b/FWCore/Framework/test/BuildFile.xml @@ -412,4 +412,5 @@ + diff --git a/FWCore/Framework/test/stubs/ToyIntProducers.cc b/FWCore/Framework/test/stubs/ToyIntProducers.cc index c5934bf12b7b8..a84e47c9e1ecd 100644 --- a/FWCore/Framework/test/stubs/ToyIntProducers.cc +++ b/FWCore/Framework/test/stubs/ToyIntProducers.cc @@ -414,6 +414,36 @@ namespace edmtest { e.emplace(otherPutToken_, value); } + // + // Produces an IntProduct instance, using many IntProducts as input. + // + + class AddAllIntsProducer : public edm::global::EDProducer<> { + public: + explicit AddAllIntsProducer(edm::ParameterSet const& p) : putToken_{produces()} { consumesMany(); } + void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + descriptions.addDefault(desc); + } + + private: + const edm::EDPutTokenT putToken_; + }; + + void AddAllIntsProducer::produce(edm::StreamID, edm::Event& e, edm::EventSetup const&) const { + std::vector> ints; + e.getManyByType(ints); + + int value = 0; + for (auto const& i : ints) { + value += i->value; + } + + e.emplace(putToken_, value); + } + // // Produces multiple IntProduct products // @@ -844,6 +874,7 @@ namespace edmtest { }; } // namespace edmtest +using edmtest::AddAllIntsProducer; using edmtest::AddIntsProducer; using edmtest::BusyWaitIntLegacyProducer; using edmtest::BusyWaitIntLimitedProducer; @@ -878,6 +909,7 @@ DEFINE_FWK_MODULE(TransientIntProducer); DEFINE_FWK_MODULE(IntProducerFromTransient); DEFINE_FWK_MODULE(Int16_tProducer); DEFINE_FWK_MODULE(AddIntsProducer); +DEFINE_FWK_MODULE(AddAllIntsProducer); DEFINE_FWK_MODULE(ManyIntProducer); DEFINE_FWK_MODULE(ManyIntWhenRegisteredProducer); DEFINE_FWK_MODULE(NonEventIntProducer); diff --git a/FWCore/Framework/test/test_conditionaltasks_cfg.py b/FWCore/Framework/test/test_conditionaltasks_cfg.py index a1e92eb31e05f..8e033ea9c9cb1 100644 --- a/FWCore/Framework/test/test_conditionaltasks_cfg.py +++ b/FWCore/Framework/test/test_conditionaltasks_cfg.py @@ -10,6 +10,7 @@ parser.add_argument("--testAlias", help="Get data from an alias", action="store_true") parser.add_argument("--testView", help="Get data via a view", action="store_true") parser.add_argument("--aliasWithStar", help="when using testAlias use '*' as type", action="store_true") +parser.add_argument("--testConsumesMany", help="use ConsumesMany", action="store_true") argv = sys.argv[:] if '--' in argv: @@ -33,6 +34,9 @@ process.prodOnPath = cms.EDProducer("AddIntsProducer", labels = cms.VInputTag(cms.InputTag("d"), cms.InputTag("e"))) +if args.testConsumesMany: + process.prodOnPath = cms.EDProducer("AddAllIntsProducer") + if args.filterSucceeds: threshold = 1 else: @@ -75,8 +79,9 @@ process.p = cms.Path(process.f1+process.prodOnPath+process.f2+process.f3, cms.ConditionalTask(process.a, process.b, process.c, process.d, process.e, process.f)) process.tst = cms.EDAnalyzer("IntTestAnalyzer", moduleLabel = cms.untracked.InputTag("f"), valueMustMatch = cms.untracked.int32(3), - valueMustBeMissing = cms.untracked.bool(not args.filterSucceeds)) + valueMustBeMissing = cms.untracked.bool(not args.filterSucceeds and not args.testConsumesMany)) process.endp = cms.EndPath(process.tst) #process.add_(cms.Service("Tracer")) +#process.options.wantSummary=True From b60fb74fd98fba0f412f14e313e580ab768c6ef8 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 28 Mar 2022 14:31:24 -0500 Subject: [PATCH 06/10] Fix mayConsumes handling for ConditionalTasks Use proper module label and avoid reusing a module. --- FWCore/Framework/src/StreamSchedule.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index f7d969d93fb24..b4e55777701a6 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -415,6 +415,10 @@ namespace edm { if (productModuleLabel.empty()) { //this is a consumesMany request for (auto const& branch : conditionalModuleBranches) { + //check that the conditional module has not been used + if (conditionalModules.find(branch.first) == conditionalModules.end()) { + continue; + } if (ci.kindOfType() == edm::PRODUCT_TYPE) { if (branch.second->unwrappedTypeID() != ci.type()) { continue; @@ -426,11 +430,10 @@ namespace edm { } } - auto condWorker = - getWorker(productModuleLabel, proc_pset, workerManager_, preg, prealloc, processConfiguration); + auto condWorker = getWorker(branch.first, proc_pset, workerManager_, preg, prealloc, processConfiguration); assert(condWorker); - conditionalModules.erase(productModuleLabel); + conditionalModules.erase(branch.first); auto dependents = tryToPlaceConditionalModules(condWorker, conditionalModules, From 466921a1068ec6eb46f5912ca7cfffc0a8a13bab Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 28 Mar 2022 14:35:36 -0500 Subject: [PATCH 07/10] Improved handling of regular consumes for ConditionalTasks - avoid unnecessary check on alias finding - break out of loop properly --- FWCore/Framework/src/StreamSchedule.cc | 45 +++++++++++++------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index b4e55777701a6..8466ffdef7704 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -453,38 +453,39 @@ namespace edm { if (itFound == conditionalModules.end()) { //Check to see if this was an alias auto findAlias = aliasMap.equal_range(productModuleLabel); - if (findAlias.first != findAlias.second) { - for (auto it = findAlias.first; it != findAlias.second; ++it) { - //this was previously filtered so only the conditional modules remain - productModuleLabel = it->second.originalModuleLabel; + for (auto it = findAlias.first; it != findAlias.second; ++it) { + //this was previously filtered so only the conditional modules remain + productModuleLabel = it->second.originalModuleLabel; + if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) { if (it->second.friendlyClassName == "*" or (ci.type().friendlyClassName() == it->second.friendlyClassName)) { - if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) { - productFromConditionalModule = true; - //need to check the rest of the data product info - break; - } + productFromConditionalModule = true; + //need to check the rest of the data product info + break; } else if (ci.kindOfType() == ELEMENT_TYPE) { //consume is a View so need to do more intrusive search - if (it->second.instanceLabel == "*" or ci.instance() == it->second.instanceLabel) { - //find matching branches in module - auto branches = conditionalModuleBranches.equal_range(productModuleLabel); - for (auto itBranch = branches.first; itBranch != branches.second; ++it) { - if (it->second.originalInstanceLabel == "*" or - itBranch->second->productInstanceName() == it->second.originalInstanceLabel) { - if (typeIsViewCompatible(ci.type(), - TypeID(itBranch->second->wrappedType().typeInfo()), - itBranch->second->className())) { - productFromConditionalModule = true; - break; - } + //find matching branches in module + auto branches = conditionalModuleBranches.equal_range(productModuleLabel); + for (auto itBranch = branches.first; itBranch != branches.second; ++it) { + if (it->second.originalInstanceLabel == "*" or + itBranch->second->productInstanceName() == it->second.originalInstanceLabel) { + if (typeIsViewCompatible(ci.type(), + TypeID(itBranch->second->wrappedType().typeInfo()), + itBranch->second->className())) { + productFromConditionalModule = true; + break; } } } + if (productFromConditionalModule) { + break; + } } } } - itFound = conditionalModules.find(productModuleLabel); + if (productFromConditionalModule) { + itFound = conditionalModules.find(productModuleLabel); + } } else { //need to check the rest of the data product info auto findBranches = conditionalModuleBranches.equal_range(productModuleLabel); From 290304617c55c02efa08699346d4a6568da0b8d0 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 28 Mar 2022 14:55:09 -0500 Subject: [PATCH 08/10] A module's path bit position can be different from processing order The bit position of a module onto a path does not have to be the same as the modules order to be run on the path. This is needed to allow ConditionalTasks to be added to a path in a set run order but still be able to use the unaltered parameter value holding the list of module labels in order to later find the module labels. All ConditionalTask's modules are added to the end of the parameter and are bracketed by two non-module labels [# and @]. Modules added explicitly to the path are at the front of the parameter and in their run order. The TriggerNamesService knows how to translate the path bit position into the proper module label stored in the parameter. --- FWCore/Framework/interface/Path.h | 1 + .../Framework/interface/TriggerNamesService.h | 4 ++++ FWCore/Framework/interface/WorkerInPath.h | 1 + FWCore/Framework/src/Path.cc | 5 +++-- FWCore/Framework/src/Schedule.cc | 6 ++---- FWCore/Framework/src/StreamSchedule.cc | 10 ++++++++-- FWCore/Framework/src/TriggerNamesService.cc | 18 ++++++++++++++++++ FWCore/Framework/src/TriggerReport.h | 1 + 8 files changed, 38 insertions(+), 8 deletions(-) diff --git a/FWCore/Framework/interface/Path.h b/FWCore/Framework/interface/Path.h index 0cf30bc5950a6..b96e53456efb1 100644 --- a/FWCore/Framework/interface/Path.h +++ b/FWCore/Framework/interface/Path.h @@ -87,6 +87,7 @@ namespace edm { int timesFailed(size_type i) const { return workers_.at(i).timesFailed(); } int timesExcept(size_type i) const { return workers_.at(i).timesExcept(); } Worker const* getWorker(size_type i) const { return workers_.at(i).getWorker(); } + unsigned int bitPosition(size_type i) const { return workers_.at(i).bitPosition(); } void setEarlyDeleteHelpers(std::map const&); diff --git a/FWCore/Framework/interface/TriggerNamesService.h b/FWCore/Framework/interface/TriggerNamesService.h index 1d31fafaf4510..e8f4dc0f2fb54 100644 --- a/FWCore/Framework/interface/TriggerNamesService.h +++ b/FWCore/Framework/interface/TriggerNamesService.h @@ -33,6 +33,7 @@ #include #include #include +#include namespace edm { @@ -83,6 +84,9 @@ namespace edm { std::string const& getTrigPathModule(size_type const i, size_type const j) const { return (modulenames_.at(i)).at(j); } + ///The label is an indicator of a path scheduling construct and not an actual module. + using ScheduingConstructLabelSet = std::unordered_set, std::equal_to<>>; + static ScheduingConstructLabelSet const& schedulingConstructLabels(); Strings const& getEndPathModules(std::string const& name) const { return end_modulenames_.at(find(end_pos_, name)); diff --git a/FWCore/Framework/interface/WorkerInPath.h b/FWCore/Framework/interface/WorkerInPath.h index f57721a6e8b3a..e018a84d566e1 100644 --- a/FWCore/Framework/interface/WorkerInPath.h +++ b/FWCore/Framework/interface/WorkerInPath.h @@ -51,6 +51,7 @@ namespace edm { FilterAction filterAction() const { return filterAction_; } Worker* getWorker() const { return worker_; } bool runConcurrently() const noexcept { return runConcurrently_; } + unsigned int bitPosition() const noexcept { return placeInPathContext_.placeInPath(); } void setPathContext(PathContext const* v) { placeInPathContext_.setPathContext(v); } diff --git a/FWCore/Framework/src/Path.cc b/FWCore/Framework/src/Path.cc index 1f6f11a8872ad..9e9540e548e96 100644 --- a/FWCore/Framework/src/Path.cc +++ b/FWCore/Framework/src/Path.cc @@ -326,10 +326,11 @@ namespace edm { EventTransitionInfo const& iInfo, StreamID const& streamID) { updateCounters(state_); - recordStatus(failedModuleIndex_, state_); + auto failedModuleBitPosition = bitPosition(failedModuleIndex_); + recordStatus(failedModuleBitPosition, state_); // Caught exception is propagated via WaitingTaskList CMS_SA_ALLOW try { - HLTPathStatus status(state_, failedModuleIndex_); + HLTPathStatus status(state_, failedModuleBitPosition); if (pathStatusInserter_) { // pathStatusInserter is null for EndPaths pathStatusInserter_->setPathStatus(streamID, status); diff --git a/FWCore/Framework/src/Schedule.cc b/FWCore/Framework/src/Schedule.cc index 340ea339bf902..802cb7c5a54da 100644 --- a/FWCore/Framework/src/Schedule.cc +++ b/FWCore/Framework/src/Schedule.cc @@ -1042,14 +1042,12 @@ namespace edm { << "Name" << ""; - unsigned int bitpos = 0; for (auto const& mod : p.moduleInPathSummaries) { LogFwkVerbatim("FwkSummary") << "TrigReport " << std::right << std::setw(5) << 1 << std::right << std::setw(5) - << bitpos << " " << std::right << std::setw(10) << mod.timesVisited << " " - << std::right << std::setw(10) << mod.timesPassed << " " << std::right + << mod.bitPosition << " " << std::right << std::setw(10) << mod.timesVisited + << " " << std::right << std::setw(10) << mod.timesPassed << " " << std::right << std::setw(10) << mod.timesFailed << " " << std::right << std::setw(10) << mod.timesExcept << " " << mod.moduleLabel << ""; - ++bitpos; } } diff --git a/FWCore/Framework/src/StreamSchedule.cc b/FWCore/Framework/src/StreamSchedule.cc index 8466ffdef7704..6a3476ba17026 100644 --- a/FWCore/Framework/src/StreamSchedule.cc +++ b/FWCore/Framework/src/StreamSchedule.cc @@ -550,7 +550,12 @@ namespace edm { //An EDAlias may be redirecting to a module on a ConditionalTask std::multimap aliasMap; std::multimap conditionalModsBranches; + std::unordered_map conditionalModOrder; if (itCondBegin != modnames.end()) { + for (auto it = itCondBegin + 1; it != modnames.begin() + modnames.size() - 1; ++it) { + // ordering needs to skip the # token in the path list + conditionalModOrder.emplace(*it, it - modnames.begin() - 1); + } //the last entry should be ignored since it is required to be "@" conditionalmods = std::unordered_set( std::make_move_iterator(itCondBegin + 1), std::make_move_iterator(modnames.begin() + modnames.size() - 1)); @@ -652,8 +657,8 @@ namespace edm { auto condModules = tryToPlaceConditionalModules( worker, conditionalmods, conditionalModsBranches, aliasMap, proc_pset, preg, prealloc, processConfiguration); for (auto condMod : condModules) { - tmpworkers.emplace_back(condMod, WorkerInPath::Ignore, placeInPath, true); - ++placeInPath; + tmpworkers.emplace_back( + condMod, WorkerInPath::Ignore, conditionalModOrder[condMod->description()->moduleLabel()], true); } tmpworkers.emplace_back(worker, filterAction, placeInPath, runConcurrently); @@ -1033,6 +1038,7 @@ namespace edm { sum.timesFailed += path.timesFailed(which); sum.timesExcept += path.timesExcept(which); sum.moduleLabel = path.getWorker(which)->description()->moduleLabel(); + sum.bitPosition = path.bitPosition(which); } static void fillPathSummary(Path const& path, PathSummary& sum) { diff --git a/FWCore/Framework/src/TriggerNamesService.cc b/FWCore/Framework/src/TriggerNamesService.cc index 6fb82a7b2d934..2c9924f74b86c 100644 --- a/FWCore/Framework/src/TriggerNamesService.cc +++ b/FWCore/Framework/src/TriggerNamesService.cc @@ -31,11 +31,24 @@ namespace edm { loadPosMap(end_pos_, end_names_); const unsigned int n(trignames_.size()); + auto const& labelsToRemove = schedulingConstructLabels(); for (unsigned int i = 0; i != n; ++i) { modulenames_.push_back(pset.getParameter(trignames_[i])); + auto& names = modulenames_.back(); + names.erase( + std::remove_if(names.begin(), + names.end(), + [&labelsToRemove](auto const& n) { return labelsToRemove.find(n) != labelsToRemove.end(); }), + names.end()); } for (unsigned int i = 0; i != end_names_.size(); ++i) { end_modulenames_.push_back(pset.getParameter(end_names_[i])); + auto& names = end_modulenames_.back(); + names.erase( + std::remove_if(names.begin(), + names.end(), + [&labelsToRemove](auto const& n) { return labelsToRemove.find(n) != labelsToRemove.end(); }), + names.end()); } } @@ -88,5 +101,10 @@ namespace edm { bool dummy; return getTrigPaths(triggerResults, trigPaths, dummy); } + + TriggerNamesService::ScheduingConstructLabelSet const& TriggerNamesService::schedulingConstructLabels() { + static const ScheduingConstructLabelSet s_set({{"@"}, {"#"}}); + return s_set; + } } // namespace service } // namespace edm diff --git a/FWCore/Framework/src/TriggerReport.h b/FWCore/Framework/src/TriggerReport.h index a3a875367f47a..ea64efbd99cff 100644 --- a/FWCore/Framework/src/TriggerReport.h +++ b/FWCore/Framework/src/TriggerReport.h @@ -26,6 +26,7 @@ namespace edm { int timesPassed = 0; int timesFailed = 0; int timesExcept = 0; + int bitPosition = 0; std::string moduleLabel; }; From 8099e0a907083c9958163f4a725446f83df16a68 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 29 Mar 2022 14:30:59 -0500 Subject: [PATCH 09/10] Update HLTConfigData to handle ConditionalTasks --- HLTrigger/HLTcore/src/HLTConfigData.cc | 9 +++ HLTrigger/HLTcore/test/BuildFile.xml | 4 ++ .../HLTcore/test/test_catch2_HLTConfigData.cc | 72 +++++++++++++++++++ 3 files changed, 85 insertions(+) create mode 100644 HLTrigger/HLTcore/test/BuildFile.xml create mode 100644 HLTrigger/HLTcore/test/test_catch2_HLTConfigData.cc diff --git a/HLTrigger/HLTcore/src/HLTConfigData.cc b/HLTrigger/HLTcore/src/HLTConfigData.cc index c5f53652a61eb..ee22c5412dd52 100644 --- a/HLTrigger/HLTcore/src/HLTConfigData.cc +++ b/HLTrigger/HLTcore/src/HLTConfigData.cc @@ -10,6 +10,7 @@ #include "HLTrigger/HLTcore/interface/HLTConfigData.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" +#include #include //Using this function with the 'const static within s_dummyPSet' @@ -117,9 +118,17 @@ void HLTConfigData::extract() { // Obtain module labels of all modules on all trigger paths const unsigned int n(size()); moduleLabels_.reserve(n); + std::unordered_set processDirectives = {{"@"}, {"#"}}; for (unsigned int i = 0; i != n; ++i) { if (processPSet_->existsAs>(triggerNames_[i], true)) { moduleLabels_.push_back(processPSet_->getParameter>(triggerNames_[i])); + auto& ml = moduleLabels_.back(); + ml.erase(std::remove_if(ml.begin(), + ml.end(), + [&processDirectives](auto const& l) { + return processDirectives.find(l) != processDirectives.end(); + }), + ml.end()); } } saveTagsModules_.reserve(n); diff --git a/HLTrigger/HLTcore/test/BuildFile.xml b/HLTrigger/HLTcore/test/BuildFile.xml new file mode 100644 index 0000000000000..63bcf53bb555a --- /dev/null +++ b/HLTrigger/HLTcore/test/BuildFile.xml @@ -0,0 +1,4 @@ + + + + diff --git a/HLTrigger/HLTcore/test/test_catch2_HLTConfigData.cc b/HLTrigger/HLTcore/test/test_catch2_HLTConfigData.cc new file mode 100644 index 0000000000000..95e37903cc88a --- /dev/null +++ b/HLTrigger/HLTcore/test/test_catch2_HLTConfigData.cc @@ -0,0 +1,72 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp" + +#include "HLTrigger/HLTcore/interface/HLTConfigData.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" + +namespace { + edm::ParameterSet buildModulePSet(std::string const& iLabel, std::string const& iType, std::string const& iEDMType) { + edm::ParameterSet pset; + pset.addParameter("@module_label", iLabel); + pset.addParameter("@module_type", iType); + pset.addParameter("@module_edm_type", iEDMType); + return pset; + } +} // namespace + +TEST_CASE("Test HLTConfigData", "[HLTConfigData]") { + SECTION("TriggerPaths") { + edm::ParameterSet pset; + pset.addParameter("@process_name", "TEST"); + const std::vector names = {{"b1"}, {"b2"}, {"a1"}, {"z5"}}; + { + edm::ParameterSet tpset; + tpset.addParameter>("@trigger_paths", names); + pset.addParameter("@trigger_paths", tpset); + } + pset.addParameter>("b1", {{"f1"}, {"f2"}}); + pset.addParameter>("b2", {{"f3"}, {"#"}, {"c1"}, {"c2"}, {"@"}}); + pset.addParameter>("a1", {{"f1"}, {"f4"}}); + pset.addParameter>("z5", {{"f5"}}); + + pset.addParameter("f1", buildModulePSet("f1", "F1Filter", "EDFilter")); + pset.addParameter("f2", buildModulePSet("f2", "F2Filter", "EDFilter")); + pset.addParameter("f3", buildModulePSet("f3", "F3Filter", "EDFilter")); + pset.addParameter("f4", buildModulePSet("f4", "F4Filter", "EDFilter")); + pset.addParameter("f5", buildModulePSet("f5", "F5Filter", "EDFilter")); + + pset.addParameter("c1", buildModulePSet("c1", "CProducer", "EDProducer")); + pset.addParameter("c2", buildModulePSet("c2", "CProducer", "EDProducer")); + pset.registerIt(); + + HLTConfigData cd(&pset); + + SECTION("check paths") { + REQUIRE(cd.size() == 4); + REQUIRE(cd.triggerName(0) == names[0]); + REQUIRE(cd.triggerName(1) == names[1]); + REQUIRE(cd.triggerName(2) == names[2]); + REQUIRE(cd.triggerName(3) == names[3]); + //cd.dump("Triggers"); + } + + SECTION("check modules on paths") { + REQUIRE(cd.size(0) == 2); + REQUIRE(cd.moduleLabel(0, 0) == "f1"); + REQUIRE(cd.moduleLabel(0, 1) == "f2"); + + REQUIRE(cd.size(1) == 3); + REQUIRE(cd.moduleLabel(1, 0) == "f3"); + REQUIRE(cd.moduleLabel(1, 1) == "c1"); + REQUIRE(cd.moduleLabel(1, 2) == "c2"); + + REQUIRE(cd.size(2) == 2); + REQUIRE(cd.moduleLabel(2, 0) == "f1"); + REQUIRE(cd.moduleLabel(2, 1) == "f4"); + + REQUIRE(cd.size(3) == 1); + REQUIRE(cd.moduleLabel(3, 0) == "f5"); + //cd.dump("Modules"); + } + } +} From 8b992105533716be1100a6c5d94290ee645a504f Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Wed, 30 Mar 2022 11:25:02 -0500 Subject: [PATCH 10/10] Moved path configuration parsing to a standalone function This allows sharing of this code across two cases. --- .../Framework/interface/TriggerNamesService.h | 4 -- FWCore/Framework/src/TriggerNamesService.cc | 25 +++--------- .../Utilities/interface/path_configuration.h | 40 +++++++++++++++++++ FWCore/Utilities/src/path_configuration.cc | 38 ++++++++++++++++++ HLTrigger/HLTcore/src/HLTConfigData.cc | 12 ++---- 5 files changed, 86 insertions(+), 33 deletions(-) create mode 100644 FWCore/Utilities/interface/path_configuration.h create mode 100644 FWCore/Utilities/src/path_configuration.cc diff --git a/FWCore/Framework/interface/TriggerNamesService.h b/FWCore/Framework/interface/TriggerNamesService.h index e8f4dc0f2fb54..304e290309291 100644 --- a/FWCore/Framework/interface/TriggerNamesService.h +++ b/FWCore/Framework/interface/TriggerNamesService.h @@ -84,10 +84,6 @@ namespace edm { std::string const& getTrigPathModule(size_type const i, size_type const j) const { return (modulenames_.at(i)).at(j); } - ///The label is an indicator of a path scheduling construct and not an actual module. - using ScheduingConstructLabelSet = std::unordered_set, std::equal_to<>>; - static ScheduingConstructLabelSet const& schedulingConstructLabels(); - Strings const& getEndPathModules(std::string const& name) const { return end_modulenames_.at(find(end_pos_, name)); } diff --git a/FWCore/Framework/src/TriggerNamesService.cc b/FWCore/Framework/src/TriggerNamesService.cc index 2c9924f74b86c..453c0a65edc62 100644 --- a/FWCore/Framework/src/TriggerNamesService.cc +++ b/FWCore/Framework/src/TriggerNamesService.cc @@ -11,6 +11,7 @@ #include "FWCore/Utilities/interface/Exception.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/Algorithms.h" +#include "FWCore/Utilities/interface/path_configuration.h" namespace edm { namespace service { @@ -31,24 +32,13 @@ namespace edm { loadPosMap(end_pos_, end_names_); const unsigned int n(trignames_.size()); - auto const& labelsToRemove = schedulingConstructLabels(); for (unsigned int i = 0; i != n; ++i) { - modulenames_.push_back(pset.getParameter(trignames_[i])); - auto& names = modulenames_.back(); - names.erase( - std::remove_if(names.begin(), - names.end(), - [&labelsToRemove](auto const& n) { return labelsToRemove.find(n) != labelsToRemove.end(); }), - names.end()); + modulenames_.push_back( + path_configuration::configurationToModuleBitPosition(pset.getParameter(trignames_[i]))); } for (unsigned int i = 0; i != end_names_.size(); ++i) { - end_modulenames_.push_back(pset.getParameter(end_names_[i])); - auto& names = end_modulenames_.back(); - names.erase( - std::remove_if(names.begin(), - names.end(), - [&labelsToRemove](auto const& n) { return labelsToRemove.find(n) != labelsToRemove.end(); }), - names.end()); + end_modulenames_.push_back( + path_configuration::configurationToModuleBitPosition(pset.getParameter(end_names_[i]))); } } @@ -101,10 +91,5 @@ namespace edm { bool dummy; return getTrigPaths(triggerResults, trigPaths, dummy); } - - TriggerNamesService::ScheduingConstructLabelSet const& TriggerNamesService::schedulingConstructLabels() { - static const ScheduingConstructLabelSet s_set({{"@"}, {"#"}}); - return s_set; - } } // namespace service } // namespace edm diff --git a/FWCore/Utilities/interface/path_configuration.h b/FWCore/Utilities/interface/path_configuration.h new file mode 100644 index 0000000000000..72c5f03eebd19 --- /dev/null +++ b/FWCore/Utilities/interface/path_configuration.h @@ -0,0 +1,40 @@ +#ifndef FWCore_Utilities_path_configuration_h +#define FWCore_Utilities_path_configuration_h +// -*- C++ -*- +// +// Package : FWCore/Utilities +// namespace: path_configuration +// +/**\class path_configuration path_configuration.h "FWCore/Utilities/interface/path_configuration.h" + + Description: Functions used to understand Path configurations + + Usage: + + +*/ +// +// Original Author: Christopher Jones +// Created: Wed, 30 Mar 2022 14:59:29 GMT +// + +// system include files +#include +#include +#include + +// user include files + +// forward declarations + +namespace edm::path_configuration { + ///The label is an indicator of a path scheduling construct and not an actual module. + using SchedulingConstructLabelSet = std::unordered_set, std::equal_to<>>; + SchedulingConstructLabelSet const& schedulingConstructLabels(); + + //Takes the Parameter associated to a given Path and converts it to the list of modules + // in the same order as the Path's position bits + std::vector configurationToModuleBitPosition(std::vector); +} // namespace edm::path_configuration + +#endif diff --git a/FWCore/Utilities/src/path_configuration.cc b/FWCore/Utilities/src/path_configuration.cc new file mode 100644 index 0000000000000..ce27f0c50433d --- /dev/null +++ b/FWCore/Utilities/src/path_configuration.cc @@ -0,0 +1,38 @@ +// -*- C++ -*- +// +// Package: FWCore/Utilities +// namespace : path_configuration +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Christopher Jones +// Created: Wed, 30 Mar 2022 15:04:35 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/Utilities/interface/path_configuration.h" + +namespace edm::path_configuration { + + SchedulingConstructLabelSet const& schedulingConstructLabels() { + static const SchedulingConstructLabelSet s_set({{"@"}, {"#"}}); + return s_set; + } + + //Takes the Parameter associated to a given Path and converts it to the list of modules + // in the same order as the Path's position bits + std::vector configurationToModuleBitPosition(std::vector iConfig) { + auto const& labelsToRemove = schedulingConstructLabels(); + iConfig.erase( + std::remove_if(iConfig.begin(), + iConfig.end(), + [&labelsToRemove](auto const& n) { return labelsToRemove.find(n) != labelsToRemove.end(); }), + iConfig.end()); + return iConfig; + } + +} // namespace edm::path_configuration diff --git a/HLTrigger/HLTcore/src/HLTConfigData.cc b/HLTrigger/HLTcore/src/HLTConfigData.cc index ee22c5412dd52..657abc6fd7803 100644 --- a/HLTrigger/HLTcore/src/HLTConfigData.cc +++ b/HLTrigger/HLTcore/src/HLTConfigData.cc @@ -9,6 +9,7 @@ #include "HLTrigger/HLTcore/interface/HLTConfigData.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/Utilities/interface/path_configuration.h" #include #include @@ -118,17 +119,10 @@ void HLTConfigData::extract() { // Obtain module labels of all modules on all trigger paths const unsigned int n(size()); moduleLabels_.reserve(n); - std::unordered_set processDirectives = {{"@"}, {"#"}}; for (unsigned int i = 0; i != n; ++i) { if (processPSet_->existsAs>(triggerNames_[i], true)) { - moduleLabels_.push_back(processPSet_->getParameter>(triggerNames_[i])); - auto& ml = moduleLabels_.back(); - ml.erase(std::remove_if(ml.begin(), - ml.end(), - [&processDirectives](auto const& l) { - return processDirectives.find(l) != processDirectives.end(); - }), - ml.end()); + moduleLabels_.push_back(path_configuration::configurationToModuleBitPosition( + processPSet_->getParameter>(triggerNames_[i]))); } } saveTagsModules_.reserve(n);