Skip to content

Commit

Permalink
Adding fd to FileInfo interface, simplifying O_DIRECT code, fixing bugs
Browse files Browse the repository at this point in the history
Summary: 1) Added fd to FileInfo. If fd, is provided, we use that fd and do not open the file. I am currently using pread always. I have to more perf test for pread. Max send is really slow in my new dev server, so need to test for perf in some other machine.
2) If connection url was provided, sender ignored file list. Fixed that and
added a test for file info list
3) Added an option in e2e simple test to test o_direct mode
4) Simplifying o_direct code. We should never define macros named O_DIRECT.

TODO: Finalize the fd api. Current version in the diff will work with fastcopy.

Reviewed By: @ldemailly

Differential Revision: D2447381
  • Loading branch information
uddipta authored and ldemailly committed Sep 19, 2015
1 parent dd13d87 commit 4bfe52a
Show file tree
Hide file tree
Showing 22 changed files with 557 additions and 314 deletions.
17 changes: 15 additions & 2 deletions ByteSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,21 @@ struct SourceMetaData {
FileAllocationStatus allocationStatus{NOT_EXISTS};
/// if there is a size mismatch, this is the previous sequence id
int64_t prevSeqId{0};
/// Read the file with these flags
int oFlags{0};
/// If true, files are read using O_DIRECT or F_NOCACHE
bool directReads{false};
/// File descriptor. If this is not -1, then wdt uses this to read
int fd{-1};
/// If true, fd was opened by wdt and must be closed after transfer finish
bool needToClose{false};

~SourceMetaData() {
if (needToClose && fd >= 0) {
int ret = ::close(fd);
if (ret) {
PLOG(ERROR) << "Failed to close file " << relPath;
}
}
}
};

class ByteSource {
Expand Down
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ if (BUILD_TESTING)
COMPILE_DEFINITIONS "STANDALONE_APP")
target_link_libraries(option_type_test_short_flags wdt4tests_min)


