Skip to content

Commit

Permalink
Merge pull request #35833 from smorovic/12_1_X_input_throttled
Browse files Browse the repository at this point in the history
DAQ input throttling (12_1_X)
  • Loading branch information
cmsbuild authored Oct 26, 2021
2 parents d8f8424 + 0c7ec10 commit 2ff161c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 4 deletions.
6 changes: 3 additions & 3 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ namespace evf {
std::string getStreamDestinations(std::string const& stream) const;
std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
bool inputThrottled();

private:
bool bumpFile(unsigned int& ls,
Expand Down Expand Up @@ -283,9 +284,8 @@ namespace evf {
std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
//boost::asio::io_context io_context_;
//tcp::resolver resolver_;
//tcp::resolver::results_type endpoints_;

std::string input_throttled_file_;
};
} // namespace evf

Expand Down
2 changes: 2 additions & 0 deletions EventFilter/Utilities/interface/FastMonitoringService.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ namespace evf {
inWaitChunk_newFileWaitThread,
inWaitChunk_newFileWaitChunkCopying,
inWaitChunk_newFileWaitChunk,
inSupThrottled,
inThrottled,
inCOUNT
};
} // namespace FastMonState
Expand Down
6 changes: 6 additions & 0 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ namespace evf {
ss << run_;
run_nstring_ = ss.str();
run_dir_ = base_dir_ + "/" + run_string_;
input_throttled_file_ = run_dir_ + "/input_throttle";
ss = std::stringstream();
ss << getpid();
pid_ = ss.str();
Expand Down Expand Up @@ -2061,4 +2062,9 @@ namespace evf {
#endif
}

bool EvFDaqDirector::inputThrottled() {
struct stat buf;
return (stat(input_throttled_file_.c_str(), &buf) == 0);
}

} // namespace evf
7 changes: 6 additions & 1 deletion EventFilter/Utilities/src/FastMonitoringService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ namespace evf {
"WaitChunk_newFileWaitThreadCopying",
"WaitChunk_newFileWaitThread",
"WaitChunk_newFileWaitChunkCopying",
"WaitChunk_newFileWaitChunk"};
"WaitChunk_newFileWaitChunk",
"inSupThrottled",
"inThrottled"};

const std::string FastMonitoringService::nopath_ = "NoPath";

Expand Down Expand Up @@ -972,6 +974,9 @@ namespace evf {
microstateCopy[i] == &reservedMicroStateNames[FastMonState::mFwkEoL])
fmt_->m_data.inputState_[i] = FastMonState::inNewLumi;
}
} else if (inputSupervisorState_ == FastMonState::inSupThrottled) {
//apply directly throttled state from supervisor
fmt_->m_data.inputState_[0] = inputSupervisorState_;
} else
fmt_->m_data.inputState_[0] = inputState_;

Expand Down
13 changes: 13 additions & 0 deletions EventFilter/Utilities/src/FedRawDataInputSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ void FedRawDataInputSource::readSupervisor() {
while (!stop) {
//wait for at least one free thread and chunk
int counter = 0;

while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
readingFilesCount_ >= maxBufferedFiles_) {
//report state to monitoring
Expand Down Expand Up @@ -826,6 +827,18 @@ void FedRawDataInputSource::readSupervisor() {

//entering loop which tries to grab new file from ramdisk
while (status == evf::EvFDaqDirector::noFile) {
//check if hltd has signalled to throttle input
counter = 0;
while (daqDirector_->inputThrottled()) {
if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
break;
setMonStateSup(inThrottled);
if (!(counter % 50))
edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
usleep(100000);
counter++;
}

if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
stop = true;
break;
Expand Down

0 comments on commit 2ff161c

Please sign in to comment.