From 864c800bec39c8ad54b99f42949a4859320945d8 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 12 Jul 2024 14:46:14 -0500 Subject: [PATCH 1/7] Fix incorrect name for exception --- FWCore/TestProcessor/python/TestProcess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/FWCore/TestProcessor/python/TestProcess.py b/FWCore/TestProcessor/python/TestProcess.py index 79a9b3cb46d3d..5ba1bc2c4538b 100644 --- a/FWCore/TestProcessor/python/TestProcess.py +++ b/FWCore/TestProcessor/python/TestProcess.py @@ -12,7 +12,7 @@ def moduleToTest(self,mod,task=cms.Task()): self._test_endpath = cms.EndPath(mod,task) def fillProcessDesc(self, processPSet): if self.__dict__["_TestProcess__moduleToTest"] is None: - raise LogicError("moduleToTest was not called") + raise RuntimeError("moduleToTest was not called") for p in self.paths.iterkeys(): if p != "_test_path": delattr(self,p) From 078d2cb009777f98048c224d3c274f6731943abd Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Fri, 12 Jul 2024 14:47:50 -0500 Subject: [PATCH 2/7] Added TestSourceProcessor testing helper This is similar to TestProcessor but specialized for testing Sources. --- .../TestProcessor/interface/EventFromSource.h | 68 +++++ .../interface/LuminosityBlockFromSource.h | 70 +++++ .../TestProcessor/interface/RunFromSource.h | 65 +++++ .../interface/TestSourceProcessor.h | 94 +++++++ .../TestProcessor/python/TestSourceProcess.py | 9 + .../TestProcessor/src/TestSourceProcessor.cc | 257 ++++++++++++++++++ FWCore/TestProcessor/test/BuildFile.xml | 2 +- .../test/testsourceprocessor_t.cppunit.cc | 113 ++++++++ 8 files changed, 677 insertions(+), 1 deletion(-) create mode 100644 FWCore/TestProcessor/interface/EventFromSource.h create mode 100644 FWCore/TestProcessor/interface/LuminosityBlockFromSource.h create mode 100644 FWCore/TestProcessor/interface/RunFromSource.h create mode 100644 FWCore/TestProcessor/interface/TestSourceProcessor.h create mode 100644 FWCore/TestProcessor/python/TestSourceProcess.py create mode 100644 FWCore/TestProcessor/src/TestSourceProcessor.cc create mode 100644 FWCore/TestProcessor/test/testsourceprocessor_t.cppunit.cc diff --git a/FWCore/TestProcessor/interface/EventFromSource.h b/FWCore/TestProcessor/interface/EventFromSource.h new file mode 100644 index 0000000000000..ab4f3bca690d4 --- /dev/null +++ b/FWCore/TestProcessor/interface/EventFromSource.h @@ -0,0 +1,68 @@ +#ifndef FWCore_TestProcessor_EventFromSource_h +#define FWCore_TestProcessor_EventFromSource_h +// -*- C++ -*- +// +// Package: FWCore/TestProcessor +// Class : EventFromSource +// +/**\class EventFromSource EventFromSource.h "EventFromSource.h" + + Description: [one line class summary] + + Usage: + + +*/ +// +// Original Author: Chris Jones +// Created: Mon, 30 Apr 2018 18:51:27 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/TestProcessor/interface/TestHandle.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Utilities/interface/TypeID.h" + +// forward declarations + +namespace edm { + class EventFromSourcePrincipal; + + namespace test { + + class EventFromSource { + public: + EventFromSource(EventPrincipal const& iPrincipal) : principal_(&iPrincipal) {} + + // ---------- const member functions --------------------- + template + TestHandle get(std::string const& iModule, + std::string const& iInstanceLabel, + std::string const& iProcess) const { + auto h = principal_->getByLabel( + edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); + if (h.failedToGet()) { + return TestHandle(std::move(h.whyFailedFactory())); + } + void const* basicWrapper = h.wrapper(); + assert(basicWrapper); + Wrapper const* wrapper = static_cast const*>(basicWrapper); + return TestHandle(wrapper->product()); + } + + RunNumber_t run() const { return principal_->run(); } + LuminosityBlockNumber_t luminosityBlock() const { return principal_->luminosityBlock(); } + EventNumber_t event() const { return principal_->aux().event(); } + EventAuxiliary const& aux() const { return principal_->aux(); } + + private: + // ---------- member data -------------------------------- + EventPrincipal const* principal_; + }; + } // namespace test +} // namespace edm + +#endif diff --git a/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h b/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h new file mode 100644 index 0000000000000..71d322fc0ac77 --- /dev/null +++ b/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h @@ -0,0 +1,70 @@ +#ifndef FWCore_TestProcessor_LuminosityBlockFromSource_h +#define FWCore_TestProcessor_LuminosityBlockFromSource_h +// -*- C++ -*- +// +// Package: FWCore/TestProcessor +// Class : LuminosityBlockFromSource +// +/**\class LuminosityBlockFromSource LuminosityBlockFromSource.h "LuminosityBlockFromSource.h" + + Description: [one line class summary] + + Usage: + + +*/ +// +// Original Author: Chris Jones +// Created: Mon, 30 Apr 2018 18:51:27 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/TestProcessor/interface/TestHandle.h" +#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" +#include "FWCore/Utilities/interface/TypeID.h" + +// forward declarations + +namespace edm { + + namespace test { + + class LuminosityBlockFromSource { + public: + LuminosityBlockFromSource(std::shared_ptr iPrincipal) : principal_(iPrincipal) {} + + // ---------- const member functions --------------------- + template + TestHandle get(std::string const& iModule, + std::string const& iInstanceLabel, + std::string const& iProcess) const { + auto h = principal_->getByLabel( + edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); + if (h.failedToGet()) { + return TestHandle(std::move(h.whyFailedFactory())); + } + void const* basicWrapper = h.wrapper(); + assert(basicWrapper); + Wrapper const* wrapper = static_cast const*>(basicWrapper); + return TestHandle(wrapper->product()); + } + + RunNumber_t run() const { return principal_->run(); } + LuminosityBlockNumber_t luminosityBlock() const { return principal_->luminosityBlock(); } + LuminosityBlockAuxiliary const& aux() const { return principal_->aux(); } + + // ---------- static member functions -------------------- + + // ---------- member functions --------------------------- + + private: + // ---------- member data -------------------------------- + std::shared_ptr principal_; + }; + } // namespace test +} // namespace edm + +#endif diff --git a/FWCore/TestProcessor/interface/RunFromSource.h b/FWCore/TestProcessor/interface/RunFromSource.h new file mode 100644 index 0000000000000..4e9e2543f45a2 --- /dev/null +++ b/FWCore/TestProcessor/interface/RunFromSource.h @@ -0,0 +1,65 @@ +#ifndef FWCore_TestProcessor_RunFromSource_h +#define FWCore_TestProcessor_RunFromSource_h +// -*- C++ -*- +// +// Package: FWCore/TestProcessor +// Class : RunFromSource +// +/**\class RunFromSource RunFromSource.h "RunFromSource.h" + + Description: [one line class summary] + + Usage: + + +*/ +// +// Original Author: Chris Jones +// Created: Mon, 30 Apr 2018 18:51:27 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/TestProcessor/interface/TestHandle.h" +#include "FWCore/Framework/interface/RunPrincipal.h" +#include "FWCore/Utilities/interface/TypeID.h" + +// forward declarations + +namespace edm { + + namespace test { + + class RunFromSource { + public: + RunFromSource(std::shared_ptr iPrincipal) : principal_(iPrincipal) {} + + // ---------- const member functions --------------------- + template + TestHandle get(std::string const& iModule, + std::string const& iInstanceLabel, + std::string const& iProcess) const { + auto h = principal_->getByLabel( + edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); + if (h.failedToGet()) { + return TestHandle(std::move(h.whyFailedFactory())); + } + void const* basicWrapper = h.wrapper(); + assert(basicWrapper); + Wrapper const* wrapper = static_cast const*>(basicWrapper); + return TestHandle(wrapper->product()); + } + + RunNumber_t run() const { return principal_->run(); } + RunAuxiliary const& aux() const { return principal_->aux(); } + + private: + // ---------- member data -------------------------------- + std::shared_ptr principal_; + }; + } // namespace test +} // namespace edm + +#endif diff --git a/FWCore/TestProcessor/interface/TestSourceProcessor.h b/FWCore/TestProcessor/interface/TestSourceProcessor.h new file mode 100644 index 0000000000000..e0cc8c2465431 --- /dev/null +++ b/FWCore/TestProcessor/interface/TestSourceProcessor.h @@ -0,0 +1,94 @@ +#ifndef FWCore_TestProcessor_TestSourceProcessor_h +#define FWCore_TestProcessor_TestSourceProcessor_h +// -*- C++ -*- +// +// Package: FWCore/TestProcessor +// Class : TestSourceProcessor +// +/**\class TestSourceProcessor TestSourceProcessor.h "TestSourceProcessor.h" + + Description: Used for testing InputSources + + Usage: + + +*/ +// +// Original Author: Chris Jones +// Created: Mon, 30 Apr 2018 18:51:00 GMT +// +#include +#include +#include +#include "oneapi/tbb/global_control.h" +#include "oneapi/tbb/task_arena.h" +#include "oneapi/tbb/task_group.h" + +#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h" + +#include "FWCore/Common/interface/FWCoreCommonFwd.h" + +#include "FWCore/Framework/interface/HistoryAppender.h" +#include "FWCore/Framework/interface/InputSource.h" +#include "FWCore/Framework/interface/SharedResourcesAcquirer.h" +#include "FWCore/Framework/interface/PrincipalCache.h" +#include "FWCore/Framework/interface/SignallingProductRegistry.h" +#include "FWCore/Framework/interface/PreallocationConfiguration.h" +#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" +#include "FWCore/ServiceRegistry/interface/ProcessContext.h" +#include "FWCore/ServiceRegistry/interface/ServiceLegacy.h" +#include "FWCore/ServiceRegistry/interface/ServiceToken.h" + +#include "FWCore/TestProcessor/interface/EventFromSource.h" +#include "FWCore/TestProcessor/interface/LuminosityBlockFromSource.h" +#include "FWCore/TestProcessor/interface/ProcessBlock.h" +#include "FWCore/TestProcessor/interface/RunFromSource.h" + +namespace edm::test { + + class TestSourceProcessor { + public: + TestSourceProcessor(std::string const& iConfig, ServiceToken iToken = ServiceToken()); + + InputSource::ItemTypeInfo findNextTransition(); + + std::shared_ptr openFile(); + void closeFile(std::shared_ptr); + + edm::test::RunFromSource readRun(); + + edm::test::LuminosityBlockFromSource readLuminosityBlock(); + + edm::test::EventFromSource readEvent(); + + private: + std::unique_ptr source_; + edm::InputSource::ItemTypeInfo lastTransition_; + + oneapi::tbb::global_control globalControl_; + oneapi::tbb::task_group taskGroup_; + oneapi::tbb::task_arena arena_; + std::shared_ptr actReg_; // We do not use propagate_const because the registry itself is mutable. + std::shared_ptr preg_; + std::shared_ptr branchIDListHelper_; + std::shared_ptr processBlockHelper_; + std::shared_ptr thinnedAssociationsHelper_; + ServiceToken serviceToken_; + + std::shared_ptr processConfiguration_; + ProcessContext processContext_; + + ProcessHistoryRegistry processHistoryRegistry_; + std::unique_ptr historyAppender_; + + PrincipalCache principalCache_; + PreallocationConfiguration preallocations_; + + std::shared_ptr runPrincipal_; + std::shared_ptr lumiPrincipal_; + + std::shared_ptr fb_; + }; +} // namespace edm::test + +#endif \ No newline at end of file diff --git a/FWCore/TestProcessor/python/TestSourceProcess.py b/FWCore/TestProcessor/python/TestSourceProcess.py new file mode 100644 index 0000000000000..bfa991788fe58 --- /dev/null +++ b/FWCore/TestProcessor/python/TestSourceProcess.py @@ -0,0 +1,9 @@ +import FWCore.ParameterSet.Config as cms + +class TestSourceProcess(cms.Process): + def __init__(self,name="TEST",*modifiers): + super(TestSourceProcess,self).__init__(name,*modifiers) + def fillProcessDesc(self, processPSet): + if not hasattr(self,"options"): + self.options = cms.untracked.PSet() + cms.Process.fillProcessDesc(self,processPSet) diff --git a/FWCore/TestProcessor/src/TestSourceProcessor.cc b/FWCore/TestProcessor/src/TestSourceProcessor.cc new file mode 100644 index 0000000000000..ee7662a6fd796 --- /dev/null +++ b/FWCore/TestProcessor/src/TestSourceProcessor.cc @@ -0,0 +1,257 @@ +#include "FWCore/TestProcessor/interface/TestSourceProcessor.h" + +#include "FWCore/Framework/interface/ScheduleItems.h" +#include "FWCore/Framework/interface/EventPrincipal.h" +#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" +#include "FWCore/Framework/interface/ProcessBlockPrincipal.h" +#include "FWCore/Framework/interface/RunPrincipal.h" +#include "FWCore/Framework/interface/DelayedReader.h" +#include "FWCore/Framework/interface/InputSourceDescription.h" +#include "FWCore/Framework/interface/maker/InputSourceFactory.h" + +#include "FWCore/Common/interface/ProcessBlockHelper.h" + +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" +#include "FWCore/ServiceRegistry/interface/SystemBounds.h" + +#include "FWCore/PluginManager/interface/PluginManager.h" +#include "FWCore/PluginManager/interface/standard.h" + +#include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerBase.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescriptionFillerPluginFactory.h" +#include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h" +#include "FWCore/ParameterSet/interface/ProcessDesc.h" +#include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h" + +#include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/Concurrency/interface/FinalWaitingTask.h" + +#include "DataFormats/Provenance/interface/ParentageRegistry.h" +namespace { + using namespace edm; + + std::string name(edm::InputSource::ItemType iType) { + switch (iType) { + case edm::InputSource::ItemType::IsEvent: + return "Event"; + case edm::InputSource::ItemType::IsFile: + return "File"; + case edm::InputSource::ItemType::IsLumi: + return "LuminosityBlock"; + case edm::InputSource::ItemType::IsRepeat: + return "Repeat"; + case edm::InputSource::ItemType::IsStop: + return "Stop"; + case edm::InputSource::ItemType::IsRun: + return "Run"; + case edm::InputSource::ItemType::IsSynchronize: + return "Synchronize"; + case edm::InputSource::ItemType::IsInvalid: + return "Invalid"; + } + return "Invalid"; + } + // --------------------------------------------------------------- + std::unique_ptr makeInput(unsigned int moduleIndex, + ParameterSet& params, + std::shared_ptr preg, + std::shared_ptr branchIDListHelper, + std::shared_ptr const& processBlockHelper, + std::shared_ptr thinnedAssociationsHelper, + std::shared_ptr areg, + std::shared_ptr processConfiguration, + PreallocationConfiguration const& allocations) { + ParameterSet* main_input = params.getPSetForUpdate("@main_input"); + if (main_input == nullptr) { + throw Exception(errors::Configuration) + << "There must be exactly one source in the configuration.\n" + << "It is missing (or there are sufficient syntax errors such that it is not recognized as the source)\n"; + } + + std::string modtype(main_input->getParameter("@module_type")); + + std::unique_ptr filler( + ParameterSetDescriptionFillerPluginFactory::get()->create(modtype)); + ConfigurationDescriptions descriptions(filler->baseType(), modtype); + filler->fill(descriptions); + + descriptions.validate(*main_input, std::string("source")); + + main_input->registerIt(); + + // Fill in "ModuleDescription", in case the input source produces + // any EDProducts, which would be registered in the ProductRegistry. + // Also fill in the process history item for this process. + // There is no module label for the unnamed input source, so + // just use "source". + // Only the tracked parameters belong in the process configuration. + ModuleDescription md(main_input->id(), + main_input->getParameter("@module_type"), + "source", + processConfiguration.get(), + moduleIndex); + + InputSourceDescription isdesc( + md, preg, branchIDListHelper, processBlockHelper, thinnedAssociationsHelper, areg, -1, -1, 0, allocations); + + return std::unique_ptr(InputSourceFactory::get()->makeInputSource(*main_input, isdesc).release()); + } +} // namespace + +namespace edm::test { + + TestSourceProcessor::TestSourceProcessor(std::string const& iConfig, ServiceToken iToken) + : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1), + arena_(1), + historyAppender_(std::make_unique()) { + ProcessDescImpl desc(iConfig, false); + + auto psetPtr = desc.parameterSet(); + + validateTopLevelParameterSets(psetPtr.get()); + + auto procDesc = desc.processDesc(); + // Now do general initialization + ScheduleItems items; + + //initialize the services + auto& serviceSets = procDesc->getServicesPSets(); + ServiceToken token = items.initServices(serviceSets, *psetPtr, iToken, serviceregistry::kOverlapIsError, true); + serviceToken_ = items.addCPRandTNS(*psetPtr, token); + + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + + // intialize miscellaneous items + items.initMisc(*psetPtr); + + auto nThreads = 1U; + auto nStreams = 1U; + auto nConcurrentLumis = 1U; + auto nConcurrentRuns = 1U; + preallocations_ = PreallocationConfiguration{nThreads, nStreams, nConcurrentLumis, nConcurrentRuns}; + + processBlockHelper_ = std::make_shared(); + + { + // initialize the input source + auto tempReg = std::make_shared(); + auto sourceID = ModuleDescription::getUniqueID(); + + ServiceRegistry::Operate operate(serviceToken_); + source_ = makeInput(sourceID, + *psetPtr, + /*items.preg(),*/ tempReg, + items.branchIDListHelper(), + processBlockHelper_, + items.thinnedAssociationsHelper(), + items.actReg_, + items.processConfiguration(), + preallocations_); + items.preg()->addFromInput(*tempReg); + source_->switchTo(items.preg()); + } + + actReg_ = items.actReg_; + branchIDListHelper_ = items.branchIDListHelper(); + thinnedAssociationsHelper_ = items.thinnedAssociationsHelper(); + processContext_.setProcessConfiguration(processConfiguration_.get()); + preg_ = items.preg(); + principalCache_.setNumberOfConcurrentPrincipals(preallocations_); + + preg_->setFrozen(); + + for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) { + // Reusable event principal + auto ep = std::make_shared( + preg_, branchIDListHelper_, thinnedAssociationsHelper_, *processConfiguration_, historyAppender_.get(), index); + principalCache_.insert(std::move(ep)); + } + for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) { + auto rp = std::make_unique(preg_, *processConfiguration_, historyAppender_.get(), index); + principalCache_.insert(std::move(rp)); + } + for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) { + auto lp = + std::make_unique(preg_, *processConfiguration_, historyAppender_.get(), index); + principalCache_.insert(std::move(lp)); + } + { + auto pb = std::make_unique(preg_, *processConfiguration_); + principalCache_.insert(std::move(pb)); + } + + source_->doBeginJob(); + } + + edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() { + lastTransition_ = source_->nextItemType(); + return lastTransition_; + } + + std::shared_ptr TestSourceProcessor::openFile() { + size_t size = preg_->size(); + fb_ = source_->readFile(); + if (size < preg_->size()) { + principalCache_.adjustIndexesAfterProductRegistryAddition(); + } + principalCache_.adjustEventsToNewProductRegistry(preg_); + + source_->fillProcessBlockHelper(); + ProcessBlockPrincipal& processBlockPrincipal = principalCache_.inputProcessBlockPrincipal(); + while (source_->nextProcessBlock(processBlockPrincipal)) { + source_->readProcessBlock(processBlockPrincipal); + processBlockPrincipal.clearPrincipal(); + } + return fb_; + } + void TestSourceProcessor::closeFile(std::shared_ptr iBlock) { + if (iBlock.get() != fb_.get()) { + throw cms::Exception("IncorrectFileBlock") + << "closeFile given a FileBlock that does not correspond to the one returned by openFile"; + } + if (fb_) { + source_->closeFile(fb_.get(), false); + } + } + + edm::test::RunFromSource TestSourceProcessor::readRun() { + if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) { + throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run"; + } + //NOTE: should probably handle merging as well + runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr(); + runPrincipal_->setAux(*source_->runAuxiliary()); + source_->readRun(*runPrincipal_, *historyAppender_); + + return edm::test::RunFromSource(runPrincipal_); + } + + edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() { + if (lastTransition_.itemType() != edm::InputSource::ItemType::IsLumi) { + throw cms::Exception("NotALuminosityBlock") + << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock"; + } + + lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr(); + assert(lumiPrincipal_); + lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary()); + source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_); + + return edm::test::LuminosityBlockFromSource(lumiPrincipal_); + } + + edm::test::EventFromSource TestSourceProcessor::readEvent() { + if (lastTransition_.itemType() != edm::InputSource::ItemType::IsEvent) { + throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType()) + << " not a Event"; + } + + auto& event = principalCache_.eventPrincipal(0); + StreamContext streamContext(event.streamID(), &processContext_); + + source_->readEvent(event, streamContext); + + return edm::test::EventFromSource(event); + } +} // namespace edm::test \ No newline at end of file diff --git a/FWCore/TestProcessor/test/BuildFile.xml b/FWCore/TestProcessor/test/BuildFile.xml index 3f5e2e888c964..46bbd9b6d6c48 100644 --- a/FWCore/TestProcessor/test/BuildFile.xml +++ b/FWCore/TestProcessor/test/BuildFile.xml @@ -1,4 +1,4 @@ - + diff --git a/FWCore/TestProcessor/test/testsourceprocessor_t.cppunit.cc b/FWCore/TestProcessor/test/testsourceprocessor_t.cppunit.cc new file mode 100644 index 0000000000000..a9565a86d28aa --- /dev/null +++ b/FWCore/TestProcessor/test/testsourceprocessor_t.cppunit.cc @@ -0,0 +1,113 @@ +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/TestProcessor/interface/TestSourceProcessor.h" + +#include + +class testTestSourceProcessor : public CppUnit::TestFixture { + CPPUNIT_TEST_SUITE(testTestSourceProcessor); + CPPUNIT_TEST(simpleTest); + + CPPUNIT_TEST_SUITE_END(); + +public: + void setUp() {} + void tearDown() {} + void simpleTest(); + +private: +}; + +///registration of the test so that the runner can find it +CPPUNIT_TEST_SUITE_REGISTRATION(testTestSourceProcessor); + +void testTestSourceProcessor::simpleTest() { + char const* kTest = + "from FWCore.TestProcessor.TestSourceProcess import *\n" + "process = TestSourceProcess()\n" + "process.source = cms.Source('TestSource'," + "transitions = cms.untracked.VPSet(\n" + "cms.PSet(type = cms.untracked.string('IsFile'),\n" + " id = cms.untracked.EventID(0,0,0)),\n" + "cms.PSet(type = cms.untracked.string('IsRun'),\n" + " id = cms.untracked.EventID(1,0,0)),\n" + "cms.PSet(type = cms.untracked.string('IsLumi'),\n" + " id = cms.untracked.EventID(1,1,0)),\n" + "cms.PSet(type = cms.untracked.string('IsEvent'),\n" + " id = cms.untracked.EventID(1,1,1)),\n" + "cms.PSet(type = cms.untracked.string('IsEvent'),\n" + " id = cms.untracked.EventID(1,1,2)),\n" + "cms.PSet(type = cms.untracked.string('IsEvent'),\n" + " id = cms.untracked.EventID(1,1,3)),\n" + "cms.PSet(type = cms.untracked.string('IsEvent'),\n" + " id = cms.untracked.EventID(1,1,4)),\n" + "cms.PSet(type = cms.untracked.string('IsEvent'),\n" + " id = cms.untracked.EventID(1,1,5)),\n" + "cms.PSet(type = cms.untracked.string('IsStop'),\n" + " id = cms.untracked.EventID(0,0,0))\n" + "))\n"; + edm::test::TestSourceProcessor tester(kTest); + + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + CPPUNIT_ASSERT(bool(f)); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + CPPUNIT_ASSERT(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + CPPUNIT_ASSERT(r.event() == 1); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + CPPUNIT_ASSERT(r.event() == 2); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + CPPUNIT_ASSERT(r.event() == 3); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + CPPUNIT_ASSERT(r.event() == 4); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + CPPUNIT_ASSERT(r.run() == 1); + CPPUNIT_ASSERT(r.luminosityBlock() == 1); + CPPUNIT_ASSERT(r.event() == 5); + } + { + auto n = tester.findNextTransition(); + CPPUNIT_ASSERT(n == edm::InputSource::ItemType::IsStop); + } +} \ No newline at end of file From ff4d600d11051573dbd9ad8987a8d64f1e5745b4 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Mon, 15 Jul 2024 09:54:57 -0500 Subject: [PATCH 3/7] Share one time initialization between testing frameworks --- FWCore/TestProcessor/src/TestProcessor.cc | 29 ++--------------- .../TestProcessor/src/TestSourceProcessor.cc | 6 ++++ .../src/oneTimeInitialization.cc | 31 +++++++++++++++++++ .../TestProcessor/src/oneTimeInitialization.h | 9 ++++++ 4 files changed, 48 insertions(+), 27 deletions(-) create mode 100644 FWCore/TestProcessor/src/oneTimeInitialization.cc create mode 100644 FWCore/TestProcessor/src/oneTimeInitialization.h diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index 6662bea40e534..001f35bb3d289 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -38,19 +38,15 @@ #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/ServiceRegistry/interface/SystemBounds.h" -#include "FWCore/PluginManager/interface/PluginManager.h" -#include "FWCore/PluginManager/interface/standard.h" - #include "FWCore/ParameterSetReader/interface/ProcessDescImpl.h" #include "FWCore/ParameterSet/interface/ProcessDesc.h" #include "FWCore/ParameterSet/interface/validateTopLevelParameterSets.h" #include "FWCore/Utilities/interface/ExceptionCollector.h" -#include "FWCore/Concurrency/interface/ThreadsController.h" #include "FWCore/Concurrency/interface/FinalWaitingTask.h" -#include "DataFormats/Provenance/interface/ParentageRegistry.h" +#include "oneTimeInitialization.h" #define xstr(s) str(s) #define str(s) #s @@ -58,27 +54,6 @@ namespace edm { namespace test { - namespace { - - bool oneTimeInitializationImpl() { - edmplugin::PluginManager::configure(edmplugin::standard::config()); - - static std::unique_ptr tsiPtr = std::make_unique(1); - - // register the empty parentage vector , once and for all - ParentageRegistry::instance()->insertMapped(Parentage()); - - // register the empty parameter set, once and for all. - ParameterSet().registerIt(); - return true; - } - - bool oneTimeInitialization() { - static const bool s_init{oneTimeInitializationImpl()}; - return s_init; - } - } // namespace - // // constructors and destructor // @@ -88,7 +63,7 @@ namespace edm { historyAppender_(std::make_unique()), moduleRegistry_(std::make_shared()) { //Setup various singletons - (void)oneTimeInitialization(); + (void)testprocessor::oneTimeInitialization(); ProcessDescImpl desc(iConfig.pythonConfiguration(), false); diff --git a/FWCore/TestProcessor/src/TestSourceProcessor.cc b/FWCore/TestProcessor/src/TestSourceProcessor.cc index ee7662a6fd796..cee0d30f4fa80 100644 --- a/FWCore/TestProcessor/src/TestSourceProcessor.cc +++ b/FWCore/TestProcessor/src/TestSourceProcessor.cc @@ -27,6 +27,9 @@ #include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "DataFormats/Provenance/interface/ParentageRegistry.h" + +#include "oneTimeInitialization.h" + namespace { using namespace edm; @@ -104,6 +107,9 @@ namespace edm::test { : globalControl_(oneapi::tbb::global_control::max_allowed_parallelism, 1), arena_(1), historyAppender_(std::make_unique()) { + //Setup various singletons + (void)testprocessor::oneTimeInitialization(); + ProcessDescImpl desc(iConfig, false); auto psetPtr = desc.parameterSet(); diff --git a/FWCore/TestProcessor/src/oneTimeInitialization.cc b/FWCore/TestProcessor/src/oneTimeInitialization.cc new file mode 100644 index 0000000000000..5db30ac31d8ae --- /dev/null +++ b/FWCore/TestProcessor/src/oneTimeInitialization.cc @@ -0,0 +1,31 @@ + +#include "oneTimeInitialization.h" + +#include "FWCore/PluginManager/interface/PluginManager.h" +#include "FWCore/PluginManager/interface/standard.h" +#include "FWCore/Concurrency/interface/ThreadsController.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "DataFormats/Provenance/interface/ParentageRegistry.h" + +namespace { + + bool oneTimeInitializationImpl() { + edmplugin::PluginManager::configure(edmplugin::standard::config()); + + static std::unique_ptr tsiPtr = std::make_unique(1); + + // register the empty parentage vector , once and for all + edm::ParentageRegistry::instance()->insertMapped(edm::Parentage()); + + // register the empty parameter set, once and for all. + edm::ParameterSet().registerIt(); + return true; + } +} //namespace + +namespace edm::testprocessor { + bool oneTimeInitialization() { + static const bool s_init{oneTimeInitializationImpl()}; + return s_init; + } +} // namespace edm::testprocessor diff --git a/FWCore/TestProcessor/src/oneTimeInitialization.h b/FWCore/TestProcessor/src/oneTimeInitialization.h new file mode 100644 index 0000000000000..9dfd856c1a123 --- /dev/null +++ b/FWCore/TestProcessor/src/oneTimeInitialization.h @@ -0,0 +1,9 @@ +#if !defined(FWCore_TestProcessor_oneTimeInitialization_h) +#define FWCore_TestProcessor_oneTimeInitialization_h + +//Used to initialize the plugin manager +namespace edm::testprocessor { + bool oneTimeInitialization(); +} + +#endif \ No newline at end of file From 87efee38b5f2c824503c468547c854e4d17c519c Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 17 Jul 2024 08:34:27 -0500 Subject: [PATCH 4/7] Fix meta-data and Service issues in TestSourceProcessor Testing using PoolSource uncovered these issues. --- .../TestProcessor/interface/EventFromSource.h | 8 +++- .../interface/LuminosityBlockFromSource.h | 8 +++- .../TestProcessor/interface/RunFromSource.h | 8 +++- .../interface/TestSourceProcessor.h | 7 +++- .../TestProcessor/src/TestSourceProcessor.cc | 39 +++++++++++++++++-- 5 files changed, 62 insertions(+), 8 deletions(-) diff --git a/FWCore/TestProcessor/interface/EventFromSource.h b/FWCore/TestProcessor/interface/EventFromSource.h index ab4f3bca690d4..6be85eba0f674 100644 --- a/FWCore/TestProcessor/interface/EventFromSource.h +++ b/FWCore/TestProcessor/interface/EventFromSource.h @@ -25,6 +25,8 @@ #include "FWCore/TestProcessor/interface/TestHandle.h" #include "FWCore/Framework/interface/EventPrincipal.h" #include "FWCore/Utilities/interface/TypeID.h" +#include "FWCore/ServiceRegistry/interface/ServiceToken.h" +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" // forward declarations @@ -35,13 +37,16 @@ namespace edm { class EventFromSource { public: - EventFromSource(EventPrincipal const& iPrincipal) : principal_(&iPrincipal) {} + EventFromSource(EventPrincipal const& iPrincipal, edm::ServiceToken iToken) + : principal_(&iPrincipal), token_(iToken) {} // ---------- const member functions --------------------- template TestHandle get(std::string const& iModule, std::string const& iInstanceLabel, std::string const& iProcess) const { + ServiceRegistry::Operate operate(token_); + auto h = principal_->getByLabel( edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); if (h.failedToGet()) { @@ -61,6 +66,7 @@ namespace edm { private: // ---------- member data -------------------------------- EventPrincipal const* principal_; + edm::ServiceToken token_; }; } // namespace test } // namespace edm diff --git a/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h b/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h index 71d322fc0ac77..85632d4475bc7 100644 --- a/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h +++ b/FWCore/TestProcessor/interface/LuminosityBlockFromSource.h @@ -25,6 +25,8 @@ #include "FWCore/TestProcessor/interface/TestHandle.h" #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" #include "FWCore/Utilities/interface/TypeID.h" +#include "FWCore/ServiceRegistry/interface/ServiceToken.h" +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" // forward declarations @@ -34,13 +36,16 @@ namespace edm { class LuminosityBlockFromSource { public: - LuminosityBlockFromSource(std::shared_ptr iPrincipal) : principal_(iPrincipal) {} + LuminosityBlockFromSource(std::shared_ptr iPrincipal, edm::ServiceToken iToken) + : principal_(iPrincipal), token_(iToken) {} // ---------- const member functions --------------------- template TestHandle get(std::string const& iModule, std::string const& iInstanceLabel, std::string const& iProcess) const { + ServiceRegistry::Operate operate(token_); + auto h = principal_->getByLabel( edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); if (h.failedToGet()) { @@ -63,6 +68,7 @@ namespace edm { private: // ---------- member data -------------------------------- std::shared_ptr principal_; + edm::ServiceToken token_; }; } // namespace test } // namespace edm diff --git a/FWCore/TestProcessor/interface/RunFromSource.h b/FWCore/TestProcessor/interface/RunFromSource.h index 4e9e2543f45a2..9123c26ddd2ef 100644 --- a/FWCore/TestProcessor/interface/RunFromSource.h +++ b/FWCore/TestProcessor/interface/RunFromSource.h @@ -25,6 +25,8 @@ #include "FWCore/TestProcessor/interface/TestHandle.h" #include "FWCore/Framework/interface/RunPrincipal.h" #include "FWCore/Utilities/interface/TypeID.h" +#include "FWCore/ServiceRegistry/interface/ServiceToken.h" +#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" // forward declarations @@ -34,13 +36,16 @@ namespace edm { class RunFromSource { public: - RunFromSource(std::shared_ptr iPrincipal) : principal_(iPrincipal) {} + RunFromSource(std::shared_ptr iPrincipal, edm::ServiceToken iToken) + : principal_(iPrincipal), token_(iToken) {} // ---------- const member functions --------------------- template TestHandle get(std::string const& iModule, std::string const& iInstanceLabel, std::string const& iProcess) const { + ServiceRegistry::Operate operate(token_); + auto h = principal_->getByLabel( edm::PRODUCT_TYPE, edm::TypeID(typeid(T)), iModule, iInstanceLabel, iProcess, nullptr, nullptr, nullptr); if (h.failedToGet()) { @@ -58,6 +63,7 @@ namespace edm { private: // ---------- member data -------------------------------- std::shared_ptr principal_; + edm::ServiceToken token_; }; } // namespace test } // namespace edm diff --git a/FWCore/TestProcessor/interface/TestSourceProcessor.h b/FWCore/TestProcessor/interface/TestSourceProcessor.h index e0cc8c2465431..bd79a9c217663 100644 --- a/FWCore/TestProcessor/interface/TestSourceProcessor.h +++ b/FWCore/TestProcessor/interface/TestSourceProcessor.h @@ -34,6 +34,8 @@ #include "FWCore/Framework/interface/PrincipalCache.h" #include "FWCore/Framework/interface/SignallingProductRegistry.h" #include "FWCore/Framework/interface/PreallocationConfiguration.h" +#include "FWCore/Framework/interface/MergeableRunProductProcesses.h" + #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/ServiceRegistry/interface/ProcessContext.h" #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h" @@ -49,6 +51,7 @@ namespace edm::test { class TestSourceProcessor { public: TestSourceProcessor(std::string const& iConfig, ServiceToken iToken = ServiceToken()); + ~TestSourceProcessor(); InputSource::ItemTypeInfo findNextTransition(); @@ -62,7 +65,6 @@ namespace edm::test { edm::test::EventFromSource readEvent(); private: - std::unique_ptr source_; edm::InputSource::ItemTypeInfo lastTransition_; oneapi::tbb::global_control globalControl_; @@ -77,6 +79,7 @@ namespace edm::test { std::shared_ptr processConfiguration_; ProcessContext processContext_; + MergeableRunProductProcesses mergeableRunProductProcesses_; ProcessHistoryRegistry processHistoryRegistry_; std::unique_ptr historyAppender_; @@ -84,6 +87,8 @@ namespace edm::test { PrincipalCache principalCache_; PreallocationConfiguration preallocations_; + std::unique_ptr source_; + std::shared_ptr runPrincipal_; std::shared_ptr lumiPrincipal_; diff --git a/FWCore/TestProcessor/src/TestSourceProcessor.cc b/FWCore/TestProcessor/src/TestSourceProcessor.cc index cee0d30f4fa80..2f26cb3a193b4 100644 --- a/FWCore/TestProcessor/src/TestSourceProcessor.cc +++ b/FWCore/TestProcessor/src/TestSourceProcessor.cc @@ -161,11 +161,14 @@ namespace edm::test { actReg_ = items.actReg_; branchIDListHelper_ = items.branchIDListHelper(); thinnedAssociationsHelper_ = items.thinnedAssociationsHelper(); + processConfiguration_ = items.processConfiguration(); + processContext_.setProcessConfiguration(processConfiguration_.get()); preg_ = items.preg(); principalCache_.setNumberOfConcurrentPrincipals(preallocations_); preg_->setFrozen(); + mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_); for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) { // Reusable event principal @@ -174,7 +177,8 @@ namespace edm::test { principalCache_.insert(std::move(ep)); } for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) { - auto rp = std::make_unique(preg_, *processConfiguration_, historyAppender_.get(), index); + auto rp = std::make_unique( + preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_); principalCache_.insert(std::move(rp)); } for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) { @@ -190,12 +194,28 @@ namespace edm::test { source_->doBeginJob(); } + TestSourceProcessor::~TestSourceProcessor() { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + try { + source_.reset(); + } catch (std::exception const& iExcept) { + std::cerr << " caught exception while destroying TestSourceProcessor\n" << iExcept.what(); + } + } + edm::InputSource::ItemTypeInfo TestSourceProcessor::findNextTransition() { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + lastTransition_ = source_->nextItemType(); return lastTransition_; } std::shared_ptr TestSourceProcessor::openFile() { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + size_t size = preg_->size(); fb_ = source_->readFile(); if (size < preg_->size()) { @@ -217,6 +237,9 @@ namespace edm::test { << "closeFile given a FileBlock that does not correspond to the one returned by openFile"; } if (fb_) { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + source_->closeFile(fb_.get(), false); } } @@ -225,12 +248,15 @@ namespace edm::test { if (lastTransition_.itemType() != edm::InputSource::ItemType::IsRun) { throw cms::Exception("NotARun") << "The last transition is " << name(lastTransition_.itemType()) << " not a Run"; } + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + //NOTE: should probably handle merging as well runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr(); runPrincipal_->setAux(*source_->runAuxiliary()); source_->readRun(*runPrincipal_, *historyAppender_); - return edm::test::RunFromSource(runPrincipal_); + return edm::test::RunFromSource(runPrincipal_, serviceToken_); } edm::test::LuminosityBlockFromSource TestSourceProcessor::readLuminosityBlock() { @@ -239,12 +265,15 @@ namespace edm::test { << "The last transition is " << name(lastTransition_.itemType()) << " not a LuminosityBlock"; } + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr(); assert(lumiPrincipal_); lumiPrincipal_->setAux(*source_->luminosityBlockAuxiliary()); source_->readLuminosityBlock(*lumiPrincipal_, *historyAppender_); - return edm::test::LuminosityBlockFromSource(lumiPrincipal_); + return edm::test::LuminosityBlockFromSource(lumiPrincipal_, serviceToken_); } edm::test::EventFromSource TestSourceProcessor::readEvent() { @@ -252,12 +281,14 @@ namespace edm::test { throw cms::Exception("NotAnEvent") << "The last transition is " << name(lastTransition_.itemType()) << " not a Event"; } + //make the services available + ServiceRegistry::Operate operate(serviceToken_); auto& event = principalCache_.eventPrincipal(0); StreamContext streamContext(event.streamID(), &processContext_); source_->readEvent(event, streamContext); - return edm::test::EventFromSource(event); + return edm::test::EventFromSource(event, serviceToken_); } } // namespace edm::test \ No newline at end of file From 8b9b9fa8761db96fa031156e7c15dc879789f148 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 17 Jul 2024 08:35:45 -0500 Subject: [PATCH 5/7] Fixed OutputModule issues in TestProcessor Testing with PoolOutputModule showed issues with meta-data and missing framework calls. --- .../TestProcessor/interface/TestProcessor.h | 11 ++ FWCore/TestProcessor/src/TestProcessor.cc | 126 ++++++++++++++++-- 2 files changed, 126 insertions(+), 11 deletions(-) diff --git a/FWCore/TestProcessor/interface/TestProcessor.h b/FWCore/TestProcessor/interface/TestProcessor.h index f9b387b306e5d..68391d79b2661 100644 --- a/FWCore/TestProcessor/interface/TestProcessor.h +++ b/FWCore/TestProcessor/interface/TestProcessor.h @@ -36,6 +36,8 @@ #include "FWCore/Framework/interface/Schedule.h" #include "FWCore/Framework/interface/EventSetupRecordKey.h" #include "FWCore/Framework/interface/DataKey.h" +#include "FWCore/Framework/interface/MergeableRunProductProcesses.h" + #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h" #include "FWCore/ServiceRegistry/interface/ProcessContext.h" #include "FWCore/ServiceRegistry/interface/ServiceLegacy.h" @@ -317,13 +319,17 @@ This simulates a problem happening early in the job which causes processing not void teardownProcessing(); void beginJob(); + void respondToOpenInputFile(); + void openOutputFiles(); void beginProcessBlock(); void beginRun(); void beginLuminosityBlock(); void event(); std::shared_ptr endLuminosityBlock(); std::shared_ptr endRun(); + void respondToCloseInputFile(); ProcessBlockPrincipal const* endProcessBlock(); + void closeOutputFiles(); void endJob(); // ---------- member data -------------------------------- @@ -346,7 +352,10 @@ This simulates a problem happening early in the job which causes processing not std::shared_ptr processConfiguration_; ProcessContext processContext_; + MergeableRunProductProcesses mergeableRunProductProcesses_; + ProcessHistoryRegistry processHistoryRegistry_; + ProcessHistory processHistory_; std::unique_ptr historyAppender_; PrincipalCache principalCache_; @@ -363,9 +372,11 @@ This simulates a problem happening early in the job which causes processing not LuminosityBlockNumber_t lumiNumber_ = 1; EventNumber_t eventNumber_ = 1; bool beginJobCalled_ = false; + bool respondToOpenInputFileCalled_ = false; bool beginProcessBlockCalled_ = false; bool beginRunCalled_ = false; bool beginLumiCalled_ = false; + bool openOutputFilesCalled_ = false; }; } // namespace test } // namespace edm diff --git a/FWCore/TestProcessor/src/TestProcessor.cc b/FWCore/TestProcessor/src/TestProcessor.cc index 001f35bb3d289..80524005434c6 100644 --- a/FWCore/TestProcessor/src/TestProcessor.cc +++ b/FWCore/TestProcessor/src/TestProcessor.cc @@ -34,6 +34,8 @@ #include "FWCore/Framework/interface/DelayedReader.h" #include "FWCore/Framework/interface/ensureAvailableAccelerators.h" #include "FWCore/Framework/interface/makeModuleTypeResolverMaker.h" +#include "FWCore/Framework/interface/FileBlock.h" +#include "FWCore/Framework/interface/MergeableRunProductMetadata.h" #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h" #include "FWCore/ServiceRegistry/interface/SystemBounds.h" @@ -114,10 +116,9 @@ namespace edm { emptyPSet.registerIt(); auto psetid = emptyPSet.id(); - ProcessHistory oldHistory; for (auto const& p : iConfig.extraProcesses()) { - oldHistory.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0"); - processHistoryRegistry_.registerProcessHistory(oldHistory); + processHistory_.emplace_back(p, psetid, xstr(PROJECT_VERSION), "0"); + processHistoryRegistry_.registerProcessHistory(processHistory_); } //setup the products we will be adding to the event @@ -157,6 +158,8 @@ namespace edm { principalCache_.setNumberOfConcurrentPrincipals(preallocations_); preg_->setFrozen(); + mergeableRunProductProcesses_.setProcessesWithMergeableRunProducts(*preg_); + for (unsigned int index = 0; index < preallocations_.numberOfStreams(); ++index) { // Reusable event principal auto ep = std::make_shared(preg_, @@ -168,7 +171,8 @@ namespace edm { principalCache_.insert(std::move(ep)); } for (unsigned int index = 0; index < preallocations_.numberOfRuns(); ++index) { - auto rp = std::make_unique(preg_, *processConfiguration_, historyAppender_.get(), index); + auto rp = std::make_unique( + preg_, *processConfiguration_, historyAppender_.get(), index, true, &mergeableRunProductProcesses_); principalCache_.insert(std::move(rp)); } for (unsigned int index = 0; index < preallocations_.numberOfLuminosityBlocks(); ++index) { @@ -217,9 +221,16 @@ namespace edm { if (not beginJobCalled_) { beginJob(); } + if (not respondToOpenInputFileCalled_) { + respondToOpenInputFile(); + } if (not beginProcessBlockCalled_) { beginProcessBlock(); } + if (not openOutputFilesCalled_) { + openOutputFiles(); + } + if (not beginRunCalled_) { beginRun(); } @@ -235,7 +246,6 @@ namespace edm { //We want each test to have its own ES data products esHelper_->resetAllProxies(); } - return edm::test::LuminosityBlock(lumiPrincipal_, labelOfTestModule_, processConfiguration_->processName()); } @@ -247,9 +257,15 @@ namespace edm { if (not beginJobCalled_) { beginJob(); } + if (not respondToOpenInputFileCalled_) { + respondToOpenInputFile(); + } if (not beginProcessBlockCalled_) { beginProcessBlock(); } + if (not openOutputFilesCalled_) { + openOutputFiles(); + } if (not beginRunCalled_) { beginRun(); } @@ -271,9 +287,15 @@ namespace edm { if (not beginJobCalled_) { beginJob(); } + if (not respondToOpenInputFileCalled_) { + respondToOpenInputFile(); + } if (not beginProcessBlockCalled_) { beginProcessBlock(); } + if (not openOutputFilesCalled_) { + openOutputFiles(); + } if (beginRunCalled_) { assert(runNumber_ != iNum); endRun(); @@ -285,7 +307,6 @@ namespace edm { //We want each test to have its own ES data products esHelper_->resetAllProxies(); } - return edm::test::Run(runPrincipal_, labelOfTestModule_, processConfiguration_->processName()); } edm::test::Run TestProcessor::testEndRunImpl() { @@ -296,9 +317,15 @@ namespace edm { if (not beginJobCalled_) { beginJob(); } + if (not respondToOpenInputFileCalled_) { + respondToOpenInputFile(); + } if (not beginProcessBlockCalled_) { beginProcessBlock(); } + if (not openOutputFilesCalled_) { + openOutputFiles(); + } if (not beginRunCalled_) { beginRun(); } @@ -339,9 +366,15 @@ namespace edm { if (not beginJobCalled_) { beginJob(); } + if (not respondToOpenInputFileCalled_) { + respondToOpenInputFile(); + } if (not beginProcessBlockCalled_) { beginProcessBlock(); } + if (not openOutputFilesCalled_) { + openOutputFiles(); + } if (not beginRunCalled_) { beginRun(); } @@ -360,10 +393,17 @@ namespace edm { endRun(); beginRunCalled_ = false; } + if (respondToOpenInputFileCalled_) { + respondToCloseInputFile(); + } if (beginProcessBlockCalled_) { endProcessBlock(); beginProcessBlockCalled_ = false; } + if (openOutputFilesCalled_) { + closeOutputFiles(); + openOutputFilesCalled_ = false; + } if (beginJobCalled_) { endJob(); } @@ -419,11 +459,53 @@ namespace edm { beginProcessBlockCalled_ = true; } + void TestProcessor::openOutputFiles() { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + + edm::FileBlock fb; + schedule_->openOutputFiles(fb); + openOutputFilesCalled_ = true; + } + + void TestProcessor::closeOutputFiles() { + if (openOutputFilesCalled_) { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + schedule_->closeOutputFiles(); + + openOutputFilesCalled_ = false; + } + } + + void TestProcessor::respondToOpenInputFile() { + respondToOpenInputFileCalled_ = true; + edm::FileBlock fb; + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + schedule_->respondToOpenInputFile(fb); + } + + void TestProcessor::respondToCloseInputFile() { + if (respondToOpenInputFileCalled_) { + edm::FileBlock fb; + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + + schedule_->respondToCloseInputFile(fb); + respondToOpenInputFileCalled_ = false; + } + } + void TestProcessor::beginRun() { runPrincipal_ = principalCache_.getAvailableRunPrincipalPtr(); runPrincipal_->clearPrincipal(); assert(runPrincipal_); - runPrincipal_->setAux(edm::RunAuxiliary(runNumber_, Timestamp(), Timestamp())); + edm::RunAuxiliary aux(runNumber_, Timestamp(), Timestamp()); + aux.setProcessHistoryID(processHistory_.id()); + runPrincipal_->setAux(aux); + + runPrincipal_->fillRunPrincipal(processHistoryRegistry_); IOVSyncValue ts(EventID(runPrincipal_->run(), 0, 0), runPrincipal_->beginTime()); eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_); @@ -458,12 +540,14 @@ namespace edm { void TestProcessor::beginLuminosityBlock() { LuminosityBlockAuxiliary aux(runNumber_, lumiNumber_, Timestamp(), Timestamp()); + aux.setProcessHistoryID(processHistory_.id()); lumiPrincipal_ = principalCache_.getAvailableLumiPrincipalPtr(); lumiPrincipal_->clearPrincipal(); assert(lumiPrincipal_); lumiPrincipal_->setAux(aux); lumiPrincipal_->setRunPrincipal(runPrincipal_); + lumiPrincipal_->fillLuminosityBlockPrincipal(&processHistory_); IOVSyncValue ts(EventID(runNumber_, lumiNumber_, eventNumber_), lumiPrincipal_->beginTime()); eventsetup::synchronousEventSetupForInstance(ts, taskGroup_, *espController_); @@ -503,10 +587,9 @@ namespace edm { //this resets the EventPrincipal (if it had been used before) pep->clearEventPrincipal(); - pep->fillEventPrincipal( - edm::EventAuxiliary(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false), - nullptr, - nullptr); + edm::EventAuxiliary aux(EventID(runNumber_, lumiNumber_, eventNumber_), "", Timestamp(), false); + aux.setProcessHistoryID(processHistory_.id()); + pep->fillEventPrincipal(aux, nullptr, nullptr); assert(lumiPrincipal_.get() != nullptr); pep->setLuminosityBlockPrincipal(lumiPrincipal_.get()); @@ -535,6 +618,9 @@ namespace edm { std::shared_ptr TestProcessor::endLuminosityBlock() { auto lumiPrincipal = lumiPrincipal_; if (beginLumiCalled_) { + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + beginLumiCalled_ = false; lumiPrincipal_.reset(); @@ -575,6 +661,12 @@ namespace edm { false); globalWaitTask.wait(); } + { + FinalWaitingTask globalWaitTask{taskGroup_}; + schedule_->writeLumiAsync( + WaitingTaskHolder(taskGroup_, &globalWaitTask), *lumiPrincipal, &processContext_, actReg_.get()); + globalWaitTask.wait(); + } } lumiPrincipal->setRunPrincipal(std::shared_ptr()); return lumiPrincipal; @@ -586,6 +678,9 @@ namespace edm { if (beginRunCalled_) { beginRunCalled_ = false; + //make the services available + ServiceRegistry::Operate operate(serviceToken_); + IOVSyncValue ts( EventID(runPrincipal->run(), LuminosityBlockID::maxLuminosityBlockNumber(), EventID::maxEventNumber()), runPrincipal->endTime()); @@ -625,6 +720,15 @@ namespace edm { false); globalWaitTask.wait(); } + { + FinalWaitingTask globalWaitTask{taskGroup_}; + schedule_->writeRunAsync(WaitingTaskHolder(taskGroup_, &globalWaitTask), + *runPrincipal, + &processContext_, + actReg_.get(), + runPrincipal->mergeableRunProductMetadata()); + globalWaitTask.wait(); + } } return runPrincipal; } From d08336b2957dd876306c7634dffd4bf4016147ce Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Wed, 17 Jul 2024 08:36:44 -0500 Subject: [PATCH 6/7] Add unit test for Pool Output/Source This uses the new TestSourceProcessor and changes in TestProcessor to support OutputModules. --- IOPool/Common/test/BuildFile.xml | 4 + IOPool/Common/test/test_catch2_main.cc | 2 + .../Common/test/test_catch2_output2input.cc | 146 ++++++++++++++++++ 3 files changed, 152 insertions(+) create mode 100644 IOPool/Common/test/test_catch2_main.cc create mode 100644 IOPool/Common/test/test_catch2_output2input.cc diff --git a/IOPool/Common/test/BuildFile.xml b/IOPool/Common/test/BuildFile.xml index ee99d10de3405..aa85f33bc4536 100644 --- a/IOPool/Common/test/BuildFile.xml +++ b/IOPool/Common/test/BuildFile.xml @@ -1,3 +1,7 @@ + + + + diff --git a/IOPool/Common/test/test_catch2_main.cc b/IOPool/Common/test/test_catch2_main.cc new file mode 100644 index 0000000000000..0c7c351f437f5 --- /dev/null +++ b/IOPool/Common/test/test_catch2_main.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch.hpp" diff --git a/IOPool/Common/test/test_catch2_output2input.cc b/IOPool/Common/test/test_catch2_output2input.cc new file mode 100644 index 0000000000000..8dd8679d9b4ca --- /dev/null +++ b/IOPool/Common/test/test_catch2_output2input.cc @@ -0,0 +1,146 @@ +#include "FWCore/TestProcessor/interface/TestProcessor.h" +#include "FWCore/TestProcessor/interface/TestSourceProcessor.h" +#include "FWCore/Utilities/interface/Exception.h" + +#include "DataFormats/TestObjects/interface/Thing.h" +#include +#include +#include "catch.hpp" + +static constexpr auto s_tag = "[PoolOutputSource]"; + +namespace { + std::string setOutputFile(std::string const& iConfig, std::string const& iFileName) { + using namespace std::string_literals; + return iConfig + "\nprocess.out.fileName = '"s + iFileName + "'\n"; + } + + std::string setInputFile(std::string const& iConfig, std::string const& iFileName) { + using namespace std::string_literals; + return iConfig + "\nprocess.source.fileNames = ['file:"s + iFileName + "']\n"; + } +} // namespace +TEST_CASE("Tests of PoolOuput -> PoolSource", s_tag) { + const std::string baseOutConfig{ + R"_(from FWCore.TestProcessor.TestProcess import * +process = TestProcess() +process.out = cms.OutputModule('PoolOutputModule', + fileName = cms.untracked.string('') +) +process.add_(cms.Service("InitRootHandlers")) +process.add_(cms.Service("JobReportService")) + +process.moduleToTest(process.out) +)_"}; + + const std::string baseSourceConfig{ + R"_(from FWCore.TestProcessor.TestSourceProcess import * +process = TestSourceProcess() +process.source = cms.Source("PoolSource", fileNames = cms.untracked.vstring('')) +process.add_(cms.Service("InitRootHandlers")) +process.add_(cms.Service("SiteLocalConfigService")) +process.add_(cms.Service("JobReportService")) + )_"}; + + SECTION("OneEmptyEvent") { + const std::string fileName = "one_event.root"; + { + auto configString = setOutputFile(baseOutConfig, fileName); + + edm::test::TestProcessor::Config config{configString}; + + edm::test::TestProcessor tester(config); + tester.test(); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } + + SECTION("EventWithThing") { + const std::string fileName = "thing.root"; + { + auto configString = setOutputFile(baseOutConfig, fileName); + + edm::test::TestProcessor::Config config{configString}; + auto thingToken = config.produces>("thing"); + + edm::test::TestProcessor tester(config); + tester.test(std::make_pair(thingToken, std::make_unique>(1, edmtest::Thing{1}))); + } + { + auto config = setInputFile(baseSourceConfig, fileName); + edm::test::TestSourceProcessor tester(config); + + { + auto n = tester.findNextTransition(); + REQUIRE(n == edm::InputSource::ItemType::IsFile); + auto f = tester.openFile(); + REQUIRE(bool(f)); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsRun); + auto r = tester.readRun(); + REQUIRE(r.run() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsLumi); + auto r = tester.readLuminosityBlock(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsEvent); + auto r = tester.readEvent(); + REQUIRE(r.run() == 1); + REQUIRE(r.luminosityBlock() == 1); + REQUIRE(r.event() == 1); + auto v = r.get>("thing", "", "TEST"); + REQUIRE(v->size() == 1); + REQUIRE((*v)[0].a == 1); + } + { + auto n = tester.findNextTransition(); + REQUIRE(n.itemType() == edm::InputSource::ItemType::IsStop); + } + } + std::filesystem::remove(fileName); + } +} \ No newline at end of file From 9bbc86ac83b47ab3937dc6af4eb80fe682ecafa1 Mon Sep 17 00:00:00 2001 From: Chris Jones Date: Thu, 18 Jul 2024 09:51:01 -0500 Subject: [PATCH 7/7] Use ifndef to be consistent with other headers --- FWCore/TestProcessor/src/oneTimeInitialization.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/FWCore/TestProcessor/src/oneTimeInitialization.h b/FWCore/TestProcessor/src/oneTimeInitialization.h index 9dfd856c1a123..5e627118df122 100644 --- a/FWCore/TestProcessor/src/oneTimeInitialization.h +++ b/FWCore/TestProcessor/src/oneTimeInitialization.h @@ -1,4 +1,4 @@ -#if !defined(FWCore_TestProcessor_oneTimeInitialization_h) +#ifndef FWCore_TestProcessor_oneTimeInitialization_h #define FWCore_TestProcessor_oneTimeInitialization_h //Used to initialize the plugin manager