Skip to content

Commit

Permalink
Allow an InputSource to declare a run/lumi entry is the last to be me…
Browse files Browse the repository at this point in the history
…rged
  • Loading branch information
wddgit committed Dec 11, 2023
1 parent bb13ad1 commit 1b3d395
Show file tree
Hide file tree
Showing 32 changed files with 519 additions and 242 deletions.
20 changes: 10 additions & 10 deletions DQMServices/FwkIO/plugins/DQMRootSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class DQMRootSource : public edm::PuttableSourceBase, DQMTTreeIO {
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);

private:
edm::InputSource::ItemType getNextItemType() override;
edm::InputSource::ItemTypeInfo getNextItemType() override;

std::shared_ptr<edm::FileBlock> readFile_() override;
std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
Expand Down Expand Up @@ -440,15 +440,15 @@ DQMRootSource::DQMRootSource(edm::ParameterSet const& iPSet, const edm::InputSou
{"LUMI", MonitorElementData::Scope::LUMI},
{"RUN", MonitorElementData::Scope::RUN},
{"JOB", MonitorElementData::Scope::JOB}}[iPSet.getUntrackedParameter<std::string>("reScope", "JOB")]),
m_nextItemType(edm::InputSource::IsFile),
m_nextItemType(edm::InputSource::ItemType::IsFile),
m_treeReaders(kNIndicies, std::shared_ptr<TreeReaderBase>()),
m_currentIndex(0),
m_openFiles(std::vector<OpenFileInfo>()),
m_fileMetadatas(std::vector<FileMetadata>()) {
edm::sortAndRemoveOverlaps(m_lumisToProcess);

if (m_catalog.fileNames(0).empty()) {
m_nextItemType = edm::InputSource::IsStop;
m_nextItemType = edm::InputSource::ItemType::IsStop;
} else {
m_treeReaders[kIntIndex].reset(new TreeSimpleReader<Long64_t>(MonitorElementData::Kind::INT, m_rescope));
m_treeReaders[kFloatIndex].reset(new TreeSimpleReader<double>(MonitorElementData::Kind::REAL, m_rescope));
Expand Down Expand Up @@ -483,7 +483,7 @@ DQMRootSource::~DQMRootSource() {
// member functions
//

edm::InputSource::ItemType DQMRootSource::getNextItemType() { return m_nextItemType; }
edm::InputSource::ItemTypeInfo DQMRootSource::getNextItemType() { return m_nextItemType; }

// We will read the metadata of all files and fill m_fileMetadatas vector
std::shared_ptr<edm::FileBlock> DQMRootSource::readFile_() {
Expand Down Expand Up @@ -630,9 +630,9 @@ std::shared_ptr<edm::FileBlock> DQMRootSource::readFile_() {

// Stop if there's nothing to process. Otherwise start the run.
if (m_fileMetadatas.empty())
m_nextItemType = edm::InputSource::IsStop;
m_nextItemType = edm::InputSource::ItemType::IsStop;
else
m_nextItemType = edm::InputSource::IsRun;
m_nextItemType = edm::InputSource::ItemType::IsRun;

// We have to return something but not sure why
return std::make_shared<edm::FileBlock>();
Expand Down Expand Up @@ -728,18 +728,18 @@ bool DQMRootSource::isRunOrLumiTransition() const {

void DQMRootSource::readNextItemType() {
if (m_currentIndex == 0) {
m_nextItemType = edm::InputSource::IsRun;
m_nextItemType = edm::InputSource::ItemType::IsRun;
} else if (m_currentIndex > m_fileMetadatas.size() - 1) {
// We reached the end
m_nextItemType = edm::InputSource::IsStop;
m_nextItemType = edm::InputSource::ItemType::IsStop;
} else {
FileMetadata previousMetadata = m_fileMetadatas[m_currentIndex - 1];
FileMetadata metadata = m_fileMetadatas[m_currentIndex];

if (previousMetadata.m_run != metadata.m_run) {
m_nextItemType = edm::InputSource::IsRun;
m_nextItemType = edm::InputSource::ItemType::IsRun;
} else if (previousMetadata.m_lumi != metadata.m_lumi) {
m_nextItemType = edm::InputSource::IsLumi;
m_nextItemType = edm::InputSource::ItemType::IsLumi;
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions DQMServices/StreamerIO/plugins/DQMProtobufReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ DQMProtobufReader::DQMProtobufReader(edm::ParameterSet const& pset, edm::InputSo
produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
}

edm::InputSource::ItemType DQMProtobufReader::getNextItemType() {
edm::InputSource::ItemTypeInfo DQMProtobufReader::getNextItemType() {
typedef DQMFileIterator::State State;
typedef DQMFileIterator::LumiEntry LumiEntry;

Expand All @@ -49,31 +49,31 @@ edm::InputSource::ItemType DQMProtobufReader::getNextItemType() {

if (edm::shutdown_flag.load()) {
fiterator_.logFileAction("Shutdown flag was set, shutting down.");
return InputSource::IsStop;
return InputSource::ItemType::IsStop;
}

// check for end of run file and force quit
if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
return InputSource::IsStop;
return InputSource::ItemType::IsStop;
}

// check for end of run and quit if everything has been processed.
// this is the clean exit
if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
return InputSource::IsStop;
return InputSource::ItemType::IsStop;
}

// skip to the next file if we have no files openned yet
if (fiterator_.lumiReady()) {
return InputSource::IsLumi;
return InputSource::ItemType::IsLumi;
}

fiterator_.delay();
// BUG: for an unknown reason it fails after a certain time if we use
// IsSynchronize state
//
// comment out in order to block at this level
// return InputSource::IsSynchronize;
// return InputSource::ItemType::IsSynchronize;
}

// this is unreachable
Expand Down
2 changes: 1 addition & 1 deletion DQMServices/StreamerIO/plugins/DQMProtobufReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace dqmservices {

private:
void load(DQMStore* store, std::string filename);
edm::InputSource::ItemType getNextItemType() override;
edm::InputSource::ItemTypeInfo getNextItemType() override;
std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
void readRun_(edm::RunPrincipal& rpCache) override;
Expand Down
13 changes: 9 additions & 4 deletions FWCore/Framework/interface/EventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ namespace edm {
// The following functions are used by the code implementing
// transition handling.

InputSource::ItemType nextTransitionType();
InputSource::ItemType lastTransitionType() const { return lastSourceTransition_; }
InputSource::ItemTypeInfo nextTransitionType();
InputSource::ItemTypeInfo lastTransitionType() const { return lastSourceTransition_; }
void nextTransitionTypeAsync(std::shared_ptr<RunProcessingStatus> iRunStatus, WaitingTaskHolder nextTask);

void readFile();
bool fileBlockValid() { return fb_.get() != nullptr; }
Expand Down Expand Up @@ -294,6 +295,10 @@ namespace edm {
void throwAboutModulesRequiringLuminosityBlockSynchronization() const;
void warnAboutModulesRequiringRunSynchronization() const;
void warnAboutLegacyModules() const;

bool needToCallNext() const { return needToCallNext_; }
void setNeedToCallNext(bool val) { needToCallNext_ = val; }

//------------------------------------------------------------------
//
// Data members below.
Expand All @@ -311,7 +316,7 @@ namespace edm {
edm::propagate_const<std::shared_ptr<ThinnedAssociationsHelper>> thinnedAssociationsHelper_;
ServiceToken serviceToken_;
edm::propagate_const<std::unique_ptr<InputSource>> input_;
InputSource::ItemType lastSourceTransition_ = InputSource::IsInvalid;
InputSource::ItemTypeInfo lastSourceTransition_;
edm::propagate_const<std::unique_ptr<ModuleTypeResolverMaker const>> moduleTypeResolverMaker_;
edm::propagate_const<std::unique_ptr<eventsetup::EventSetupsController>> espController_;
edm::propagate_const<std::shared_ptr<eventsetup::EventSetupProvider>> esp_;
Expand Down Expand Up @@ -369,7 +374,7 @@ namespace edm {

bool printDependencies_ = false;
bool deleteNonConsumedUnscheduledModules_ = true;
bool firstItemAfterLumiMerge_ = true;
bool needToCallNext_ = true;
}; // class EventProcessor

//--------------------------------------------------------------------
Expand Down
43 changes: 36 additions & 7 deletions FWCore/Framework/interface/InputSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,36 @@ namespace edm {

class InputSource {
public:
enum ItemType { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
enum class ItemType : char { IsInvalid, IsStop, IsFile, IsRun, IsLumi, IsEvent, IsRepeat, IsSynchronize };
enum class ItemPosition : char { Invalid, LastItemToBeMerged, NotLastItemToBeMerged };

class ItemTypeInfo {
public:
constexpr ItemTypeInfo(ItemType type = ItemType::IsInvalid, ItemPosition position = ItemPosition::Invalid)
: type_(type), position_(position) {}
ItemType itemType() const { return type_; }
ItemPosition itemPosition() const { return position_; }

// Note that conversion to ItemType is defined and often used to
// compare an ItemTypeInfo with an ItemType.
// operator== of two ItemTypeInfo's is intentionally NOT defined.
// The constructor also allows implicit conversion from ItemType and
// often assignment from ItemType to ItemTypeInfo occurs.
operator ItemType() const { return type_; }

private:
ItemType type_;

// position_ should always be Invalid if the itemType_ is not IsRun or IsLumi.
// Even for runs and lumis, it is OK to leave it Invalid because the
// Framework can figure this out based on the next item. Offline it is
// simplest to always leave it Invalid. For online sources, there are
// optimizations that the Framework can use when it knows that a run or
// lumi is the last to be merged before the following item is known. This
// is useful in cases where the function named getNextItemType
// might take a long time to return.
ItemPosition position_;
};

enum ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents };

Expand All @@ -70,7 +99,7 @@ namespace edm {
static void prevalidate(ConfigurationDescriptions&);

/// Advances the source to the next item
ItemType nextItemType();
ItemTypeInfo nextItemType();

/// Read next event
void readEvent(EventPrincipal& ep, StreamContext&);
Expand Down Expand Up @@ -329,7 +358,7 @@ namespace edm {

ProductRegistry& productRegistryUpdate() { return *productRegistry_; }
ProcessHistoryRegistry& processHistoryRegistryForUpdate() { return *processHistoryRegistry_; }
ItemType state() const { return state_; }
ItemTypeInfo state() const { return state_; }
void setRunAuxiliary(RunAuxiliary* rp) {
runAuxiliary_.reset(rp);
newRun_ = newLumi_ = true;
Expand All @@ -349,7 +378,7 @@ namespace edm {
void reset() const {
resetLuminosityBlockAuxiliary();
resetRunAuxiliary();
state_ = IsInvalid;
state_ = ItemTypeInfo();
}
bool newRun() const { return newRun_; }
void setNewRun() { newRun_ = true; }
Expand Down Expand Up @@ -386,8 +415,8 @@ namespace edm {
return false;
}
bool limitReached() const { return eventLimitReached() || lumiLimitReached(); }
virtual ItemType getNextItemType() = 0;
ItemType nextItemType_();
virtual ItemTypeInfo getNextItemType() = 0;
ItemTypeInfo nextItemType_();
virtual std::shared_ptr<RunAuxiliary> readRunAuxiliary_() = 0;
virtual std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() = 0;
virtual void fillProcessBlockHelper_();
Expand Down Expand Up @@ -431,7 +460,7 @@ namespace edm {
mutable bool newRun_;
mutable bool newLumi_;
bool eventCached_;
mutable ItemType state_;
mutable ItemTypeInfo state_;
mutable std::shared_ptr<RunAuxiliary> runAuxiliary_;
mutable std::shared_ptr<LuminosityBlockAuxiliary> lumiAuxiliary_;
std::string statusFileName_;
Expand Down
Loading

0 comments on commit 1b3d395

Please sign in to comment.