Skip to content

Commit

Permalink
- remove EvFOutputModule which was replaced by GlobalEvFOutputModule
Browse files Browse the repository at this point in the history
- clean up unused functionality of EvFDaqDirector (reading PSets from the menu which was never used)
  • Loading branch information
smorovic committed Feb 2, 2024
1 parent 8cd1c8e commit 0a7e228
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 574 deletions.
16 changes: 4 additions & 12 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <cstring>
#include <cstdio>

#include <oneapi/tbb/concurrent_hash_map.h>
#include <boost/asio.hpp>

class SystemBounds;
Expand Down Expand Up @@ -68,7 +67,6 @@ namespace evf {
void initRun();
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
void preallocate(edm::service::SystemBounds const& bounds);
void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
void preBeginRun(edm::GlobalContext const& globalContext);
void postEndRun(edm::GlobalContext const& globalContext);
void preGlobalEndLumi(edm::GlobalContext const& globalContext);
Expand Down Expand Up @@ -186,10 +184,10 @@ namespace evf {
filesToDeletePtr_ = filesToDelete;
}

void checkTransferSystemPSet(edm::ProcessContext const& pc);
void checkMergeTypePSet(edm::ProcessContext const& pc);
std::string getStreamDestinations(std::string const& stream) const;
std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
std::string getStreamDestinations(std::string const&) const { return std::string(""); }
std::string getStreamMergeType(std::string const&, MergeType defaultType) const {
return MergeTypeNames_[defaultType];
}
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
bool inputThrottled();
bool lumisectionDiscarded(unsigned int ls);
Expand Down Expand Up @@ -225,9 +223,6 @@ namespace evf {
bool fileBrokerUseLocalLock_;
unsigned int fuLockPollInterval_;
bool outputAdler32Recheck_;
bool requireTSPSet_;
std::string selectedTransferMode_;
std::string mergeTypePset_;
bool directorBU_;
std::string hltSourceDirectory_;

Expand Down Expand Up @@ -282,9 +277,6 @@ namespace evf {
std::string stopFilePathPid_;
unsigned int stop_ls_override_ = 0;

std::shared_ptr<Json::Value> transferSystemJson_;
tbb::concurrent_hash_map<std::string, std::string> mergeTypeMap_;

//values initialized in .cc file
static const std::vector<std::string> MergeTypeNames_;

Expand Down
108 changes: 0 additions & 108 deletions EventFilter/Utilities/interface/EvFOutputModule.h

This file was deleted.

2 changes: 0 additions & 2 deletions EventFilter/Utilities/plugins/modules.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"
#include "EventFilter/Utilities/interface/EvFOutputModule.h"
#include "EventFilter/Utilities/interface/FastMonitoringService.h"
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/DAQSource.h"
Expand Down Expand Up @@ -29,7 +28,6 @@ DEFINE_FWK_SERVICE(EvFDaqDirector);
DEFINE_FWK_MODULE(ExceptionGenerator);
DEFINE_FWK_MODULE(EvFFEDSelector);
DEFINE_FWK_MODULE(EvFFEDExcluder);
DEFINE_FWK_MODULE(EvFOutputModule);
DEFINE_FWK_MODULE(DaqFakeReader);
DEFINE_FWK_INPUT_SOURCE(FedRawDataInputSource);
DEFINE_FWK_INPUT_SOURCE(DAQSource);
155 changes: 1 addition & 154 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,13 @@ namespace evf {
bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
hostname_(""),
Expand All @@ -72,7 +69,6 @@ namespace evf {
fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob);
reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);
Expand Down Expand Up @@ -385,22 +381,13 @@ namespace evf {
->setComment("Lock polling interval in microseconds for the input directory file lock");
desc.addUntracked<bool>("outputAdler32Recheck", false)
->setComment("Check Adler32 of per-process output files while micro-merging");
desc.addUntracked<bool>("requireTransfersPSet", false)
->setComment("Require complete transferSystem PSet in the process configuration");
desc.addUntracked<std::string>("selectedTransferMode", "")
->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
desc.addUntracked<std::string>("mergingPset", "")
->setComment("Name of merging PSet to look for merging type definitions for streams");
descriptions.add("EvFDaqDirector", desc);
}

void EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc) {
checkTransferSystemPSet(pc);
checkMergeTypePSet(pc);
}

void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
//assert(run_ == id.run());

Expand Down Expand Up @@ -1987,146 +1974,6 @@ namespace evf {
}

//if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
void EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const& pc) {
if (transferSystemJson_)
return;

transferSystemJson_.reset(new Json::Value);
edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));

