Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAQ: support for FRD file header with file locking #30206

Merged
merged 5 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions EventFilter/Utilities/interface/EvFDaqDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ namespace evf {
void removeFile(unsigned int ls, unsigned int index);
void removeFile(std::string);

FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
FileStatus updateFuLock(unsigned int& ls,
std::string& nextFile,
uint32_t& fsize,
uint16_t& rawHeaderSize,
uint64_t& lockWaitTime,
bool& setExceptionState);
void tryInitializeFuLockFile();
unsigned int getRunNumber() const { return run_; }
void lockInitLock();
Expand All @@ -132,12 +137,14 @@ namespace evf {
bool requireHeader,
bool retry,
bool closeFile);
bool rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize);
int grabNextJsonFromRaw(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawHeaderSize,
int64_t& fileSizeFromHeader,
bool& fileFound,
uint32_t serverLS);
uint32_t serverLS,
bool closeFile);
int grabNextJsonFile(std::string const& jsonSourcePath,
std::string const& rawSourcePath,
int64_t& fileSizeFromJson,
Expand Down Expand Up @@ -177,7 +184,13 @@ namespace evf {
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);

private:
bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
bool bumpFile(unsigned int& ls,
unsigned int& index,
std::string& nextFile,
uint32_t& fsize,
uint16_t& rawHeaderSize,
int maxLS,
bool& setExceptionState);
void openFULockfileStream(bool create);
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
Expand Down
123 changes: 96 additions & 27 deletions EventFilter/Utilities/src/EvFDaqDirector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,11 @@ namespace evf {
EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls,
std::string& nextFile,
uint32_t& fsize,
uint64_t& lockWaitTime) {
uint16_t& rawHeaderSize,
uint64_t& lockWaitTime,
bool& setExceptionState) {
EvFDaqDirector::FileStatus fileStatus = noFile;
rawHeaderSize = 0;

int retval = -1;
int lock_attempts = 0;
Expand Down Expand Up @@ -607,7 +610,7 @@ namespace evf {
ls++;
else {
// try to bump (look for new index or EoLS file)
bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
//avoid 2 lumisections jump
if (ls && readLs > currentLs && currentLs > ls) {
ls++;
Expand Down Expand Up @@ -641,8 +644,10 @@ namespace evf {
fileStatus = newFile;
LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
} else {
throw cms::Exception("EvFDaqDirector")
edm::LogError("EvFDaqDirector")
<< "seek on fu read/write lock for updating failed with error " << strerror(errno);
setExceptionState = true;
return noFile;
}
} else if (currentLs < readLs) {
//there is no new file in next LS (yet), but lock file can be updated to the next LS
Expand All @@ -655,8 +660,10 @@ namespace evf {
fsync(fu_readwritelock_fd2);
LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
} else {
throw cms::Exception("EvFDaqDirector")
edm::LogError("EvFDaqDirector")
<< "seek on fu read/write lock for updating failed with error " << strerror(errno);
setExceptionState = true;
return noFile;
}
}
} else {
Expand Down Expand Up @@ -763,8 +770,13 @@ namespace evf {
return boost::lexical_cast<int>(data);
}

bool EvFDaqDirector::bumpFile(
unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS) {
bool EvFDaqDirector::bumpFile(unsigned int& ls,
unsigned int& index,
std::string& nextFile,
uint32_t& fsize,
uint16_t& rawHeaderSize,
int maxLS,
bool& setExceptionState) {
if (previousFileSize_ != 0) {
if (!fms_) {
fms_ = (FastMonitoringService*)(edm::Service<evf::MicroStateService>().operator->());
Expand All @@ -773,6 +785,7 @@ namespace evf {
fms_->accumulateFileSize(ls, previousFileSize_);
previousFileSize_ = 0;
}
nextFile = "";

//reached limit
if (maxLS >= 0 && ls > (unsigned int)maxLS)
Expand All @@ -784,21 +797,34 @@ namespace evf {
nextIndex++;

// 1. Check suggested file
nextFile = getInputJsonFilePath(ls, index);
if (stat(nextFile.c_str(), &buf) == 0) {
previousFileSize_ = buf.st_size;
fsize = buf.st_size;
std::string nextFileJson = getInputJsonFilePath(ls, index);
if (stat(nextFileJson.c_str(), &buf) == 0) {
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileJson;
return true;
}
// 2. No file -> lumi ended? (and how many?)
else {
// 3. No file -> check for standalone raw file
std::string nextFileRaw = getRawFilePath(ls, index);
if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileRaw;
return true;
}

std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
while (eolFound) {

if (stat(BUEoLSFile.c_str(), &buf) == 0) {
// recheck that no raw file appeared in the meantime
if (stat(nextFile.c_str(), &buf) == 0) {
previousFileSize_ = buf.st_size;
fsize = buf.st_size;
if (stat(nextFileJson.c_str(), &buf) == 0) {
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileJson;
return true;
}
if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileRaw;
return true;
}

Expand All @@ -813,6 +839,7 @@ namespace evf {
edm::LogError("EvFDaqDirector")
<< "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
<< indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
setExceptionState = true;
return false;
}
}
Expand All @@ -824,18 +851,20 @@ namespace evf {
if (maxLS >= 0 && ls > (unsigned int)maxLS)
return false;

nextFile = getInputJsonFilePath(ls, 0);
if (stat(nextFile.c_str(), &buf) == 0) {
nextFileJson = getInputJsonFilePath(ls, 0);
nextFileRaw = getRawFilePath(ls, 0);
if (stat(nextFileJson.c_str(), &buf) == 0) {
// a new file was found at new lumisection, index 0
previousFileSize_ = buf.st_size;
fsize = buf.st_size;
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileJson;
return true;
}
if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
fsize = previousFileSize_ = buf.st_size;
nextFile = nextFileRaw;
return true;
} else {
//change of policy: we need to cycle through each LS
return false;
}
BUEoLSFile = getEoLSFilePathOnBU(ls);
eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
return false;
}
}
// no new file found
Expand Down Expand Up @@ -1019,12 +1048,52 @@ namespace evf {
return 0; //OK
}

bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
int infile;
if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
<< strerror(errno);
return false;
}
constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
FRDFileHeader_v1 fileHead;

ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);

if (sz_read < 0) {
edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
<< strerror(errno);
if (infile != -1)
close(infile);
return false;
}
if ((size_t)sz_read < buf_sz) {
edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
if (infile != -1)
close(infile);
return false;
}

uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);

close(infile);

if (frd_version > 0) {
rawHeaderSize = fileHead.headerSize_;
return true;
}

rawHeaderSize = 0;
return false;
}

int EvFDaqDirector::grabNextJsonFromRaw(std::string const& rawSourcePath,
int& rawFd,
uint16_t& rawHeaderSize,
int64_t& fileSizeFromHeader,
bool& fileFound,
uint32_t serverLS) {
uint32_t serverLS,
bool closeFile) {
fileFound = true;

//take only first three tokens delimited by "_" in the renamed raw file name
Expand All @@ -1047,7 +1116,7 @@ namespace evf {
int32_t nbEventsWrittenRaw;
int64_t fileSizeFromRaw;
auto ret = parseFRDFileHeader(
rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, false);
rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
if (ret != 0) {
if (ret == 1)
fileFound = false;
Expand Down Expand Up @@ -1719,7 +1788,7 @@ namespace evf {
if (fileStatus == newFile) {
if (rawHeader > 0)
serverEventsInNewFile =
grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS);
grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
else
serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
}
Expand Down
Loading