add_test(NAME WdtRandGenTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_rand_gen_test.sh")

Expand All @@ -292,4 +291,10 @@ if (BUILD_TESTING)
add_test(NAME WdtProtocolNegotiationTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_protocol_negotiation_test.py")

add_test(NAME WdtSimpleOdirectTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_e2e_simple_test.sh" -o true)

add_test(NAME WdtFileListTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/wdt_file_list_test.py")

endif(BUILD_TESTING)
44 changes: 25 additions & 19 deletions DirectorySourceQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,32 @@ namespace wdt {

using std::string;

FileInfo::FileInfo(const string &name, int64_t size)
FileInfo::FileInfo(const string &name, int64_t size, bool directReads)
: fileName(name), fileSize(size) {
oFlags = O_RDONLY;
const auto &options = WdtOptions::get();
if (options.odirect_reads) {
oFlags |= O_DIRECT;
}
this->directReads = directReads;
}

FileInfo::FileInfo(const string &name, int64_t size, int fd)
: FileInfo(name, size) {
this->fd = fd;
}

void FileInfo::verifyAndFixFlags() {
if (oFlags & ~(O_DIRECT | O_RDONLY)) {
LOG(WARNING) << "Flags apart from O_RDONLY and O_DIRECT "
<< "provided, for " << fileName
<< ". Wdt will ignore the extra flags";
oFlags &= (O_RDONLY | O_DIRECT);
if (fd >= 0) {
#ifdef O_DIRECT
int flags = fcntl(fd, F_GETFL, 0);
// directReads does not depend on the option in this case
directReads = (flags & O_DIRECT);
// Do not have to worry about F_NOCACHE, since it has no alignment
// requirement
#endif
}
if (directReads) {
#ifndef WDT_SUPPORTS_ODIRECT
bool hasOdirect = oFlags & O_DIRECT;
if (hasOdirect) {
LOG(WARNING) << "Can't read " << fileName << " in O_DIRECT"
<< ". Memalign not found, turning O_DIRECT flag"
<< " off for this file";
oFlags &= ~(O_DIRECT);
}
LOG(WARNING) << "Wdt can't handle O_DIRECT in this system. " << fileName;
directReads = false;
#endif
}
}

DirectorySourceQueue::DirectorySourceQueue(const string &rootDir,
Expand Down Expand Up @@ -472,8 +473,13 @@ void DirectorySourceQueue::createIntoQueue(const string &fullPath,
SourceMetaData *metadata = new SourceMetaData();
metadata->fullPath = fullPath;
metadata->relPath = relPath;
metadata->fd = fileInfo.fd;
metadata->directReads = fileInfo.directReads;
if (options_.open_files_during_discovery && metadata->fd < 0) {
metadata->fd = FileUtil::openForRead(fullPath, metadata->directReads);
metadata->needToClose = (metadata->fd >= 0);
}
metadata->seqId = seqId;
metadata->oFlags = fileInfo.oFlags;
metadata->size = fileSize;
metadata->allocationStatus = allocationStatus;
metadata->prevSeqId = prevSeqId;
Expand Down
16 changes: 11 additions & 5 deletions DirectorySourceQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,19 @@ struct FileInfo {
std::string fileName;
/// Size of the file to be read, default is -1
int64_t fileSize;
/// Whether read should be done using o_direct
bool directReads{false};
/// File descriptor. If this is not -1, then wdt uses this to read
int fd{-1};
/// Constructor for file info with name and size
explicit FileInfo(const std::string &name, int64_t size = -1,
bool directReads = WdtOptions::get().odirect_reads);
/**
* Flags to read the file with, wdt supports only
* O_RDONLY, and O_DIRECT no matter what flags are provided
* Constructor with name, size and fd
* If this constructor is used, then whether to do direct reads is decided
* by fd flags
*/
int oFlags;
/// Constructor for file info with name and size
explicit FileInfo(const std::string &name, int64_t size = -1);
FileInfo(const std::string &name, int64_t size, int fd);
/// Verify that we can align for reading in O_DIRECT and
/// the flags make sense
void verifyAndFixFlags();
Expand Down
107 changes: 57 additions & 50 deletions FileByteSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,70 +19,78 @@ namespace wdt {

folly::ThreadLocalPtr<FileByteSource::Buffer> FileByteSource::buffer_;

int FileUtil::openForRead(const std::string &filename, bool isDirectReads) {
int openFlags = O_RDONLY;
if (isDirectReads) {
#ifdef O_DIRECT
// no need to change any flags if we are using F_NOCACHE
openFlags |= O_DIRECT;
#endif
}
START_PERF_TIMER
int fd = ::open(filename.c_str(), openFlags);
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
if (fd >= 0) {
if (isDirectReads) {
#ifndef O_DIRECT
#ifdef F_NOCACHE
VLOG(1) << "O_DIRECT not found, using F_NOCACHE instead "
<< "for " << filename;
int ret = fcntl(fd, F_NOCACHE, 1);
if (ret) {
PLOG(ERROR) << "Not able to set F_NOCACHE";
}
#else
WDT_CHECK(false)
<< "Direct read enabled, but both O_DIRECT and F_NOCACHE not defined "
<< filename;
#endif
#endif
}
} else {
PLOG(ERROR) << "Error opening file " << filename;
}
return fd;
}

FileByteSource::FileByteSource(SourceMetaData *metadata, int64_t size,
int64_t offset, int64_t bufferSize)
: metadata_(metadata),
size_(size),
offset_(offset),
bytesRead_(0),
bufferSize_(bufferSize) {
bufferSize_(bufferSize),
alignedReadNeeded_(false) {
transferStats_.setId(getIdentifier());
}

ErrorCode FileByteSource::open() {
bytesRead_ = 0;
this->close();
bool isOdirect = (metadata_->oFlags & O_DIRECT);
int oFlags = O_RDONLY;
if (isOdirect) {
oFlags |= O_DIRECT;
}
#ifdef WDT_SIMULATED_ODIRECT
if (isOdirect) {
oFlags &= ~(O_DIRECT);
}
#endif
VLOG(1) << "Reading with O_DIRECT " << ((oFlags & O_DIRECT) > 0);
ErrorCode errCode = OK;
bool isDirectReads = metadata_->directReads;
VLOG(1) << "Reading in direct mode " << isDirectReads;
if (isDirectReads) {
#ifdef O_DIRECT
alignedReadNeeded_ = true;
#endif
}
bool hasValidBuffer = (buffer_ && bufferSize_ <= buffer_->size_);
if (!hasValidBuffer || (isOdirect && !buffer_->isMemAligned_)) {
buffer_.reset(new Buffer(bufferSize_, isOdirect));
if (!hasValidBuffer || (alignedReadNeeded_ && !buffer_->isMemAligned_)) {
// TODO: if posix_memalign is present, create aligned buffer by default
buffer_.reset(new Buffer(bufferSize_, alignedReadNeeded_));
}
const std::string &fullPath = metadata_->fullPath;
START_PERF_TIMER
fd_ = ::open(fullPath.c_str(), oFlags);
if (fd_ < 0) {
errCode = BYTE_SOURCE_READ_ERROR;
PLOG(ERROR) << "Error opening file " << fullPath;

if (metadata_->fd >= 0) {
VLOG(1) << "metadata already has fd, no need to open " << getIdentifier();
fd_ = metadata_->fd;
} else {
#ifdef WDT_SIMULATED_ODIRECT
#ifdef F_NOCACHE
if (isOdirect) {
LOG(WARNING) << "O_DIRECT not found, using F_NOCACHE instead "
<< "for " << getIdentifier();
int ret = fcntl(fd_, F_NOCACHE, 1);
if (ret) {
PLOG(ERROR) << "Not able to do F_NOCACHE";
}
}
#else
if (isOdirect) {
LOG(ERROR) << "O_DIRECT requested but this OS doesn't support "
<< "O_DIRECT or F_NOCACHE";
}
#endif
#endif
RECORD_PERF_RESULT(PerfStatReport::FILE_OPEN)
if (offset_ > 0) {
START_PERF_TIMER
if (lseek(fd_, offset_, SEEK_SET) < 0) {
errCode = BYTE_SOURCE_READ_ERROR;
PLOG(ERROR) << "Error seeking file " << fullPath;
} else {
RECORD_PERF_RESULT(PerfStatReport::FILE_SEEK)
}
fd_ = FileUtil::openForRead(metadata_->fullPath, isDirectReads);
if (fd_ < 0) {
errCode = BYTE_SOURCE_READ_ERROR;
}
}

transferStats_.setErrorCode(errCode);
return errCode;
}
Expand All @@ -100,8 +108,7 @@ char *FileByteSource::read(int64_t &size) {
int64_t expectedRead =
(int64_t)std::min<int64_t>(buffer_->size_, size_ - bytesRead_);
int64_t toRead = expectedRead;
bool isOdirect = (metadata_->oFlags & O_DIRECT);
if (isOdirect) {
if (alignedReadNeeded_) {
toRead =
((expectedRead + kDiskBlockSize - 1) / kDiskBlockSize) * kDiskBlockSize;
}
Expand All @@ -110,7 +117,7 @@ char *FileByteSource::read(int64_t &size) {
<< " while buffer size is "
<< buffer_->size_;
START_PERF_TIMER
int64_t numRead = ::read(fd_, buffer_->data_, toRead);
int64_t numRead = ::pread(fd_, buffer_->data_, toRead, offset_ + bytesRead_);
if (numRead < 0) {
PLOG(ERROR) << "failure while reading file " << metadata_->fullPath;
this->close();
Expand All @@ -128,7 +135,7 @@ char *FileByteSource::read(int64_t &size) {
// from a sub block of the file smaller than disk block
// size
if (numRead > expectedRead) {
WDT_CHECK(isOdirect);
WDT_CHECK(alignedReadNeeded_);
numRead = expectedRead;
}
bytesRead_ += numRead;
Expand Down
25 changes: 25 additions & 0 deletions FileByteSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@ namespace facebook {
namespace wdt {
const int64_t kDiskBlockSize = 4 * 1024;

/// File related code
class FileUtil {
public:
/**
* Opens the file for reading.
*
* @param filename name of the file
* @param isDirectReads whether to open for direct reads
*
* @return If successful, fd is returned, else -1 is returned
*/
static int openForRead(const std::string &filename, bool isDirectReads);
// TODO: create a separate file for this class and move other file related
// code here
};

/**
* ByteSource that reads data from a file. The buffer used is thread-local
* for efficiency reasons so only one FileByteSource can be created/used
Expand Down Expand Up @@ -85,6 +101,12 @@ class FileByteSource : public ByteSource {

/// close the source for reading
virtual void close() override {
if (metadata_->fd >= 0) {
// if the fd is not opened by this source, no need to close it
VLOG(1) << "No need to close " << getIdentifier()
<< ", this was not opened by FileByteSource";
return;
}
if (fd_ >= 0) {
START_PERF_TIMER
::close(fd_);
Expand Down Expand Up @@ -176,6 +198,9 @@ class FileByteSource : public ByteSource {
/// buffer size
int64_t bufferSize_;

/// Whether reads have to be done using aligned buffer and size
bool alignedReadNeeded_{false};

/// transfer stats
TransferStats transferStats_;
};
Expand Down
Loading

0 comments on commit 4bfe52a

Please sign in to comment.