Json::Value destinationsVal(Json::arrayValue);
std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
for (auto& dest : destinations)
destinationsVal.append(dest);
(*transferSystemJson_)["destinations"] = destinationsVal;

Json::Value modesVal(Json::arrayValue);
std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
for (auto& mode : modes)
modesVal.append(mode);
(*transferSystemJson_)["transferModes"] = modesVal;

for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
Json::Value streamVal;
for (auto& mode : modes) {
//validation
if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
throw cms::Exception("EvFDaqDirector")
<< " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
<< ")";
std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);

Json::Value sDestsValue(Json::arrayValue);

if (streamDestinations.empty())
throw cms::Exception("EvFDaqDirector")
<< " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;

for (auto& sdest : streamDestinations) {
bool sDestValid = false;
sDestsValue.append(sdest);
for (auto& dest : destinations) {
if (dest == sdest)
sDestValid = true;
}
if (!sDestValid)
throw cms::Exception("EvFDaqDirector")
<< " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
<< ", dest:" << sdest;
}
streamVal[mode] = sDestsValue;
}
(*transferSystemJson_)[psKeyItr->first] = streamVal;
}
}
} else {
if (requireTSPSet_)
throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
}
}

std::string EvFDaqDirector::getStreamDestinations(std::string const& stream) const {
std::string streamRequestName;
if (transferSystemJson_->isMember(stream.c_str()))
streamRequestName = stream;
else {
std::stringstream msg;
msg << "Transfer system mode definitions missing for -: " << stream;
if (requireTSPSet_)
throw cms::Exception("EvFDaqDirector") << msg.str();
else {
edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
return std::string("Failsafe");
}
}
//return empty if strict check parameter is not on
if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
edm::LogWarning("EvFDaqDirector")
<< "Selected mode string is not provided as DaqDirector parameter."
<< "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
return std::string("Failsafe");
}
if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
}
//check if stream has properly listed transfer stream
if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
std::stringstream msg;
msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
if (requireTSPSet_)
throw cms::Exception("EvFDaqDirector") << msg.str();
else
edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
return std::string("Failsafe");
}
Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");

//flatten string json::Array into CSV std::string
std::string ret;
for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
if (!ret.empty())
ret += ",";
ret += (*it).asString();
}
return ret;
}

void EvFDaqDirector::checkMergeTypePSet(edm::ProcessContext const& pc) {
if (mergeTypePset_.empty())
return;
if (!mergeTypeMap_.empty())
return;
edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
for (const std::string& pname : tsPset.getParameterNames()) {
std::string streamType = tsPset.getParameter<std::string>(pname);
tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
mergeTypeMap_.insert(ac, pname);
ac->second = streamType;
ac.release();
}
}
}

std::string EvFDaqDirector::getStreamMergeType(std::string const& stream, MergeType defaultType) {
tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
if (mergeTypeMap_.find(search_ac, stream))
return search_ac->second;

edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
std::string defaultName = MergeTypeNames_[defaultType];
tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
mergeTypeMap_.insert(ac, stream);
ac->second = defaultName;
ac.release();
return defaultName;
}

void EvFDaqDirector::createProcessingNotificationMaybe() const {
std::string proc_flag = run_dir_ + "/processing";
int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
Expand Down
Loading

0 comments on commit 0a7e228

Please sign in to comment.