Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[11_0_X] Protect storage accounting UDP messages from NaN, and Use StatisticsSenderService for all framework files #36358

Merged
2 changes: 1 addition & 1 deletion IOPool/Input/src/EmbeddedRootSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace edm {
InputFile::reportReadBranches();
}

void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile_(); }
void EmbeddedRootSource::closeFile_() { fileSequence_->closeFile(); }

bool EmbeddedRootSource::readOneEvent(EventPrincipal& cache,
size_t& fileNameHash,
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/PoolSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ namespace edm {
return fb;
}

void PoolSource::closeFile_() { primaryFileSequence_->closeFile_(); }
void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }

std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }

Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ namespace edm {

RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}

void RootEmbeddedFileSequence::endJob() { closeFile_(); }
void RootEmbeddedFileSequence::endJob() { closeFile(); }

void RootEmbeddedFileSequence::closeFile_() {
// delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootEmbeddedFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ namespace edm {
RootEmbeddedFileSequence(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving
RootEmbeddedFileSequence& operator=(RootEmbeddedFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void skipEntries(unsigned int offset);
bool readOneEvent(
Expand All @@ -56,6 +55,7 @@ namespace edm {
static void fillDescription(ParameterSetDescription& desc);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
103 changes: 58 additions & 45 deletions IOPool/Input/src/RootInputFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"
#include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
#include "FWCore/ServiceRegistry/interface/Service.h"

#include "TSystem.h"

Expand Down Expand Up @@ -192,7 +194,7 @@ namespace edm {
}
fileIterLastOpened_ = fileIterEnd_;
}
closeFile_();
closeFile();

if (noMoreFiles()) {
// No files specified
Expand Down Expand Up @@ -224,58 +226,61 @@ namespace edm {

std::shared_ptr<InputFile> filePtr;
std::list<std::string> originalInfo;
try {
{
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, usedFallback_) : nullptr);
std::unique_ptr<char[]> name(gSystem->ExpandPathName(fileName().c_str()));
;
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
if (hasFallbackUrl) {
std::ostringstream out;
out << e.explainSelf();

std::unique_ptr<char[]> name(gSystem->ExpandPathName(fallbackFileName().c_str()));
std::string pfn(name.get());
InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str());
originalInfo = e.additionalInfo();
} else {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.";
ex.addAdditionalInfo(out.str());
throw ex;
}
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, false) : nullptr);
edm::Service<edm::storage::StatisticsSenderService> service;
if (service.isAvailable()) {
service->openingFile(lfn(), inputType, -1);
}
}
if (!filePtr && (hasFallbackUrl)) {
try {
usedFallback_ = true;
std::unique_ptr<InputSource::FileOpenSentry> sentry(
input ? std::make_unique<InputSource::FileOpenSentry>(*input, lfn_, usedFallback_) : nullptr);
std::unique_ptr<char[]> fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str()));
filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType));
std::unique_ptr<char[]> name(gSystem->ExpandPathName(fileName().c_str()));
filePtr = std::make_shared<InputFile>(name.get(), " Initiating request to open file ", inputType);
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FallbackFileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.\n";
out << "Fallback Input file " << fallbackFileName() << " also could not be opened.";
if (!originalInfo.empty()) {
out << std::endl << "Original exception info is above; fallback exception info is below.";
ex.addAdditionalInfo(out.str());
for (auto const& s : originalInfo) {
ex.addAdditionalInfo(s);
}
if (hasFallbackUrl) {
std::ostringstream out;
out << e.explainSelf();

std::unique_ptr<char[]> name(gSystem->ExpandPathName(fallbackFileName().c_str()));
std::string pfn(name.get());
InputFile::reportFallbackAttempt(pfn, logicalFileName(), out.str());
originalInfo = e.additionalInfo();
} else {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.";
ex.addAdditionalInfo(out.str());
throw ex;
}
}
}
if (!filePtr && (hasFallbackUrl)) {
try {
usedFallback_ = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, since the usedFallback_ = true is set here, 11_0_X (and earlier) do not need the backport of #36379. The FileOpenSentry will still always signal that none of the files are fallbacks, but that information is not being used anywhere (except in Tracer Service, but those being "wrong" is not a big deal). The StatisticsSenderService anyway gets the value of this boolean via direct call (instead of the ActivityRegistry callbacks).

std::unique_ptr<char[]> fallbackFullName(gSystem->ExpandPathName(fallbackFileName().c_str()));
filePtr.reset(new InputFile(fallbackFullName.get(), " Fallback request to file ", inputType));
} catch (cms::Exception const& e) {
if (!skipBadFiles) {
InputFile::reportSkippedFile(fileName(), logicalFileName());
Exception ex(errors::FallbackFileOpenError, "", e);
ex.addContext("Calling RootFileSequenceBase::initTheFile()");
std::ostringstream out;
out << "Input file " << fileName() << " could not be opened.\n";
out << "Fallback Input file " << fallbackFileName() << " also could not be opened.";
if (!originalInfo.empty()) {
out << std::endl << "Original exception info is above; fallback exception info is below.";
ex.addAdditionalInfo(out.str());
for (auto const& s : originalInfo) {
ex.addAdditionalInfo(s);
}
} else {
ex.addAdditionalInfo(out.str());
}
throw ex;
}
throw ex;
}
}
}
Expand All @@ -299,6 +304,14 @@ namespace edm {
}
}

