Skip to content

Commit

Permalink
Simplify migration to FileSystem API (#6552)
Browse files Browse the repository at this point in the history
Summary:
The current Env/FileSystem API separation has a couple of issues -
1. It requires the user to specify 2 options - ```Options::env``` and ```Options::file_system``` - which means they have to make code changes to benefit from the new APIs. Furthermore, there is a risk of accessing the same APIs in two different ways, through Env in the old way and through FileSystem in the new way. The two may not always match, for example, if env is ```PosixEnv``` and FileSystem is a custom implementation. Any stray RocksDB calls to env will use the ```PosixEnv``` implementation rather than the file_system implementation.
2. There needs to be a simple way for the FileSystem developer to instantiate an Env for backward compatibility purposes.

This PR solves the above issues and simplifies the migration in the following ways -
1. Embed a shared_ptr to the ```FileSystem``` in the ```Env```, and remove ```Options::file_system``` as a configurable option. This way, no code changes will be required in application code to benefit from the new API. The default Env constructor uses a ```LegacyFileSystemWrapper``` as the embedded ```FileSystem```.
1a. - This also makes it more robust by ensuring that even if RocksDB
  has some stray calls to Env APIs rather than FileSystem, they will go
  through the same object and thus there is no risk of getting out of
  sync.
2. Provide a ```NewCompositeEnv()``` API that can be used to construct a
PosixEnv with a custom FileSystem implementation. This eliminates an
indirection to call Env APIs, and relieves the FileSystem developer of
the burden of having to implement wrappers for the Env APIs.
3. Add a couple of missing FileSystem APIs - ```SanitizeEnvOptions()``` and
```NewLogger()```

Tests:
1. New unit tests
2. make check and make asan_check
Pull Request resolved: facebook/rocksdb#6552

Reviewed By: riversand963

Differential Revision: D20592038

Pulled By: anand1976

fbshipit-source-id: c3801ad4153f96d21d5a3ae26c92ba454d1bf1f7
Signed-off-by: Changlong Chen <levisonchen@live.cn>
  • Loading branch information
anand76 authored and Changlong Chen committed Jun 18, 2021
1 parent 21e7d4d commit 37df8a4
Showing 1 changed file with 55 additions and 56 deletions.
111 changes: 55 additions & 56 deletions env/composite_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,18 @@ class CompositeEnvWrapper : public Env {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env, FileSystem* fs)
: env_target_(env), fs_env_target_(fs) {}
explicit CompositeEnvWrapper(Env* env, std::shared_ptr<FileSystem> fs)
: Env(fs), env_target_(env) {}
~CompositeEnvWrapper() {}

// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }

FileSystem* fs_env_target() const { return fs_env_target_; }

Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->RegisterDbPaths(paths);
return file_system_->RegisterDbPaths(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->UnregisterDbPaths(paths);
return file_system_->UnregisterDbPaths(paths);
}

// The following text is boilerplate that forwards all methods to target()
Expand All @@ -315,7 +313,7 @@ class CompositeEnvWrapper : public Env {
std::unique_ptr<FSSequentialFile> file;
Status status;
status =
fs_env_target_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
file_system_->NewSequentialFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeSequentialFileWrapper(file));
}
Expand All @@ -327,8 +325,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg;
std::unique_ptr<FSRandomAccessFile> file;
Status status;
status = fs_env_target_->NewRandomAccessFile(f, FileOptions(options), &file,
&dbg);
status =
file_system_->NewRandomAccessFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeRandomAccessFileWrapper(file));
}
Expand All @@ -340,7 +338,7 @@ class CompositeEnvWrapper : public Env {
std::unique_ptr<FSWritableFile> file;
Status status;
status =
fs_env_target_->NewWritableFile(f, FileOptions(options), &file, &dbg);
file_system_->NewWritableFile(f, FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file));
}
Expand All @@ -352,8 +350,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg;
Status status;
std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReopenWritableFile(fname, FileOptions(options),
&file, &dbg);
status = file_system_->ReopenWritableFile(fname, FileOptions(options),
&file, &dbg);
if (status.ok()) {
result->reset(new CompositeWritableFileWrapper(file));
}
Expand All @@ -366,8 +364,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg;
Status status;
std::unique_ptr<FSWritableFile> file;
status = fs_env_target_->ReuseWritableFile(
fname, old_fname, FileOptions(options), &file, &dbg);
status = file_system_->ReuseWritableFile(fname, old_fname,
FileOptions(options), &file, &dbg);
if (status.ok()) {
r->reset(new CompositeWritableFileWrapper(file));
}
Expand All @@ -379,8 +377,8 @@ class CompositeEnvWrapper : public Env {
IODebugContext dbg;
std::unique_ptr<FSRandomRWFile> file;
Status status;
status = fs_env_target_->NewRandomRWFile(fname, FileOptions(options), &file,
&dbg);
status =
file_system_->NewRandomRWFile(fname, FileOptions(options), &file, &dbg);
if (status.ok()) {
result->reset(new CompositeRandomRWFileWrapper(file));
}
Expand All @@ -389,15 +387,15 @@ class CompositeEnvWrapper : public Env {
Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
return fs_env_target_->NewMemoryMappedFileBuffer(fname, result);
return file_system_->NewMemoryMappedFileBuffer(fname, result);
}
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override {
IOOptions io_opts;
IODebugContext dbg;
std::unique_ptr<FSDirectory> dir;
Status status;
status = fs_env_target_->NewDirectory(name, io_opts, &dir, &dbg);
status = file_system_->NewDirectory(name, io_opts, &dir, &dbg);
if (status.ok()) {
result->reset(new CompositeDirectoryWrapper(dir));
}
Expand All @@ -406,102 +404,108 @@ class CompositeEnvWrapper : public Env {
Status FileExists(const std::string& f) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->FileExists(f, io_opts, &dbg);
return file_system_->FileExists(f, io_opts, &dbg);
}
Status GetChildren(const std::string& dir,
std::vector<std::string>* r) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetChildren(dir, io_opts, r, &dbg);
return file_system_->GetChildren(dir, io_opts, r, &dbg);
}
Status GetChildrenFileAttributes(
const std::string& dir, std::vector<FileAttributes>* result) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetChildrenFileAttributes(dir, io_opts, result,
&dbg);
return file_system_->GetChildrenFileAttributes(dir, io_opts, result, &dbg);
}
Status DeleteFile(const std::string& f) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->DeleteFile(f, io_opts, &dbg);
return file_system_->DeleteFile(f, io_opts, &dbg);
}
Status Truncate(const std::string& fname, size_t size) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->Truncate(fname, size, io_opts, &dbg);
return file_system_->Truncate(fname, size, io_opts, &dbg);
}
Status CreateDir(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->CreateDir(d, io_opts, &dbg);
return file_system_->CreateDir(d, io_opts, &dbg);
}
Status CreateDirIfMissing(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->CreateDirIfMissing(d, io_opts, &dbg);
return file_system_->CreateDirIfMissing(d, io_opts, &dbg);
}
Status DeleteDir(const std::string& d) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->DeleteDir(d, io_opts, &dbg);
return file_system_->DeleteDir(d, io_opts, &dbg);
}
Status GetFileSize(const std::string& f, uint64_t* s) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFileSize(f, io_opts, s, &dbg);
return file_system_->GetFileSize(f, io_opts, s, &dbg);
}

Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFileModificationTime(fname, io_opts, file_mtime,
&dbg);
return file_system_->GetFileModificationTime(fname, io_opts, file_mtime,
&dbg);
}

