diff --git a/test/src/unit-s3.cc b/test/src/unit-s3.cc index 295bece2665..c7d06c03e6d 100644 --- a/test/src/unit-s3.cc +++ b/test/src/unit-s3.cc @@ -34,6 +34,7 @@ #include #include "test/support/src/helpers.h" +#include "test/support/src/vfs_helpers.h" #include "tiledb/common/thread_pool.h" #include "tiledb/sm/config/config.h" #include "tiledb/sm/filesystem/s3.h" @@ -554,4 +555,116 @@ TEST_CASE_METHOD(S3Fx, "Test S3 use Bucket/Object CannedACL", "[s3]") { try_with_bucket_object_canned_acl("public_read_write", "public_read_write"); try_with_bucket_object_canned_acl("authenticated_read", "authenticated_read"); } + +TEST_CASE( + "S3: S3Scanner iterator to populate vector", "[s3][ls-scan-iterator]") { + S3Test s3_test({10, 50}); + bool recursive = true; + // 1000 is the default max_keys for S3. This is the same default used by + // S3Scanner. Testing with small max_keys validates the iterator handles batch + // collection and filtering appropriately. + int max_keys = GENERATE(1000, 10, 7); + + DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") { + FileFilter file_filter; + auto expected = s3_test.expected_results(); + + SECTION("Accept all objects") { + file_filter = [](const std::string_view&, uint64_t) { return true; }; + std::sort(expected.begin(), expected.end()); + } + + SECTION("Reject all objects") { + file_filter = [](const std::string_view&, uint64_t) { return false; }; + } + + SECTION("Filter objects including 'test_file_1' in key") { + file_filter = [](const std::string_view& path, uint64_t) { + if (path.find("test_file_1") != std::string::npos) { + return true; + } + return false; + }; + } + + SECTION("Scan for a single object") { + file_filter = [](const std::string_view& path, uint64_t) { + if (path.find("test_file_50") != std::string::npos) { + return true; + } + return false; + }; + } + + // Filter expected results to apply file_filter. 
+ std::erase_if(expected, [&file_filter](const auto& a) { + return !file_filter(a.first, a.second); + }); + + auto scan = s3_test.get_s3().scanner( + s3_test.temp_dir_, file_filter, accept_all_dirs, recursive, max_keys); + std::vector results_vector(scan.begin(), scan.end()); + + CHECK(results_vector.size() == expected.size()); + for (size_t i = 0; i < expected.size(); i++) { + auto s3_object = results_vector[i]; + CHECK(file_filter(s3_object.GetKey(), s3_object.GetSize())); + auto full_uri = s3_test.temp_dir_.to_string() + "/" + s3_object.GetKey(); + CHECK(full_uri == expected[i].first); + CHECK(static_cast(s3_object.GetSize()) == expected[i].second); + } + } +} + +TEST_CASE("S3: S3Scanner iterator", "[s3][ls-scan-iterator]") { + S3Test s3_test({10, 50, 7}); + bool recursive = true; + int max_keys = GENERATE(1000, 11); + + std::vector results_vector; + DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") { + auto scan = s3_test.get_s3().scanner( + s3_test.temp_dir_, + VFSTest::accept_all_files, + accept_all_dirs, + recursive, + max_keys); + + SECTION("for loop") { + SECTION("range based for") { + for (const auto& result : scan) { + results_vector.push_back(result); + } + } + SECTION("prefix operator") { + for (auto it = scan.begin(); it != scan.end(); ++it) { + results_vector.push_back(*it); + } + } + SECTION("postfix operator") { + for (auto it = scan.begin(); it != scan.end(); it++) { + results_vector.push_back(*it); + } + } + } + + SECTION("vector::assign") { + results_vector.assign(scan.begin(), scan.end()); + } + + SECTION("std::move") { + std::move(scan.begin(), scan.end(), std::back_inserter(results_vector)); + } + } + + auto expected = s3_test.expected_results(); + CHECK(results_vector.size() == expected.size()); + for (size_t i = 0; i < expected.size(); i++) { + auto s3_object = results_vector[i]; + auto full_uri = s3_test.temp_dir_.to_string() + "/" + s3_object.GetKey(); + CHECK(full_uri == expected[i].first); + 
CHECK(static_cast(s3_object.GetSize()) == expected[i].second); + } +} + #endif diff --git a/test/src/unit-vfs.cc b/test/src/unit-vfs.cc index 366d5fd2ffe..e167ffe612a 100644 --- a/test/src/unit-vfs.cc +++ b/test/src/unit-vfs.cc @@ -37,6 +37,7 @@ #ifdef _WIN32 #include "tiledb/sm/filesystem/path_win.h" #endif +#include "test/support/src/vfs_helpers.h" #include "tiledb/sm/tile/tile.h" using namespace tiledb::common; @@ -344,3 +345,93 @@ TEST_CASE("VFS: test ls_with_sizes", "[vfs][ls-with-sizes]") { // Clean up REQUIRE(vfs_ls.remove_dir(URI(path)).ok()); } + +// Currently only S3 is supported for VFS::ls_recursive. +using TestBackends = std::tuple; +TEMPLATE_LIST_TEST_CASE( + "VFS: Test internal ls_filtered recursion argument", + "[vfs][ls_filtered][recursion]", + TestBackends) { + TestType fs({10, 50}); + if (!fs.is_supported()) { + return; + } + + bool recursive = GENERATE(true, false); + DYNAMIC_SECTION( + fs.temp_dir_.backend_name() + << " ls_filtered with recursion: " << (recursive ? "true" : "false")) { +#ifdef HAVE_S3 + // If testing with recursion use the root directory, otherwise use a subdir. + auto path = recursive ? fs.temp_dir_ : fs.temp_dir_.join_path("subdir_1"); + auto ls_objects = fs.get_s3().ls_filtered( + path, VFSTestBase::accept_all_files, accept_all_dirs, recursive); + + auto expected = fs.expected_results(); + if (!recursive) { + // If non-recursive, all objects in the first directory should be + // returned. 
+ std::erase_if(expected, [](const auto& p) { + return p.first.find("subdir_1") == std::string::npos; + }); + } + + CHECK(ls_objects.size() == expected.size()); + CHECK(ls_objects == expected); +#endif + } +} + +TEST_CASE( + "VFS: ls_recursive throws for unsupported backends", + "[vfs][ls_recursive]") { + // Local and mem fs tests are in tiledb/sm/filesystem/test/unit_ls_filtered.cc + std::string prefix = GENERATE("s3://", "hdfs://", "azure://", "gcs://"); + VFSTest vfs_test({1}, prefix); + if (!vfs_test.is_supported()) { + return; + } + std::string backend = vfs_test.temp_dir_.backend_name(); + + if (vfs_test.temp_dir_.is_s3()) { + DYNAMIC_SECTION(backend << " supported backend should not throw") { + CHECK_NOTHROW(vfs_test.vfs_.ls_recursive( + vfs_test.temp_dir_, VFSTestBase::accept_all_files)); + } + } else { + DYNAMIC_SECTION(backend << " unsupported backend should throw") { + CHECK_THROWS_WITH( + vfs_test.vfs_.ls_recursive( + vfs_test.temp_dir_, VFSTestBase::accept_all_files), + Catch::Matchers::ContainsSubstring( + "storage backend is not supported")); + } + } +} + +TEST_CASE( + "VFS: Throwing FileFilter for ls_recursive", + "[vfs][ls_recursive][file-filter]") { + std::string prefix = "s3://"; + VFSTest vfs_test({0}, prefix); + if (!vfs_test.is_supported()) { + return; + } + + auto file_filter = [](const std::string_view&, uint64_t) -> bool { + throw std::logic_error("Throwing FileFilter"); + }; + SECTION("Throwing FileFilter with 0 objects should not throw") { + CHECK_NOTHROW(vfs_test.vfs_.ls_recursive( + vfs_test.temp_dir_, file_filter, tiledb::sm::accept_all_dirs)); + } + SECTION("Throwing FileFilter with N objects should throw") { + vfs_test.vfs_.touch(vfs_test.temp_dir_.join_path("file")).ok(); + CHECK_THROWS_AS( + vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter), + std::logic_error); + CHECK_THROWS_WITH( + vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter), + Catch::Matchers::ContainsSubstring("Throwing FileFilter")); + } +} diff 
--git a/test/support/src/vfs_helpers.cc b/test/support/src/vfs_helpers.cc index 285981a8148..14bde730bd1 100644 --- a/test/support/src/vfs_helpers.cc +++ b/test/support/src/vfs_helpers.cc @@ -42,12 +42,15 @@ // after tdb_catch.h. #include "test/support/src/serialization_wrappers.h" -#include #include "test/support/src/helpers.h" #include "test/support/src/vfs_helpers.h" namespace tiledb::test { +tiledb::sm::URI test_dir(const std::string& prefix) { + return tiledb::sm::URI(prefix + "tiledb-" + std::to_string(PRNG::get()())); +} + std::vector> vfs_test_get_fs_vec() { std::vector> fs_vec; @@ -414,4 +417,86 @@ std::string TemporaryDirectoryFixture::create_temporary_array( return array_uri; } +VFSTestBase::VFSTestBase( + const std::vector& test_tree, const std::string& prefix) + : test_tree_(test_tree) + , compute_(4) + , io_(4) + , vfs_(&tiledb::test::g_helper_stats, &io_, &compute_, create_test_config()) + , prefix_(prefix) + , temp_dir_(tiledb::test::test_dir(prefix_)) + , is_supported_(vfs_.supports_uri_scheme(temp_dir_)) { + // TODO: Throw when we can provide a list of supported filesystems to Catch2. +} + +VFSTestBase::~VFSTestBase() { + if (vfs_.supports_uri_scheme(temp_dir_)) { + bool is_dir = false; + vfs_.is_dir(temp_dir_, &is_dir).ok(); + if (is_dir) { + vfs_.remove_dir(temp_dir_).ok(); + } + } +} + +tiledb::sm::Config VFSTestBase::create_test_config() { + tiledb::sm::Config cfg; + if constexpr (!tiledb::test::aws_s3_config) { + // Set up connection to minio backend emulator. 
+ cfg.set("vfs.s3.endpoint_override", "localhost:9999").ok(); + cfg.set("vfs.s3.scheme", "https").ok(); + cfg.set("vfs.s3.use_virtual_addressing", "false").ok(); + cfg.set("vfs.s3.verify_ssl", "false").ok(); + } + cfg.set("vfs.azure.storage_account_name", "devstoreaccount1").ok(); + cfg.set( + "vfs.azure.storage_account_key", + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/" + "K1SZFPTOtr/KBHBeksoGMGw==") + .ok(); + cfg.set("vfs.azure.blob_endpoint", "http://127.0.0.1:10000/devstoreaccount1") + .ok(); + return cfg; +} + +VFSTest::VFSTest( + const std::vector& test_tree, const std::string& prefix) + : VFSTestBase(test_tree, prefix) { + if (!is_supported()) { + return; + } + + if (temp_dir_.is_file() || temp_dir_.is_memfs() || temp_dir_.is_hdfs()) { + vfs_.create_dir(temp_dir_).ok(); + } else { + vfs_.create_bucket(temp_dir_).ok(); + } + for (size_t i = 1; i <= test_tree_.size(); i++) { + sm::URI path = temp_dir_.join_path("subdir_" + std::to_string(i)); + // VFS::create_dir is a no-op for S3. 
+ vfs_.create_dir(path).ok(); + for (size_t j = 1; j <= test_tree_[i - 1]; j++) { + auto object_uri = path.join_path("test_file_" + std::to_string(j)); + vfs_.touch(object_uri).ok(); + std::string data(j * 10, 'a'); + vfs_.open_file(object_uri, sm::VFSMode::VFS_WRITE).ok(); + vfs_.write(object_uri, data.data(), data.size()).ok(); + vfs_.close_file(object_uri).ok(); + expected_results().emplace_back(object_uri.to_string(), data.size()); + } + } + std::sort(expected_results().begin(), expected_results().end()); +} + +LocalFsTest::LocalFsTest(const std::vector& test_tree) + : VFSTestBase(test_tree, "file://") { +#ifdef _WIN32 + temp_dir_ = + tiledb::test::test_dir(prefix_ + tiledb::sm::Win::current_dir() + "/"); +#else + temp_dir_ = + tiledb::test::test_dir(prefix_ + tiledb::sm::Posix::current_dir() + "/"); +#endif +} + } // namespace tiledb::test diff --git a/test/support/src/vfs_helpers.h b/test/support/src/vfs_helpers.h index d5c5a2ec20e..9e11b22cc4e 100644 --- a/test/support/src/vfs_helpers.h +++ b/test/support/src/vfs_helpers.h @@ -33,21 +33,32 @@ #ifndef TILEDB_VFS_HELPERS_H #define TILEDB_VFS_HELPERS_H +#include #include #include "test/support/src/helpers.h" -#include "test/support/tdb_catch.h" - -#ifdef _WIN32 -#include "tiledb/sm/filesystem/win.h" -#else -#include "tiledb/sm/filesystem/posix.h" -#endif +#include "tiledb/sm/enums/vfs_mode.h" +#include "tiledb/sm/filesystem/vfs.h" namespace tiledb::test { // Forward declaration class SupportedFs; +#ifdef TILEDB_TESTS_AWS_CONFIG +constexpr bool aws_s3_config = true; +#else +constexpr bool aws_s3_config = false; +#endif + +/** + * Generates a random temp directory URI for use in VFS tests. + * + * @param prefix A prefix to use for the temp directory name. Should include + * `s3://`, `mem://` or other URI prefix for the backend. + * @return URI which the caller can use to create a temp directory. + */ +tiledb::sm::URI test_dir(const std::string& prefix); + /** * Create the vector of supported filesystems. 
*/ @@ -773,6 +784,151 @@ class DenyWriteAccess { const std::filesystem::perms previous_perms_; }; +/** + * Base class use for VFS and file system test objects. Deriving classes are + * responsible for creating a temporary directory and populating it with test + * objects for the related file system. + */ +class VFSTestBase { + protected: + /** + * Requires derived class to create a temporary directory. + * + * @param test_tree Vector used to build test directory and objects. + * For each element we create a nested directory with N objects. + * @param prefix The URI prefix to use for the test directory. + */ + VFSTestBase(const std::vector& test_tree, const std::string& prefix); + + public: + /** Type definition for objects returned from ls_recursive */ + using LsObjects = std::vector>; + + virtual ~VFSTestBase(); + + /** + * @return True if the URI prefix is supported by the build, else false. + */ + inline bool is_supported() const { + return is_supported_; + } + + inline LsObjects& expected_results() { + return expected_results_; + } + + /** + * Creates a config for testing VFS storage backends over local emulators. + * + * @return Fully initialized configuration for testing VFS storage backends. + */ + static tiledb::sm::Config create_test_config(); + + /** FilePredicate for passing to ls_filtered that accepts all files. */ + static bool accept_all_files(const std::string_view&, uint64_t) { + return true; + } + + std::vector test_tree_; + ThreadPool compute_, io_; + tiledb::sm::VFS vfs_; + std::string prefix_; + tiledb::sm::URI temp_dir_; + + private: + LsObjects expected_results_; + bool is_supported_; +}; + +/** + * Test object for tiledb::sm::VFS functionality. When constructed, this test + * object creates a temporary directory and populates it using the test_tree + * vector passed to the constructor. For each element in the vector, we create a + * nested directory with N objects. 
The constructor also writes `10 * N` bytes + * of data to each object created for testing returned object sizes are correct. + * + * This test object can be used for any valid VFS URI prefix, and is not + * specific to any one backend. + */ +class VFSTest : public VFSTestBase { + public: + VFSTest(const std::vector& test_tree, const std::string& prefix); +}; + +/** Test object for tiledb::sm::S3 functionality. */ +class S3Test : public VFSTestBase, protected tiledb::sm::S3_within_VFS { + public: + explicit S3Test(const std::vector& test_tree) + : VFSTestBase(test_tree, "s3://") + , S3_within_VFS(&tiledb::test::g_helper_stats, &io_, vfs_.config()) { +#ifdef HAVE_S3 + s3().create_bucket(temp_dir_).ok(); + for (size_t i = 1; i <= test_tree_.size(); i++) { + sm::URI path = temp_dir_.join_path("subdir_" + std::to_string(i)); + // VFS::create_dir is a no-op for S3; Just create objects. + for (size_t j = 1; j <= test_tree_[i - 1]; j++) { + auto object_uri = path.join_path("test_file_" + std::to_string(j)); + s3().touch(object_uri).ok(); + std::string data(j * 10, 'a'); + s3().write(object_uri, data.data(), data.size()).ok(); + s3().flush_object(object_uri).ok(); + expected_results().emplace_back(object_uri.to_string(), data.size()); + } + } + std::sort(expected_results().begin(), expected_results().end()); +#endif + } + +#ifdef HAVE_S3 + /** Expose protected accessor from S3_within_VFS. */ + tiledb::sm::S3& get_s3() { + return s3(); + } + + /** Expose protected const accessor from S3_within_VFS. */ + const tiledb::sm::S3& get_s3() const { + return s3(); + } +#endif +}; + +/** Stub test object for tiledb::sm::Win and Posix functionality. */ +class LocalFsTest : public VFSTestBase { + public: + explicit LocalFsTest(const std::vector& test_tree); +}; + +/** Stub test object for tiledb::sm::Azure functionality. 
*/ +class AzureTest : public VFSTestBase { + public: + explicit AzureTest(const std::vector& test_tree) + : VFSTestBase(test_tree, "azure://") { + } +}; + +/** Stub test object for tiledb::sm::GCS functionality. */ +class GCSTest : public VFSTestBase { + public: + explicit GCSTest(const std::vector& test_tree) + : VFSTestBase(test_tree, "gcs://") { + } +}; + +/** Stub test object for tiledb::sm::HDFS functionality. */ +class HDFSTest : public VFSTestBase { + public: + explicit HDFSTest(const std::vector& test_tree) + : VFSTestBase(test_tree, "hdfs://") { + } +}; + +/** Stub test object for tiledb::sm::MemFilesystem functionality. */ +class MemFsTest : public VFSTestBase { + public: + explicit MemFsTest(const std::vector& test_tree) + : VFSTestBase(test_tree, "mem://") { + } +}; } // namespace tiledb::test #endif diff --git a/tiledb/CMakeLists.txt b/tiledb/CMakeLists.txt index 530d2b8c68b..dbf8112020f 100644 --- a/tiledb/CMakeLists.txt +++ b/tiledb/CMakeLists.txt @@ -201,6 +201,7 @@ set(TILEDB_CORE_SOURCES ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/uri.cc ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/vfs.cc ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/vfs_file_handle.cc + ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/ls_scanner.cc ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/win.cc ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filesystem/filesystem_base.cc ${TILEDB_CORE_INCLUDE_DIR}/tiledb/sm/filter/bit_width_reduction_filter.cc diff --git a/tiledb/sm/filesystem/CMakeLists.txt b/tiledb/sm/filesystem/CMakeLists.txt index 12182937934..fd0e1f34a40 100644 --- a/tiledb/sm/filesystem/CMakeLists.txt +++ b/tiledb/sm/filesystem/CMakeLists.txt @@ -38,6 +38,7 @@ commence(object_library vfs) uri.cc vfs.cc vfs_file_handle.cc + ls_scanner.cc win.cc filesystem_base.cc ../curl/curl_init.cc diff --git a/tiledb/sm/filesystem/filesystem_base.h b/tiledb/sm/filesystem/filesystem_base.h index b1e9409bc3e..89be313d116 100644 --- a/tiledb/sm/filesystem/filesystem_base.h +++ 
b/tiledb/sm/filesystem/filesystem_base.h @@ -1,5 +1,5 @@ /** - * @file filesystem.h + * @file filesystem_base.h * * @section LICENSE * @@ -40,8 +40,6 @@ namespace tiledb::sm { -class VFS; - class FilesystemBase { public: FilesystemBase() = default; diff --git a/tiledb/sm/filesystem/ls_scanner.cc b/tiledb/sm/filesystem/ls_scanner.cc new file mode 100644 index 00000000000..bcccd82870e --- /dev/null +++ b/tiledb/sm/filesystem/ls_scanner.cc @@ -0,0 +1,33 @@ +/** + * @file ls_scanner.cc + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2023 TileDB, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @section DESCRIPTION + * + * This defines the LsScanner class and related types used for VFS. 
+ */ + +#include "ls_scanner.h" diff --git a/tiledb/sm/filesystem/ls_scanner.h b/tiledb/sm/filesystem/ls_scanner.h new file mode 100644 index 00000000000..724ae667e05 --- /dev/null +++ b/tiledb/sm/filesystem/ls_scanner.h @@ -0,0 +1,244 @@ +/** + * @file ls_scanner.h + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2023 TileDB, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @section DESCRIPTION + * + * This defines the LsScanner class and related types used for VFS. + */ + +#ifndef TILEDB_LS_SCANNER_H +#define TILEDB_LS_SCANNER_H + +#include "tiledb/common/exception/exception.h" +#include "tiledb/sm/filesystem/uri.h" + +#include +#include +#include + +/** Inclusion predicate for objects collected by ls */ +template +concept FilePredicate = true; + +/** + * DirectoryPredicate is currently unused, but is kept here for adding directory + * pruning support in the future. 
+ */ +template +concept DirectoryPredicate = true; + +namespace tiledb::sm { +class LsScanException : public StatusException { + public: + explicit LsScanException(const std::string& message) + : StatusException("LsScan", message) { + } +}; + +using FileFilter = std::function; + +using DirectoryFilter = std::function; +/** Static DirectoryFilter used as default argument. */ +[[maybe_unused]] static bool accept_all_dirs(const std::string_view&) { + return true; +} + +/** Type defintion for objects returned from ls_recursive. */ +using LsObjects = std::vector>; + +/** + * LsScanIterator iterates over the results of ls requests wrapped by classes + * deriving from LsScanner. See S3Scanner as an example. + * + * The iterator is implemented as an input iterator, where the end() iterator + * is default constructed, resulting in an LsScanIterator::ptr_ that is a + * non-dereferencable iterator. + * + * + * @tparam scanner_type The LsScanner type that created this iterator. + * @tparam T The data type stored by this iterator. + * TODO: Discuss using T for iterator type instead of underlying data type. + */ +template +class LsScanIterator { + public: + using value_type = T; + using difference_type = ptrdiff_t; + using pointer = typename std::vector::const_iterator; + using reference = const T&; + using iterator_category = std::input_iterator_tag; + + /** + * Default constructor. + * + * This constructor is used to construct the end() iterator, where ptr_ is + * default constructed as a non-de-referencable iterator. + */ + LsScanIterator() noexcept = default; + + /** + * Constructor. + * + * @param scanner The scanner that created this iterator. + */ + explicit LsScanIterator(scanner_type* scanner) noexcept + : scanner_(scanner) + , ptr_(scanner->begin_) { + } + + /** + * Constructor. + * + * @param scanner The scanner that created this iterator. + * @param ptr Pointer to set as the current object. 
+ */ + LsScanIterator(scanner_type* scanner, pointer ptr) noexcept + : scanner_(scanner) + , ptr_(ptr) { + } + + /** Copy constructor. */ + LsScanIterator(const LsScanIterator& rhs) noexcept { + ptr_ = rhs.ptr_; + scanner_ = rhs.scanner_; + } + + /** Copy assignment operator. */ + LsScanIterator& operator=(LsScanIterator rhs) noexcept { + if (&rhs != this) { + std::swap(ptr_, rhs.ptr_); + std::swap(scanner_, rhs.scanner_); + } + return *this; + } + + inline void ensure_dereferenceable() const { + if (ptr_ == pointer()) { + throw LsScanException("Failed to dereference invalid iterator."); + } + } + + /** + * Dereference operator. + * + * @return The current object being visited. + */ + constexpr reference operator*() const { + ensure_dereferenceable(); + return *ptr_; + } + + /** + * Dereference operator. + * + * @return The current object being visited. + */ + constexpr pointer operator->() const { + ensure_dereferenceable(); + return ptr_; + } + + /** + * Prefix increment operator. + * Calls the scanner's next() method to advance to the next object. + * + * @return Reference to this iterator after advancing to the next object. + */ + LsScanIterator& operator++() { + if (++ptr_ != pointer()) { + scanner_->next(ptr_); + } + return *this; + } + + /** + * Postfix increment operator. + * Calls next() method to advance to the next object via prefix operator. + * + * @return Copy of this iterator prior to advancing to the next object. + */ + LsScanIterator operator++(int) { + LsScanIterator tmp(*this); + operator++(); + return tmp; + } + + /** Inequality operator. */ + friend constexpr bool operator!=( + const LsScanIterator& lhs, const LsScanIterator& rhs) noexcept { + return !(lhs.ptr_ == rhs.ptr_); + } + + /** Equality operator. */ + friend constexpr bool operator==( + const LsScanIterator& lhs, const LsScanIterator& rhs) noexcept { + return lhs.ptr_ == rhs.ptr_; + } + + private: + /** Pointer to the scanner that created this iterator. 
*/ + scanner_type* scanner_; + + /** Pointer to the current object. */ + pointer ptr_; +}; + +/** + * LsScanner is a base class for scanning a filesystem for objects that match + * the given file and directory predicates. This should be used as a common + * base class for future filesystem scanner implementations, similar to + * S3Scanner. + * + * @tparam F The FilePredicate type used to filter object results. + * @tparam D The DirectoryPredicate type used to prune prefix results. + */ +template +class LsScanner { + public: + /** Constructor. */ + LsScanner( + const URI& prefix, F file_filter, D dir_filter, bool recursive = false) + : prefix_(prefix) + , file_filter_(file_filter) + , dir_filter_(dir_filter) + , is_recursive_(recursive) { + } + + protected: + /** URI prefix being scanned and filtered for results. */ + const URI prefix_; + /** File predicate used to filter file or object results. */ + const F file_filter_; + /** Directory predicate used to prune directory or prefix results. */ + const D dir_filter_; + /** Whether or not to recursively scan the prefix. */ + const bool is_recursive_; +}; + +} // namespace tiledb::sm + +#endif // TILEDB_LS_SCANNER_H diff --git a/tiledb/sm/filesystem/s3.cc b/tiledb/sm/filesystem/s3.cc index f645c614948..c85dc30125c 100644 --- a/tiledb/sm/filesystem/s3.cc +++ b/tiledb/sm/filesystem/s3.cc @@ -162,67 +162,6 @@ using namespace tiledb::common; namespace tiledb::sm { -namespace { - -/** - * Return the exception name and error message from the given outcome object. 
- * - * @tparam R AWS result type - * @tparam E AWS error type - * @param outcome Outcome to retrieve error message from - * @return Error message string - */ -template -std::string outcome_error_message(const Aws::Utils::Outcome& outcome) { - if (outcome.IsSuccess()) { - return "Success"; - } - - auto err = outcome.GetError(); - Aws::StringStream ss; - - ss << "[Error Type: " << static_cast(err.GetErrorType()) << "]" - << " [HTTP Response Code: " << static_cast(err.GetResponseCode()) - << "]"; - - if (!err.GetExceptionName().empty()) { - ss << " [Exception: " << err.GetExceptionName() << "]"; - } - - // For some reason, these symbols are not exposed when building with MINGW - // so for now we just disable adding the tags on Windows. - if constexpr (!platform::is_os_windows) { - if (!err.GetRemoteHostIpAddress().empty()) { - ss << " [Remote IP: " << err.GetRemoteHostIpAddress() << "]"; - } - - if (!err.GetRequestId().empty()) { - ss << " [Request ID: " << err.GetRequestId() << "]"; - } - } - - if (err.GetResponseHeaders().size() > 0) { - ss << " [Headers:"; - for (auto&& h : err.GetResponseHeaders()) { - ss << " '" << h.first << "' = '" << h.second << "'"; - } - ss << "]"; - } - - ss << " : " << err.GetMessage(); - - return ss.str(); -} - -} // namespace - -class S3Exception : public StatusException { - public: - explicit S3Exception(const std::string& message) - : StatusException("S3", message) { - } -}; - S3Parameters::Headers S3Parameters::load_headers(const Config& cfg) { Headers ret; auto iter = ConfigIter(cfg, constants::s3_header_prefix); @@ -236,58 +175,6 @@ S3Parameters::Headers S3Parameters::load_headers(const Config& cfg) { return ret; } -/** - * Helper class which overrides Aws::S3::S3Client to set headers from - * vfs.s3.custom_headers.* - * - * @note The AWS SDK does not have a common base class, so there's no - * straightforward way to add a header to a unique request before submitting - * it. 
This class exists solely to override the S3Client, adding custom headers - * upon building the Http Request. - */ -class TileDBS3Client : public Aws::S3::S3Client { - public: - TileDBS3Client( - const S3Parameters& s3_params, - const Aws::Client::ClientConfiguration& client_config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads, - bool use_virtual_addressing) - : Aws::S3::S3Client(client_config, sign_payloads, use_virtual_addressing) - , params_(s3_params) { - } - - TileDBS3Client( - const S3Parameters& s3_params, - const std::shared_ptr& creds, - const Aws::Client::ClientConfiguration& client_config, - Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads, - bool use_virtual_addressing) - : Aws::S3::S3Client( - creds, client_config, sign_payloads, use_virtual_addressing) - , params_(s3_params) { - } - - virtual void BuildHttpRequest( - const Aws::AmazonWebServiceRequest& request, - const std::shared_ptr& httpRequest) - const override { - S3Client::BuildHttpRequest(request, httpRequest); - - // Set header from S3Parameters custom headers - for (auto& [key, val] : params_.custom_headers_) { - httpRequest->SetHeaderValue(key, val); - } - } - - protected: - /** - * A reference to the S3 configuration parameters, which stores the header. - * - * @note Until the removal of init_client(), this must be const-qualified. - */ - const S3Parameters& params_; -}; - /* ********************************* */ /* GLOBAL VARIABLES */ /* ********************************* */ @@ -1626,17 +1513,17 @@ Status S3::fill_file_buffer( return Status::Ok(); } -std::string S3::add_front_slash(const std::string& path) const { +std::string S3::add_front_slash(const std::string& path) { return (path.front() != '/') ? 
(std::string("/") + path) : path; } -std::string S3::remove_front_slash(const std::string& path) const { +std::string S3::remove_front_slash(const std::string& path) { if (path.front() == '/') return path.substr(1, path.length()); return path; } -std::string S3::remove_trailing_slash(const std::string& path) const { +std::string S3::remove_trailing_slash(const std::string& path) { if (path.back() == '/') { return path.substr(0, path.length() - 1); } diff --git a/tiledb/sm/filesystem/s3.h b/tiledb/sm/filesystem/s3.h index 038472876bb..1bdf0210779 100644 --- a/tiledb/sm/filesystem/s3.h +++ b/tiledb/sm/filesystem/s3.h @@ -34,10 +34,15 @@ #define TILEDB_S3_H #ifdef HAVE_S3 + +#include "filesystem_base.h" +#include "ls_scanner.h" #include "tiledb/common/common.h" +#include "tiledb/common/filesystem/directory_entry.h" #include "tiledb/common/rwlock.h" #include "tiledb/common/status.h" #include "tiledb/common/thread_pool.h" +#include "tiledb/platform/platform.h" #include "tiledb/sm/buffer/buffer.h" #include "tiledb/sm/config/config.h" #include "tiledb/sm/curl/curl_init.h" @@ -86,7 +91,67 @@ using tiledb::common::filesystem::directory_entry; namespace tiledb::sm { -class TileDBS3Client; +/** Class for S3 status exceptions. */ +class S3Exception : public StatusException { + public: + explicit S3Exception(const std::string& msg) + : StatusException("S3", msg) { + } +}; + +namespace { + +/** + * Return the exception name and error message from the given outcome object. 
+ * + * @tparam R AWS result type + * @tparam E AWS error type + * @param outcome Outcome to retrieve error message from + * @return Error message string + */ +template +std::string outcome_error_message(const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return "Success"; + } + + auto err = outcome.GetError(); + Aws::StringStream ss; + + ss << "[Error Type: " << static_cast(err.GetErrorType()) << "]" + << " [HTTP Response Code: " << static_cast(err.GetResponseCode()) + << "]"; + + if (!err.GetExceptionName().empty()) { + ss << " [Exception: " << err.GetExceptionName() << "]"; + } + + // For some reason, these symbols are not exposed when building with MINGW + // so for now we just disable adding the tags on Windows. + if constexpr (!platform::is_os_windows) { + if (!err.GetRemoteHostIpAddress().empty()) { + ss << " [Remote IP: " << err.GetRemoteHostIpAddress() << "]"; + } + + if (!err.GetRequestId().empty()) { + ss << " [Request ID: " << err.GetRequestId() << "]"; + } + } + + if (err.GetResponseHeaders().size() > 0) { + ss << " [Headers:"; + for (auto&& h : err.GetResponseHeaders()) { + ss << " '" << h.first << "' = '" << h.second << "'"; + } + ss << "]"; + } + + ss << " : " << err.GetMessage(); + + return ss.str(); +} + +} // namespace /** * The s3-specific configuration parameters. @@ -271,6 +336,219 @@ struct S3Parameters { std::string config_source_; }; +/** + * Helper class which overrides Aws::S3::S3Client to set headers from + * vfs.s3.custom_headers.* + * + * @note The AWS SDK does not have a common base class, so there's no + * straightforward way to add a header to a unique request before submitting + * it. This class exists solely to override the S3Client, adding custom headers + * upon building the Http Request. 
+ */ +class TileDBS3Client : public Aws::S3::S3Client { + public: + TileDBS3Client( + const S3Parameters& s3_params, + const Aws::Client::ClientConfiguration& client_config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads, + bool use_virtual_addressing) + : Aws::S3::S3Client(client_config, sign_payloads, use_virtual_addressing) + , params_(s3_params) { + } + + TileDBS3Client( + const S3Parameters& s3_params, + const std::shared_ptr& creds, + const Aws::Client::ClientConfiguration& client_config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads, + bool use_virtual_addressing) + : Aws::S3::S3Client( + creds, client_config, sign_payloads, use_virtual_addressing) + , params_(s3_params) { + } + + void BuildHttpRequest( + const Aws::AmazonWebServiceRequest& request, + const std::shared_ptr& httpRequest) + const override { + S3Client::BuildHttpRequest(request, httpRequest); + + // Set header from S3Parameters custom headers + for (auto& [key, val] : params_.custom_headers_) { + httpRequest->SetHeaderValue(key, val); + } + } + + inline bool requester_pays() const { + return params_.requester_pays_; + } + + protected: + /** + * A reference to the S3 configuration parameters, which stores the header. + * + * @note Until the removal of init_client(), this must be const-qualified. + */ + const S3Parameters& params_; +}; + +/** + * S3Scanner wraps the AWS ListObjectsV2 request and provides an iterator for + * results. If we reach the end of the current batch of results and results are + * truncated, we fetch the next batch of results from S3. + * + * For each batch of results collected by fetch_results(), the begin_ and end_ + * members are initialized to the first and last elements of the batch. The scan + * steps through each result in the range [begin_, end_), using next() to + * advance to the next object accepted by the filters for this scan. 
+ *
+ * @section Known Defect
+ * The iterators of S3Scanner are initialized by the AWS ListObjectsV2
+ * results and there is no way to determine if they are different from
+ * iterators returned by a previous request. To be able to detect this, we
+ * can track the batch number and compare it to the batch number associated
+ * with the iterator returned by the previous request. Batch number can be
+ * tracked by the total number of times we submit a ListObjectsV2 request
+ * within fetch_results().
+ *
+ * @tparam F The FilePredicate type used to filter object results.
+ * @tparam D The DirectoryPredicate type used to prune prefix results.
+ */
+template
+class S3Scanner : public LsScanner {
+ public:
+ /** Declare LsScanIterator as a friend class for access to call next(). */
+ template
+ friend class LsScanIterator;
+ using Iterator = LsScanIterator, Aws::S3::Model::Object>;
+
+ /** Constructor. */
+ S3Scanner(
+ const std::shared_ptr& client,
+ const URI& prefix,
+ F file_filter,
+ D dir_filter = accept_all_dirs,
+ bool recursive = false,
+ int max_keys = 1000);
+
+ /**
+ * Returns true if there are more results to fetch from S3.
+ */
+ [[nodiscard]] inline bool more_to_fetch() const {
+ return list_objects_outcome_.GetResult().GetIsTruncated();
+ }
+
+ /**
+ * @return Iterator to the beginning of the results being iterated on.
+ * Input iterators are single-pass, so we return a copy of this iterator at
+ * its current position.
+ */
+ Iterator begin() {
+ return Iterator(this);
+ }
+
+ /**
+ * @return Default constructed iterator, which marks the end of results using
+ * nullptr.
+ */
+ Iterator end() {
+ return Iterator();
+ }
+
+ private:
+ /**
+ * Advance to the next object accepted by the filters for this scan.
+ *
+ * @param ptr Reference to the current data pointer. 
+ * @sa LsScanIterator::operator++() + */ + void next(typename Iterator::pointer& ptr); + + /** + * If the iterator is at the end of the current batch, this will fetch the + * next batch of results from S3. This does not check if the results are + * accepted by the filters for this scan. + * + * @param ptr Reference to the current data iterator. + */ + void advance(typename Iterator::pointer& ptr) { + ptr++; + if (ptr == end_) { + if (more_to_fetch()) { + // Fetch results and reset the iterator. + ptr = fetch_results(); + } else { + // Set the pointer to nullptr to indicate the end of results. + end_ = ptr = typename Iterator::pointer(); + } + } + } + + /** + * Fetch the next batch of results from S3. This also handles setting the + * continuation token for the next request, if the results were truncated. + * + * @return A pointer to the first result in the new batch. The return value + * is used to update the pointer managed by the iterator during traversal. + * If the request returned no results, this will return nullptr to mark the + * end of the scan. + * @sa LsScanIterator::operator++() + * @sa S3Scanner::next(typename Iterator::pointer&) + */ + typename Iterator::pointer fetch_results() { + // If this is our first request, GetIsTruncated() will be false. + if (more_to_fetch()) { + // If results are truncated on a subsequent request, we set the next + // continuation token before resubmitting our request. + Aws::String next_marker = + list_objects_outcome_.GetResult().GetNextContinuationToken(); + if (next_marker.empty()) { + throw S3Exception( + "Failed to retrieve next continuation token for ListObjectsV2 " + "request."); + } + list_objects_request_.SetContinuationToken(std::move(next_marker)); + } else if (list_objects_outcome_.IsSuccess()) { + // If we have previously submitted a successful request and there are no + // more results, we've reached the end of the scan. 
+ begin_ = end_ = typename Iterator::pointer(); + return end_; + } + + list_objects_outcome_ = client_->ListObjectsV2(list_objects_request_); + if (!list_objects_outcome_.IsSuccess()) { + throw S3Exception( + std::string("Error while listing with prefix '") + + this->prefix_.add_trailing_slash().to_string() + "' and delimiter '" + + delimiter_ + "'" + outcome_error_message(list_objects_outcome_)); + } + // Update pointers to the newly fetched results. + begin_ = list_objects_outcome_.GetResult().GetContents().begin(); + end_ = list_objects_outcome_.GetResult().GetContents().end(); + + if (list_objects_outcome_.GetResult().GetContents().empty()) { + // If the request returned no results, we've reached the end of the scan. + // We hit this case when the number of objects in the bucket is a multiple + // of the current max_keys. + return end_; + } + + return begin_; + } + + /** Pointer to the S3 client initialized by VFS. */ + shared_ptr client_; + /** Delimiter used for ListObjects request. */ + std::string delimiter_; + /** Iterators for the current objects fetched from S3. */ + typename Iterator::pointer begin_, end_; + + /** The current request being scanned. */ + Aws::S3::Model::ListObjectsV2Request list_objects_request_; + /** The current request outcome being scanned. */ + Aws::S3::Model::ListObjectsV2Outcome list_objects_outcome_; +}; + /** * This class implements the various S3 filesystem functions. It also * maintains buffer caches for writing into the various attribute files. @@ -413,18 +691,72 @@ class S3 { /** * * Lists objects and object information that start with `prefix`. + * For recursive results, an empty string can be passed as the `delimiter`. * * @param prefix The parent path to list sub-paths. - * @param delimiter The uri is truncated to the first delimiter - * @param max_paths The maximum number of paths to be retrieved - * @return A list of directory_entry objects + * @param delimiter The uri is truncated to the first delimiter. 
+ * @param max_paths The maximum number of paths to be retrieved. + * @return Status tuple where second is a list of directory_entry objects. */ - tuple>> - ls_with_sizes( + tuple>> ls_with_sizes( const URI& prefix, const std::string& delimiter = "/", int max_paths = -1) const; + /** + * Lists objects and object information that start with `prefix`, invoking + * the FilePredicate on each entry collected and the DirectoryPredicate on + * common prefixes for pruning. + * + * @param parent The parent prefix to list sub-paths. + * @param f The FilePredicate to invoke on each object for filtering. + * @param d The DirectoryPredicate to invoke on each common prefix for + * pruning. This is currently unused, but is kept here for future support. + * @param recursive Whether to recursively list subdirectories. + */ + template + LsObjects ls_filtered( + const URI& parent, + F f, + D d = accept_all_dirs, + bool recursive = false) const { + throw_if_not_ok(init_client()); + S3Scanner s3_scanner(client_, parent, f, d, recursive); + // Prepend each object key with the bucket URI. + auto prefix = parent.to_string(); + prefix = prefix.substr(0, prefix.find('/', 5)); + + LsObjects objects; + for (auto object : s3_scanner) { + objects.emplace_back(prefix + "/" + object.GetKey(), object.GetSize()); + } + return objects; + } + + /** + * Constructs a scanner for listing S3 objects. The scanner can be used to + * retrieve an InputIterator for passing to algorithms such as `std::copy_if` + * or STL constructors supporting initialization via input iterators. + * + * @param parent The parent prefix to list sub-paths. + * @param f The FilePredicate to invoke on each object for filtering. + * @param d The DirectoryPredicate to invoke on each common prefix for + * pruning. This is currently unused, but is kept here for future support. + * @param recursive Whether to recursively list subdirectories. + * @param max_keys The maximum number of keys to retrieve per request. 
+ * @return Fully constructed S3Scanner object. + */ + template + S3Scanner scanner( + const URI& parent, + F f, + D d = accept_all_dirs, + bool recursive = false, + int max_keys = 1000) const { + throw_if_not_ok(init_client()); + return S3Scanner(client_, parent, f, d, recursive, max_keys); + } + /** * Renames an object. * @@ -577,6 +909,28 @@ class S3 { */ void global_order_write(const URI& uri, const void* buffer, uint64_t length); + /* ********************************* */ + /* PUBLIC STATIC METHODS */ + /* ********************************* */ + + /** + * Returns the input `path` after adding a `/` character + * at the front if it does not exist. + */ + static std::string add_front_slash(const std::string& path); + + /** + * Returns the input `path` after removing a potential `/` character + * from the front if it exists. + */ + static std::string remove_front_slash(const std::string& path); + + /** + * Returns the input `path` after removing a potential `/` character + * from the end if it exists. + */ + static std::string remove_trailing_slash(const std::string& path); + private: /* ********************************* */ /* PRIVATE DATATYPES */ @@ -973,24 +1327,6 @@ class S3 { uint64_t length, uint64_t* nbytes_filled); - /** - * Returns the input `path` after adding a `/` character - * at the front if it does not exist. - */ - std::string add_front_slash(const std::string& path) const; - - /** - * Returns the input `path` after removing a potential `/` character - * from the front if it exists. - */ - std::string remove_front_slash(const std::string& path) const; - - /** - * Returns the input `path` after removing a potential `/` character - * from the end if it exists. - */ - std::string remove_trailing_slash(const std::string& path) const; - /** * Writes the contents of the input buffer to the S3 object given by * the input `uri` as a new series of multipart uploads. 
It then @@ -1192,6 +1528,63 @@ class S3 { const URI& attribute_uri, const std::string& chunk_name); }; +template +S3Scanner::S3Scanner( + const shared_ptr& client, + const URI& prefix, + F file_filter, + D dir_filter, + bool recursive, + int max_keys) + : LsScanner(prefix, file_filter, dir_filter, recursive) + , client_(client) + , delimiter_(this->is_recursive_ ? "" : "/") { + const auto prefix_dir = prefix.add_trailing_slash(); + auto prefix_str = prefix_dir.to_string(); + Aws::Http::URI aws_uri = prefix_str.c_str(); + if (!prefix_dir.is_s3()) { + throw S3Exception("URI is not an S3 URI: " + prefix_str); + } + + list_objects_request_.SetBucket(aws_uri.GetAuthority()); + const std::string aws_uri_str(S3::remove_front_slash(aws_uri.GetPath())); + list_objects_request_.SetPrefix(aws_uri_str.c_str()); + // Empty delimiter returns recursive results from S3. + list_objects_request_.SetDelimiter(delimiter_.c_str()); + // The default max_keys for ListObjects is 1000. + list_objects_request_.SetMaxKeys(max_keys); + + if (client_->requester_pays()) { + list_objects_request_.SetRequestPayer( + Aws::S3::Model::RequestPayer::requester); + } + fetch_results(); + next(begin_); +} + +template +void S3Scanner::next(typename Iterator::pointer& ptr) { + if (ptr == end_) { + ptr = fetch_results(); + } + + while (ptr != end_) { + auto object = *ptr; + uint64_t size = object.GetSize(); + std::string path = "s3://" + list_objects_request_.GetBucket() + + S3::add_front_slash(object.GetKey()); + + // TODO: Add support for directory pruning. + if (this->file_filter_(path, size)) { + // Iterator is at the next object within results accepted by the filters. + return; + } else { + // Object was rejected by the FilePredicate, do not include it in results. 
+ advance(ptr); + } + } +} + } // namespace tiledb::sm #endif // HAVE_S3 diff --git a/tiledb/sm/filesystem/test/CMakeLists.txt b/tiledb/sm/filesystem/test/CMakeLists.txt index faa4f65f615..f41457f6231 100644 --- a/tiledb/sm/filesystem/test/CMakeLists.txt +++ b/tiledb/sm/filesystem/test/CMakeLists.txt @@ -27,7 +27,7 @@ include(unit_test) commence(unit_test vfs) this_target_object_libraries(vfs) - this_target_sources(main.cc unit_uri.cc) + this_target_sources(main.cc unit_uri.cc unit_ls_filtered.cc) conclude(unit_test) commence(unit_test vfs_read_log_modes) diff --git a/tiledb/sm/filesystem/test/unit_ls_filtered.cc b/tiledb/sm/filesystem/test/unit_ls_filtered.cc new file mode 100644 index 00000000000..8af2ec8f225 --- /dev/null +++ b/tiledb/sm/filesystem/test/unit_ls_filtered.cc @@ -0,0 +1,124 @@ +/** + * @file unit_ls_filtered.cc + * + * @section LICENSE + * + * The MIT License + * + * @copyright Copyright (c) 2023 TileDB, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + * + * @section DESCRIPTION + * + * Tests internal ls recursive filter. + */ + +#include +#include +#include "tiledb/sm/config/config.h" +#include "tiledb/sm/filesystem/vfs.h" + +class VFSTest { + public: + /** + * Requires derived class to create a temporary directory. + * + * @param test_tree Vector used to build test directory and objects. + * For each element we create a nested directory with N objects. + * @param prefix The URI prefix to use for the test directory. + */ + explicit VFSTest( + const std::vector& test_tree, const std::string& prefix) + : stats_("unit_ls_filtered") + , io_(4) + , compute_(4) + , vfs_(&stats_, &io_, &compute_, tiledb::sm::Config()) + , test_tree_(test_tree) + , prefix_(prefix) + , temp_dir_(prefix_) { + } + + virtual ~VFSTest() { + bool is_dir = false; + vfs_.is_dir(temp_dir_, &is_dir).ok(); + if (is_dir) { + vfs_.remove_dir(temp_dir_).ok(); + } + } + + /** FilePredicate for passing to ls_filtered that accepts all files. */ + static bool accept_all_files(const std::string_view&, uint64_t) { + return true; + } + + /** Resources needed to construct VFS */ + tiledb::sm::stats::Stats stats_; + ThreadPool io_, compute_; + tiledb::sm::VFS vfs_; + + std::vector test_tree_; + std::string prefix_; + tiledb::sm::URI temp_dir_; + + private: + tiledb::sm::LsObjects expected_results_; +}; + +// TODO: Disable shouldfail when file:// or mem:// support is added. 
+TEST_CASE( + "VFS: Throwing FileFilter ls_recursive", + "[vfs][ls_recursive][!shouldfail]") { + std::string prefix = GENERATE("file://", "mem://"); + prefix += std::filesystem::current_path().string() + "/ls_filtered_test"; + + VFSTest vfs_test({0}, prefix); + auto file_filter = [](const std::string_view&, uint64_t) -> bool { + throw std::logic_error("Throwing FileFilter"); + }; + SECTION("Throwing FileFilter with 0 objects should not throw") { + CHECK_NOTHROW(vfs_test.vfs_.ls_recursive( + vfs_test.temp_dir_, file_filter, tiledb::sm::accept_all_dirs)); + } + SECTION("Throwing FileFilter with N objects should throw") { + vfs_test.vfs_.touch(vfs_test.temp_dir_.join_path("file")).ok(); + CHECK_THROWS_AS( + vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter), + std::logic_error); + CHECK_THROWS_WITH( + vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter), + Catch::Matchers::ContainsSubstring("Throwing FileFilter")); + } +} + +TEST_CASE( + "VFS: ls_recursive throws for unsupported filesystems", + "[vfs][ls_recursive]") { + std::string prefix = GENERATE("file://", "mem://"); + prefix += std::filesystem::current_path().string() + "/ls_filtered_test"; + + VFSTest vfs_test({1}, prefix); + std::string backend = vfs_test.temp_dir_.backend_name(); + DYNAMIC_SECTION(backend << " unsupported backend should throw") { + CHECK_THROWS_WITH( + vfs_test.vfs_.ls_recursive( + vfs_test.temp_dir_, VFSTest::accept_all_files), + Catch::Matchers::ContainsSubstring("storage backend is not supported")); + } +} diff --git a/tiledb/sm/filesystem/vfs.cc b/tiledb/sm/filesystem/vfs.cc index c517d13517c..9faa1ac3fa5 100644 --- a/tiledb/sm/filesystem/vfs.cc +++ b/tiledb/sm/filesystem/vfs.cc @@ -252,7 +252,7 @@ Status VFS::touch(const URI& uri) const { } if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.touch(uri); + return s3().touch(uri); #else throw BuiltWithout("S3"); #endif @@ -285,7 +285,7 @@ Status VFS::cancel_all_tasks() { Status VFS::create_bucket(const URI& uri) const { if 
(uri.is_s3()) { #ifdef HAVE_S3 - return s3_.create_bucket(uri); + return s3().create_bucket(uri); #else throw BuiltWithout("S3"); #endif @@ -310,7 +310,7 @@ Status VFS::create_bucket(const URI& uri) const { Status VFS::remove_bucket(const URI& uri) const { if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.remove_bucket(uri); + return s3().remove_bucket(uri); #else throw BuiltWithout("S3"); #endif @@ -335,7 +335,7 @@ Status VFS::remove_bucket(const URI& uri) const { Status VFS::empty_bucket(const URI& uri) const { if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.empty_bucket(uri); + return s3().empty_bucket(uri); #else throw BuiltWithout("S3"); #endif @@ -361,7 +361,7 @@ Status VFS::is_empty_bucket( const URI& uri, [[maybe_unused]] bool* is_empty) const { if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.is_empty_bucket(uri, is_empty); + return s3().is_empty_bucket(uri, is_empty); #else throw BuiltWithout("S3"); #endif @@ -398,7 +398,7 @@ Status VFS::remove_dir(const URI& uri) const { #endif } else if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.remove_dir(uri); + return s3().remove_dir(uri); #else throw BuiltWithout("S3"); #endif @@ -450,7 +450,7 @@ Status VFS::remove_file(const URI& uri) const { } if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.remove_object(uri); + return s3().remove_object(uri); #else throw BuiltWithout("S3"); #endif @@ -535,7 +535,7 @@ Status VFS::file_size(const URI& uri, uint64_t* size) const { } if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.object_size(uri, size); + return s3().object_size(uri, size); #else throw BuiltWithout("S3"); #endif @@ -579,7 +579,7 @@ Status VFS::is_dir(const URI& uri, bool* is_dir) const { } if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.is_dir(uri, is_dir); + return s3().is_dir(uri, is_dir); #else *is_dir = false; throw BuiltWithout("S3"); @@ -628,7 +628,7 @@ Status VFS::is_file(const URI& uri, bool* is_file) const { } if (uri.is_s3()) { #ifdef HAVE_S3 - RETURN_NOT_OK(s3_.is_object(uri, is_file)); + 
RETURN_NOT_OK(s3().is_object(uri, is_file)); return Status::Ok(); #else *is_file = false; @@ -661,7 +661,7 @@ Status VFS::is_file(const URI& uri, bool* is_file) const { Status VFS::is_bucket(const URI& uri, bool* is_bucket) const { if (uri.is_s3()) { #ifdef HAVE_S3 - RETURN_NOT_OK(s3_.is_bucket(uri, is_bucket)); + RETURN_NOT_OK(s3().is_bucket(uri, is_bucket)); return Status::Ok(); #else *is_bucket = false; @@ -729,7 +729,7 @@ tuple>> VFS::ls_with_sizes( } else if (parent.is_s3()) { #ifdef HAVE_S3 Status st; - std::tie(st, entries) = s3_.ls_with_sizes(parent); + std::tie(st, entries) = s3().ls_with_sizes(parent); #else auto st = Status_VFSError("TileDB was built without S3 support"); #endif @@ -812,7 +812,7 @@ Status VFS::move_file(const URI& old_uri, const URI& new_uri) { if (old_uri.is_s3()) { if (new_uri.is_s3()) #ifdef HAVE_S3 - return s3_.move_object(old_uri, new_uri); + return s3().move_object(old_uri, new_uri); #else throw BuiltWithout("S3"); #endif @@ -881,7 +881,7 @@ Status VFS::move_dir(const URI& old_uri, const URI& new_uri) { if (old_uri.is_s3()) { if (new_uri.is_s3()) #ifdef HAVE_S3 - return s3_.move_dir(old_uri, new_uri); + return s3().move_dir(old_uri, new_uri); #else throw BuiltWithout("S3"); #endif @@ -957,7 +957,7 @@ Status VFS::copy_file(const URI& old_uri, const URI& new_uri) { if (old_uri.is_s3()) { if (new_uri.is_s3()) #ifdef HAVE_S3 - return s3_.copy_file(old_uri, new_uri); + return s3().copy_file(old_uri, new_uri); #else throw BuiltWithout("S3"); #endif @@ -1022,7 +1022,7 @@ Status VFS::copy_dir(const URI& old_uri, const URI& new_uri) { if (old_uri.is_s3()) { if (new_uri.is_s3()) #ifdef HAVE_S3 - return s3_.copy_dir(old_uri, new_uri); + return s3().copy_dir(old_uri, new_uri); #else throw BuiltWithout("S3"); #endif @@ -1149,7 +1149,7 @@ Status VFS::read_impl( #ifdef HAVE_S3 const auto read_fn = std::bind( &S3::read, - &s3_, + &s3(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, @@ -1390,7 +1390,7 @@ Status 
VFS::close_file(const URI& uri) { } if (uri.is_s3()) { #ifdef HAVE_S3 - return s3_.flush_object(uri); + return s3().flush_object(uri); #else throw BuiltWithout("S3"); #endif @@ -1418,7 +1418,7 @@ Status VFS::close_file(const URI& uri) { void VFS::finalize_and_close_file(const URI& uri) { if (uri.is_s3()) { #ifdef HAVE_S3 - s3_.finalize_and_flush_object(uri); + s3().finalize_and_flush_object(uri); return; #else throw BuiltWithout("S3"); @@ -1452,10 +1452,10 @@ Status VFS::write( if (uri.is_s3()) { #ifdef HAVE_S3 if (remote_global_order_write) { - s3_.global_order_write_buffered(uri, buffer, buffer_size); + s3().global_order_write_buffered(uri, buffer, buffer_size); return Status::Ok(); } - return s3_.write(uri, buffer, buffer_size); + return s3().write(uri, buffer, buffer_size); #else throw BuiltWithout("S3"); #endif @@ -1487,7 +1487,7 @@ VFS::multipart_upload_state(const URI& uri) { } else if (uri.is_s3()) { #ifdef HAVE_S3 VFS::MultiPartUploadState state; - auto s3_state = s3_.multipart_upload_state(uri); + auto s3_state = s3().multipart_upload_state(uri); if (!s3_state.has_value()) { return {Status::Ok(), nullopt}; } @@ -1553,11 +1553,11 @@ Status VFS::set_multipart_upload_state( // Chunk URI gets reconstructed from the serialized chunk name // and the real attribute uri s3_state.buffered_chunks.emplace_back( - s3_.generate_chunk_uri(uri, chunk.uri).to_string(), chunk.size); + s3().generate_chunk_uri(uri, chunk.uri).to_string(), chunk.size); } } - return s3_.set_multipart_upload_state(uri.to_string(), s3_state); + return s3().set_multipart_upload_state(uri.to_string(), s3_state); #else throw BuiltWithout("S3"); #endif @@ -1582,8 +1582,8 @@ Status VFS::flush_multipart_file_buffer(const URI& uri) { if (uri.is_s3()) { #ifdef HAVE_S3 Buffer* buff = nullptr; - throw_if_not_ok(s3_.get_file_buffer(uri, &buff)); - s3_.global_order_write(uri, buff->data(), buff->size()); + throw_if_not_ok(s3().get_file_buffer(uri, &buff)); + s3().global_order_write(uri, buff->data(), 
buff->size()); buff->reset_size(); #else diff --git a/tiledb/sm/filesystem/vfs.h b/tiledb/sm/filesystem/vfs.h index 95942370577..99c5ac4e77e 100644 --- a/tiledb/sm/filesystem/vfs.h +++ b/tiledb/sm/filesystem/vfs.h @@ -40,6 +40,7 @@ #include #include "filesystem_base.h" +#include "ls_scanner.h" #include "tiledb/common/common.h" #include "tiledb/common/filesystem/directory_entry.h" #include "tiledb/common/macros.h" @@ -227,15 +228,29 @@ struct VFSBase { /** The S3 filesystem. */ #ifdef HAVE_S3 -struct S3_within_VFS { +class S3_within_VFS { + /** Private member variable */ S3 s3_; + + protected: template S3_within_VFS(Args&&... args) : s3_(std::forward(args)...) { } + + /** Protected accessor for the S3 object. */ + inline tiledb::sm::S3& s3() { + return s3_; + } + + /** Protected accessor for the const S3 object. */ + inline const tiledb::sm::S3& s3() const { + return s3_; + } }; #else -struct S3_within_VFS { +class S3_within_VFS { + protected: template S3_within_VFS(Args&&...) { } // empty constructor @@ -246,7 +261,7 @@ struct S3_within_VFS { * This class implements a virtual filesystem that directs filesystem-related * function execution to the appropriate backend based on the input URI. */ -class VFS : private VFSBase, S3_within_VFS { +class VFS : private VFSBase, protected S3_within_VFS { public: /* ********************************* */ /* TYPE DEFINITIONS */ @@ -494,6 +509,38 @@ class VFS : private VFSBase, S3_within_VFS { tuple>> ls_with_sizes( const URI& parent) const; + /** + * Lists objects and object information that start with `prefix`, invoking + * the FilePredicate on each entry collected and the DirectoryPredicate on + * common prefixes for pruning. + * + * Currently only S3 is supported for ls_recursive. + * + * @param parent The parent prefix to list sub-paths. + * @param f The FilePredicate to invoke on each object for filtering. + * @param d The DirectoryPredicate to invoke on each common prefix for + * pruning. 
This is currently unused, but is kept here for future support. + * @return Vector of results with each entry being a pair of the string URI + * and object size. + */ + template + LsObjects ls_recursive( + const URI& parent, + [[maybe_unused]] F f, + [[maybe_unused]] D d = accept_all_dirs) const { + if (parent.is_s3()) { +#ifdef HAVE_S3 + return s3().ls_filtered(parent, f, d, true); +#else + throw filesystem::VFSException("TileDB was built without S3 support"); +#endif + } else { + throw filesystem::VFSException( + "Recursive ls over " + parent.backend_name() + + " storage backend is not supported."); + } + } + /** * Renames a file. *