void RootInputFileSequence::closeFile() {
edm::Service<edm::storage::StatisticsSenderService> service;
if (rootFile() and service.isAvailable()) {
service->closedFile(lfn(), usedFallback());
}
closeFile_();
}

void RootInputFileSequence::setIndexIntoFile(size_t index) {
indexesIntoFiles_[index] = rootFile()->indexIntoFileSharedPtr();
}
Expand Down
2 changes: 2 additions & 0 deletions IOPool/Input/src/RootInputFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace edm {
std::shared_ptr<ProductRegistry const> fileProductRegistry() const;
std::shared_ptr<BranchIDListHelper const> fileBranchIDListHelper() const;

void closeFile();

protected:
typedef std::shared_ptr<RootFile> RootFileSharedPtr;
void initFile(bool skipBadFiles) { initFile_(skipBadFiles); }
Expand Down
4 changes: 2 additions & 2 deletions IOPool/Input/src/RootPrimaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace edm {

RootPrimaryFileSequence::~RootPrimaryFileSequence() {}

void RootPrimaryFileSequence::endJob() { closeFile_(); }
void RootPrimaryFileSequence::endJob() { closeFile(); }

std::unique_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
if (firstFile_) {
Expand Down Expand Up @@ -212,7 +212,7 @@ namespace edm {
// Rewind to before the first event that was read.
void RootPrimaryFileSequence::rewind_() {
if (!atFirstFile()) {
closeFile_();
closeFile();
setAtFirstFile();
}
if (!rootFile()) {
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootPrimaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ namespace edm {
RootPrimaryFileSequence& operator=(RootPrimaryFileSequence const&) = delete; // Disallow copying and moving

std::unique_ptr<FileBlock> readFile_();
void closeFile_() override;
void endJob();
InputSource::ItemType getNextItemType(RunNumber_t& run, LuminosityBlockNumber_t& lumi, EventNumber_t& event);
bool skipEvents(int offset);
Expand All @@ -56,6 +55,7 @@ namespace edm {
bool nextFile();
bool previousFile();
void rewindFile();
void closeFile_() override;

int remainingEvents() const;
int remainingLuminosityBlocks() const;
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace edm {

RootSecondaryFileSequence::~RootSecondaryFileSequence() {}

void RootSecondaryFileSequence::endJob() { closeFile_(); }
void RootSecondaryFileSequence::endJob() { closeFile(); }

void RootSecondaryFileSequence::closeFile_() {
// close the currently open file, if any, and delete the RootFile object.
Expand Down
2 changes: 1 addition & 1 deletion IOPool/Input/src/RootSecondaryFileSequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ namespace edm {
RootSecondaryFileSequence(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving
RootSecondaryFileSequence& operator=(RootSecondaryFileSequence const&) = delete; // Disallow copying and moving

void closeFile_() override;
void endJob();
void initAssociationsFromSecondary(std::set<BranchID> const&);

private:
void closeFile_() override;
void initFile_(bool skipBadFiles) override;
RootFileSharedPtr makeRootFile(std::shared_ptr<InputFile> filePtr) override;

Expand Down
4 changes: 2 additions & 2 deletions IOPool/SecondaryInput/test/SecondaryProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
************************************************************/

#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/Framework/interface/EDProducer.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <memory>
Expand All @@ -19,7 +19,7 @@ namespace edm {
class ProcessConfiguration;
class VectorInputSource;

class SecondaryProducer : public EDProducer {
class SecondaryProducer : public one::EDProducer<> {
public:
/** standard constructor*/
explicit SecondaryProducer(ParameterSet const& pset);
Expand Down
2 changes: 1 addition & 1 deletion IOPool/TFileAdaptor/src/TStorageFactoryFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void TStorageFactoryFile::Initialize(const char *path, Option_t *option /* = ""
try {
edm::Service<edm::storage::StatisticsSenderService> statsService;
if (statsService.isAvailable()) {
statsService->setSize(storage_->size());
statsService->setSize(path, storage_->size());
}
} catch (edm::Exception const &e) {
if (e.categoryCode() != edm::errors::NotFound) {
Expand Down
55 changes: 42 additions & 13 deletions Utilities/StorageFactory/interface/StatisticsSenderService.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <sstream>
#include <atomic>
#include <mutex>
#include <tbb/concurrent_unordered_map.h>
#include "FWCore/Utilities/interface/InputType.h"

namespace edm {

Expand All @@ -16,19 +18,25 @@ namespace edm {

class StatisticsSenderService {
public:
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar);
StatisticsSenderService(edm::ParameterSet const& pset, edm::ActivityRegistry& ar);

void setSize(size_t size);
void setCurrentServer(const std::string &servername);
void filePreCloseEvent(std::string const &lfn, bool usedFallback);
static const char *getJobID();
static bool getX509Subject(std::string &);
void setSize(const std::string& urlOrLfn, size_t size);
void setCurrentServer(const std::string& urlOrLfn, const std::string& servername);
static const char* getJobID();
static bool getX509Subject(std::string&);

void openingFile(std::string const& lfn, edm::InputType type, size_t size = -1);
void closedFile(std::string const& lfn, bool usedFallback);

private:
void filePostCloseEvent(std::string const& lfn, bool usedFallback);

std::string const* matchedLfn(std::string const& iURL); //updates its internal cache
class FileStatistics {
public:
FileStatistics();
void fillUDP(std::ostringstream &os);
void fillUDP(std::ostringstream& os) const;
void update();

private:
ssize_t m_read_single_operations;
Expand All @@ -42,19 +50,40 @@ namespace edm {
time_t m_start_time;
};

void determineHostnames(void);
void fillUDP(const std::string &, bool, std::string &);
struct FileInfo {
explicit FileInfo(std::string const& iLFN, edm::InputType);

FileInfo(FileInfo&& iInfo)
: m_filelfn(std::move(iInfo.m_filelfn)),
m_serverhost(std::move(iInfo.m_serverhost)),
m_serverdomain(std::move(iInfo.m_serverdomain)),
m_type(iInfo.m_type),
m_size(iInfo.m_size.load()),
m_id(iInfo.m_id),
m_openCount(iInfo.m_openCount.load()) {}
std::string m_filelfn;
std::string m_serverhost;
std::string m_serverdomain;
edm::InputType m_type;
std::atomic<ssize_t> m_size;
size_t m_id; //from m_counter
std::atomic<int> m_openCount;
};

void determineHostnames();
void fillUDP(const std::string& site, const FileInfo& fileinfo, bool, std::string&) const;
void cleanupOldFiles();

std::string m_clienthost;
std::string m_clientdomain;
std::string m_serverhost;
std::string m_serverdomain;
std::string m_filelfn;
tbb::concurrent_unordered_map<std::string, FileInfo> m_lfnToFileInfo;
tbb::concurrent_unordered_map<std::string, std::string> m_urlToLfn;
FileStatistics m_filestats;
std::string m_guid;
size_t m_counter;
std::atomic<ssize_t> m_size;
std::string m_userdn;
std::mutex m_servermutex;
const bool m_debug;
};

} // namespace storage
Expand Down
Loading