Status RenameFile(const std::string& s, const std::string& t) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->RenameFile(s, t, io_opts, &dbg);
return file_system_->RenameFile(s, t, io_opts, &dbg);
}

Status LinkFile(const std::string& s, const std::string& t) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->LinkFile(s, t, io_opts, &dbg);
return file_system_->LinkFile(s, t, io_opts, &dbg);
}

Status NumFileLinks(const std::string& fname, uint64_t* count) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->NumFileLinks(fname, io_opts, count, &dbg);
return file_system_->NumFileLinks(fname, io_opts, count, &dbg);
}

Status AreFilesSame(const std::string& first, const std::string& second,
bool* res) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->AreFilesSame(first, second, io_opts, res, &dbg);
return file_system_->AreFilesSame(first, second, io_opts, res, &dbg);
}

Status LockFile(const std::string& f, FileLock** l) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->LockFile(f, io_opts, l, &dbg);
return file_system_->LockFile(f, io_opts, l, &dbg);
}

Status UnlockFile(FileLock* l) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->UnlockFile(l, io_opts, &dbg);
return file_system_->UnlockFile(l, io_opts, &dbg);
}

Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetAbsolutePath(db_path, io_opts, output_path, &dbg);
return file_system_->GetAbsolutePath(db_path, io_opts, output_path, &dbg);
}

Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->NewLogger(fname, io_opts, result, &dbg);
}

