Skip to content

Commit

Permalink
Merge pull request #40099 from smorovic/12_6_X-discardls
Browse files Browse the repository at this point in the history
(DAQ) File based protocol update
  • Loading branch information
cmsbuild authored Nov 25, 2022
2 parents 4791225 + fb80c07 commit 168c9f8
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 41 deletions.
18 changes: 11 additions & 7 deletions DQMServices/FileIO/plugins/DQMFileSaverPB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(p
if (tag_ != "UNKNOWN") {
streamLabel_ = "DQMLive";
}

if (!fakeFilterUnitMode_) {
if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available";
std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_);
std::ofstream file(initFileName);
if (!file)
throw cms::Exception("DQMFileSaverPB")
<< "Cannot create INI file: " << initFileName << " error: " << strerror(errno);
file.close();
}
}

DQMFileSaverPB::~DQMFileSaverPB() = default;
Expand All @@ -52,13 +63,6 @@ void DQMFileSaverPB::initRun() const {
transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypePB);
}

if (!fakeFilterUnitMode_) {
evf::EvFDaqDirector* daqDirector = (evf::EvFDaqDirector*)(edm::Service<evf::EvFDaqDirector>().operator->());
const std::string initFileName = daqDirector->getInitFilePath(streamLabel_);
std::ofstream file(initFileName);
file.close();
}
}