#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Expand Down Expand Up @@ -531,10 +535,6 @@ class CompositeEnvWrapper : public Env {
Status GetTestDirectory(std::string* path) override {
return env_target_->GetTestDirectory(path);
}
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override {
return env_target_->NewLogger(fname, result);
}
uint64_t NowMicros() override { return env_target_->NowMicros(); }
uint64_t NowNanos() override { return env_target_->NowNanos(); }
uint64_t NowCPUNanos() override { return env_target_->NowCPUNanos(); }
Expand Down Expand Up @@ -590,46 +590,41 @@ class CompositeEnvWrapper : public Env {
}

EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForLogRead(FileOptions(env_options));
return file_system_->OptimizeForLogRead(FileOptions(env_options));
}
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestRead(
FileOptions(env_options));
return file_system_->OptimizeForManifestRead(FileOptions(env_options));
}
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override {
return fs_env_target_->OptimizeForLogWrite(FileOptions(env_options),
db_options);
return file_system_->OptimizeForLogWrite(FileOptions(env_options),
db_options);
}
EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override {
return fs_env_target_->OptimizeForManifestWrite(
FileOptions(env_options));
return file_system_->OptimizeForManifestWrite(FileOptions(env_options));
}
EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& immutable_ops) const override {
return fs_env_target_->OptimizeForCompactionTableWrite(
FileOptions(env_options),
immutable_ops);
return file_system_->OptimizeForCompactionTableWrite(
FileOptions(env_options), immutable_ops);
}
EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const override {
return fs_env_target_->OptimizeForCompactionTableRead(
FileOptions(env_options),
db_options);
return file_system_->OptimizeForCompactionTableRead(
FileOptions(env_options), db_options);
}
Status GetFreeSpace(const std::string& path, uint64_t* diskfree) override {
IOOptions io_opts;
IODebugContext dbg;
return fs_env_target_->GetFreeSpace(path, io_opts, diskfree, &dbg);
return file_system_->GetFreeSpace(path, io_opts, diskfree, &dbg);
}

private:
Env* env_target_;
FileSystem* fs_env_target_;
};

class LegacySequentialFileWrapper : public FSSequentialFile {
Expand Down Expand Up @@ -1067,6 +1062,10 @@ class LegacyFileSystemWrapper : public FileSystem {
return status_to_io_status(target_->NewLogger(fname, result));
}

void SanitizeFileOptions(FileOptions* opts) const override {
target_->SanitizeEnvOptions(opts);
}

FileOptions OptimizeForLogRead(
const FileOptions& file_options) const override {
return target_->OptimizeForLogRead(file_options);
Expand Down

0 comments on commit 37df8a4

Please sign in to comment.