void DQMFileSaverPB::saveLumi(const FileParameters& fp) const {
Expand Down
5 changes: 5 additions & 0 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ namespace evf {
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
std::string getOpenInitFilePath(std::string const& stream) const;
std::string getInitFilePath(std::string const& stream) const;
std::string getInitTempFilePath(std::string const& stream) const;
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
Expand Down Expand Up @@ -120,6 +121,7 @@ namespace evf {
void unlockInitLock();
void setFMS(evf::FastMonitoringService* fms) { fms_ = fms; }
bool isSingleStreamThread() { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int numConcurrentLumis() const { return nConcurrentLumis_; }
void lockFULocal();
void unlockFULocal();
void lockFULocal2();
Expand Down Expand Up @@ -185,6 +187,7 @@ namespace evf {
std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
bool inputThrottled();
bool lumisectionDiscarded(unsigned int ls);

private:
bool bumpFile(unsigned int& ls,
Expand Down Expand Up @@ -263,6 +266,7 @@ namespace evf {

unsigned int nStreams_ = 0;
unsigned int nThreads_ = 0;
unsigned int nConcurrentLumis_ = 0;

bool readEolsDefinition_ = true;
unsigned int eolsNFilesIndex_ = 1;
Expand All @@ -286,6 +290,7 @@ namespace evf {
std::unique_ptr<boost::asio::ip::tcp::socket> socket_;

std::string input_throttled_file_;
std::string discard_ls_filestem_;
};
} // namespace evf

Expand Down
7 changes: 7 additions & 0 deletions EventFilter/Utilities/interface/FFFNamingSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ namespace fffnaming {
return ss.str();
}

inline std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const& stream) {
std::stringstream ss;
runLumiPrefixFill(ss, run, ls);
ss << "_" << stream << "_pid" << std::setfill('0') << std::setw(5) << getpid() << ".initemp";
return ss.str();
}

inline std::string initFileNameWithInstance(const unsigned int run,
const unsigned int ls,
std::string const& stream,
Expand Down
116 changes: 86 additions & 30 deletions EventFilter/Utilities/plugins/GlobalEvFOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@ namespace evf {

class GlobalEvFOutputEventWriter {
public:
explicit GlobalEvFOutputEventWriter(std::string const& filePath)
: filePath_(filePath), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}
explicit GlobalEvFOutputEventWriter(std::string const& filePath, unsigned int ls)
: filePath_(filePath), ls_(ls), accepted_(0), stream_writer_events_(new StreamerOutputFile(filePath)) {}

~GlobalEvFOutputEventWriter() {}

void close() { stream_writer_events_->close(); }
bool close() {
stream_writer_events_->close();
return (discarded_ || edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_));
}

void doOutputEvent(EventMsgBuilder const& msg) {
EventMsgView eview(msg.startAddress());
Expand All @@ -58,6 +61,12 @@ namespace evf {

void doOutputEventAsync(std::unique_ptr<EventMsgBuilder> msg, edm::WaitingTaskHolder iHolder) {
throttledCheck();
discardedCheck();
if (discarded_) {
incAccepted();
msg.reset();
return;
}
auto group = iHolder.group();
writeQueue_.push(*group, [holder = std::move(iHolder), msg = msg.release(), this]() {
try {
Expand All @@ -72,13 +81,24 @@ namespace evf {

inline void throttledCheck() {
unsigned int counter = 0;
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled()) {
while (edm::Service<evf::EvFDaqDirector>()->inputThrottled() && !discarded_) {
if (edm::shutdown_flag.load(std::memory_order_relaxed))
break;
if (!(counter % 100))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, writing is paused...";
usleep(100000);
counter++;
if (edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
discarded_ = true;
}
}
}

inline void discardedCheck() {
if (!discarded_ && edm::Service<evf::EvFDaqDirector>()->lumisectionDiscarded(ls_)) {
edm::LogWarning("FedRawDataInputSource") << "Detected that the lumisection is discarded -: " << ls_;
discarded_ = true;
}
}

Expand All @@ -93,14 +113,17 @@ namespace evf {

private:
std::string filePath_;
const unsigned ls_;
std::atomic<unsigned long> accepted_;
edm::propagate_const<std::unique_ptr<StreamerOutputFile>> stream_writer_events_;
edm::SerialTaskQueue writeQueue_;
bool discarded_ = false;
};

class GlobalEvFOutputJSONDef {
public:
GlobalEvFOutputJSONDef(std::string const& streamLabel);
GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd);
void updateDestination(std::string const& streamLabel);

jsoncollector::DataPointDefinition outJsonDef_;
std::string outJsonDefName_;
Expand Down Expand Up @@ -170,12 +193,10 @@ namespace evf {

}; //end-of-class-def

GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel) {
GlobalEvFOutputJSONDef::GlobalEvFOutputJSONDef(std::string const& streamLabel, bool writeJsd) {
std::string baseRunDir = edm::Service<evf::EvFDaqDirector>()->baseRunDir();
LogDebug("GlobalEvFOutputModule") << "writing .dat files to -: " << baseRunDir;

edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();

outJsonDef_.setDefaultGroup("data");
outJsonDef_.addLegendItem("Processed", "integer", jsoncollector::DataPointDefinition::SUM);
outJsonDef_.addLegendItem("Accepted", "integer", jsoncollector::DataPointDefinition::SUM);
Expand All @@ -189,25 +210,31 @@ namespace evf {
outJsonDef_.addLegendItem("MergeType", "string", jsoncollector::DataPointDefinition::SAME);
outJsonDef_.addLegendItem("HLTErrorEvents", "integer", jsoncollector::DataPointDefinition::SUM);

std::stringstream tmpss, ss;
tmpss << baseRunDir << "/open/"
<< "output_" << getpid() << ".jsd";
std::stringstream ss;
ss << baseRunDir << "/"
<< "output_" << getpid() << ".jsd";
std::string outTmpJsonDefName = tmpss.str();
outJsonDefName_ = ss.str();

edm::Service<evf::EvFDaqDirector>()->lockInitLock();
struct stat fstat;
if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
std::string content;
jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
if (writeJsd) {
std::stringstream tmpss;
tmpss << baseRunDir << "/open/"
<< "output_" << getpid() << ".jsd";
std::string outTmpJsonDefName = tmpss.str();
edm::Service<evf::EvFDaqDirector>()->createRunOpendirMaybe();
edm::Service<evf::EvFDaqDirector>()->lockInitLock();
struct stat fstat;
if (stat(outJsonDefName_.c_str(), &fstat) != 0) { //file does not exist
LogDebug("GlobalEvFOutputModule") << "writing output definition file -: " << outJsonDefName_;
std::string content;
jsoncollector::JSONSerializer::serialize(&outJsonDef_, content);
jsoncollector::FileIO::writeStringToFile(outTmpJsonDefName, content);
std::filesystem::rename(outTmpJsonDefName, outJsonDefName_);
}
}
edm::Service<evf::EvFDaqDirector>()->unlockInitLock();
}

void GlobalEvFOutputJSONDef::updateDestination(std::string const& streamLabel) {
transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel);
mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel, evf::MergeTypeDAT);
}
Expand Down Expand Up @@ -284,6 +311,21 @@ namespace evf {
<< "stream (case-insensitive) sequence was found in stream suffix. This is reserved and can not be used for "
"names in FFF based HLT, but was detected in stream name";

//output initemp file. This lets hltd know number of streams early on
if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
throw cms::Exception("GlobalEvFOutputModule") << "EvFDaqDirector is not available";

const std::string iniFileName = edm::Service<evf::EvFDaqDirector>()->getInitTempFilePath(streamLabel_);
std::ofstream file(iniFileName);
if (!file)
throw cms::Exception("GlobalEvFOutputModule") << "can not create " << iniFileName << "error: " << strerror(errno);
file.close();

edm::LogInfo("GlobalEvFOutputModule") << "Constructor created initemp file -: " << iniFileName;

//create JSD
GlobalEvFOutputJSONDef(streamLabel_, true);

fms_ = (evf::FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
}

Expand All @@ -305,8 +347,8 @@ namespace evf {

std::shared_ptr<GlobalEvFOutputJSONDef> GlobalEvFOutputModule::globalBeginRun(edm::RunForOutput const& run) const {
//create run Cache holding JSON file writer and variables
auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_);

auto jsonDef = std::make_unique<GlobalEvFOutputJSONDef>(streamLabel_, false);
jsonDef->updateDestination(streamLabel_);
edm::StreamerOutputModuleCommon streamerCommon(ps_, &keptProducts()[edm::InEvent], description().moduleLabel());

//output INI file (non-const). This doesn't require globalBeginRun to be finished
Expand Down Expand Up @@ -341,17 +383,21 @@ namespace evf {
//read back file to check integrity of what was written
off_t readInput = 0;
uint32_t adlera = 1, adlerb = 0;
FILE* src = fopen(openIniFileName.c_str(), "r");
std::ifstream src(openIniFileName, std::ifstream::binary);
if (!src)
throw cms::Exception("GlobalEvFOutputModule")
<< "can not read back " << openIniFileName << " error: " << strerror(errno);

//allocate buffer to write INI file
std::unique_ptr<unsigned char[]> outBuf = std::make_unique<unsigned char[]>(1024 * 1024);
std::unique_ptr<char[]> outBuf = std::make_unique<char[]>(1024 * 1024);
while (readInput < istat.st_size) {
size_t toRead = readInput + 1024 * 1024 < istat.st_size ? 1024 * 1024 : istat.st_size - readInput;
fread(outBuf.get(), toRead, 1, src);
cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
src.read(outBuf.get(), toRead);
//cms::Adler32(const_cast<const char*>(reinterpret_cast<char*>(outBuf.get())), toRead, adlera, adlerb);
cms::Adler32(const_cast<const char*>(outBuf.get()), toRead, adlera, adlerb);
readInput += toRead;
}
fclose(src);
src.close();

//clear serialization buffers
streamerCommon.getSerializerBuffer()->clearHeaderBuffer();
Expand Down Expand Up @@ -382,7 +428,7 @@ namespace evf {
edm::LuminosityBlockForOutput const& iLB) const {
auto openDatFilePath = edm::Service<evf::EvFDaqDirector>()->getOpenDatFilePath(iLB.luminosityBlock(), streamLabel_);

return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath);
return std::make_shared<GlobalEvFOutputEventWriter>(openDatFilePath, iLB.luminosityBlock());
}

void GlobalEvFOutputModule::acquire(edm::StreamID id,
Expand All @@ -403,7 +449,7 @@ namespace evf {
void GlobalEvFOutputModule::globalEndLuminosityBlock(edm::LuminosityBlockForOutput const& iLB) const {
auto lumiWriter = luminosityBlockCache(iLB.index());
//close dat file
const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();
const bool discarded = const_cast<evf::GlobalEvFOutputEventWriter*>(lumiWriter)->close();

//auto jsonWriter = const_cast<GlobalEvFOutputJSONWriter*>(runCache(iLB.getRun().index()));
auto jsonDef = runCache(iLB.getRun().index());
Expand All @@ -417,7 +463,17 @@ namespace evf {
jsonWriter.accepted_.value() = lumiWriter->getAccepted();

bool abortFlag = false;
jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);

if (!discarded) {
jsonWriter.processed_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
} else {
jsonWriter.errorEvents_.value() = fms_->getEventsProcessedForLumi(iLB.luminosityBlock(), &abortFlag);
jsonWriter.processed_.value() = 0;
jsonWriter.accepted_.value() = 0;
edm::LogInfo("GlobalEvFOutputModule")
<< "Output suppressed, setting error events for LS -: " << iLB.luminosityBlock();
}

if (abortFlag) {
edm::LogInfo("GlobalEvFOutputModule") << "Abort flag has been set. Output is suppressed";
return;
Expand Down
15 changes: 13 additions & 2 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ namespace evf {
edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
}
}
}

void EvFDaqDirector::initRun() {
std::stringstream ss;
ss << "run" << std::setfill('0') << std::setw(6) << run_;
run_string_ = ss.str();
Expand All @@ -154,10 +152,13 @@ namespace evf {
run_nstring_ = ss.str();
run_dir_ = base_dir_ + "/" + run_string_;
input_throttled_file_ = run_dir_ + "/input_throttle";
discard_ls_filestem_ = run_dir_ + "/discard_ls";
ss = std::stringstream();
ss << getpid();
pid_ = ss.str();
}

void EvFDaqDirector::initRun() {
// check if base dir exists or create it accordingly
int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
if (retval != 0 && errno != EEXIST) {
Expand Down Expand Up @@ -322,6 +323,7 @@ namespace evf {

nThreads_ = bounds.maxNumberOfStreams();
nStreams_ = bounds.maxNumberOfThreads();
nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
}

void EvFDaqDirector::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
Expand Down Expand Up @@ -446,6 +448,10 @@ namespace evf {
return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
}

std::string EvFDaqDirector::getInitTempFilePath(std::string const& stream) const {
return run_dir_ + "/" + fffnaming::initTempFileNameWithPid(run_, 0, stream);
}

std::string EvFDaqDirector::getOpenProtocolBufferHistogramFilePath(const unsigned int ls,
std::string const& stream) const {
return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
Expand Down Expand Up @@ -2067,4 +2073,9 @@ namespace evf {
return (stat(input_throttled_file_.c_str(), &buf) == 0);
}

bool EvFDaqDirector::lumisectionDiscarded(unsigned int ls) {
struct stat buf;
return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
}

} // namespace evf
16 changes: 16 additions & 0 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,23 @@ void FedRawDataInputSource::readSupervisor() {
while (daqDirector_->inputThrottled()) {
if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
break;

unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
bool hasDiscardedLumi = false;
for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
if (daqDirector_->lumisectionDiscarded(i)) {
edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
hasDiscardedLumi = true;
break;
}
}
if (hasDiscardedLumi)
break;

setMonStateSup(inThrottled);

if (!(counter % 50))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
usleep(100000);
Expand Down
Loading

0 comments on commit 168c9f8

Please sign in to comment.