Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add internal definitions for S3 ls recursive #4467

Merged
merged 36 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e32a1c7
Add ls_cb for S3
shaunrd0 Oct 26, 2023
dab2072
Refactor s3 test
shaunrd0 Oct 27, 2023
a9c3195
Changes from review
shaunrd0 Nov 3, 2023
706b0fc
Refactor unit tests from sync
shaunrd0 Nov 8, 2023
fb114b6
Remove tagv from vfs.h
shaunrd0 Nov 8, 2023
729abdd
Add unit_ls_filtered.cc and update test docs
shaunrd0 Nov 9, 2023
b021bd2
WIP
shaunrd0 Nov 9, 2023
b7d1c4c
WIP2
shaunrd0 Nov 14, 2023
7b10129
Start work on S3ScanIterator
shaunrd0 Nov 14, 2023
ae0cebb
Move outcome_error_message to header
shaunrd0 Nov 15, 2023
e361efd
WIP
shaunrd0 Nov 16, 2023
c341618
S3Scanner owns S3ScanIterator
shaunrd0 Nov 16, 2023
ccc4858
S3ScanIterator owns S3Scanner
shaunrd0 Nov 16, 2023
6732fde
Fix fetching more results
shaunrd0 Nov 16, 2023
9a54f20
Rename ls_callback -> ls_scanner
shaunrd0 Nov 17, 2023
a663d8c
Populate results vector with S3ScanIterator
shaunrd0 Nov 19, 2023
7c5ba18
Merge remote-tracking branch 'origin/dev' into smr/sc-35767/vfs-ls-cb-s3
shaunrd0 Nov 20, 2023
3487029
maybe_unused and format
shaunrd0 Nov 20, 2023
1cda9bf
Convert S3ScanIterator to LsScanIterator
shaunrd0 Nov 21, 2023
aaa6f6e
CI
shaunrd0 Nov 21, 2023
5a34f35
TempFilesystemBase
shaunrd0 Nov 21, 2023
8da19b7
Update tests
shaunrd0 Nov 22, 2023
877258c
Fix test
shaunrd0 Nov 23, 2023
df1c234
Use sentinel for checking end of results
shaunrd0 Dec 5, 2023
ca34b88
Merge remote-tracking branch 'origin/dev' into smr/sc-35767/vfs-ls-cb-s3
shaunrd0 Dec 5, 2023
10827db
Fix merge for builds with s3 enabled
shaunrd0 Dec 5, 2023
8eefb48
CI
shaunrd0 Dec 6, 2023
9a831a5
Add iterator test
shaunrd0 Dec 6, 2023
e3e36be
Small fixes and update docs
shaunrd0 Dec 7, 2023
9ccfa53
CI
shaunrd0 Dec 7, 2023
5339829
Update begin, end methods and member variables
shaunrd0 Dec 19, 2023
e5b0459
Changes from review
shaunrd0 Dec 19, 2023
dd4c74e
Merge remote-tracking branch 'origin/dev' into smr/sc-35767/vfs-ls-cb-s3
shaunrd0 Dec 19, 2023
1649515
Fix multiple PRNG seed from merge
shaunrd0 Dec 20, 2023
3e74a77
Update expected results, docs
shaunrd0 Dec 20, 2023
c5b2c58
Add ensure_dereferenceable()
shaunrd0 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions test/src/unit-s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include <test/support/tdb_catch.h>
#include "test/support/src/helpers.h"
#include "test/support/src/vfs_helpers.h"
#include "tiledb/common/thread_pool.h"
#include "tiledb/sm/config/config.h"
#include "tiledb/sm/filesystem/s3.h"
Expand Down Expand Up @@ -554,4 +555,116 @@ TEST_CASE_METHOD(S3Fx, "Test S3 use Bucket/Object CannedACL", "[s3]") {
try_with_bucket_object_canned_acl("public_read_write", "public_read_write");
try_with_bucket_object_canned_acl("authenticated_read", "authenticated_read");
}

TEST_CASE(
"S3: S3Scanner iterator to populate vector", "[s3][ls-scan-iterator]") {
S3Test s3_test({10, 50});
bool recursive = true;
// 1000 is the default max_keys for S3. This is the same default used by
// S3Scanner. Testing with small max_keys validates the iterator handles batch
// collection and filtering appropriately.
int max_keys = GENERATE(1000, 10, 7);

DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") {
FileFilter file_filter;
auto expected = s3_test.expected_results();

SECTION("Accept all objects") {
file_filter = [](const std::string_view&, uint64_t) { return true; };
std::sort(expected.begin(), expected.end());
}

SECTION("Reject all objects") {
file_filter = [](const std::string_view&, uint64_t) { return false; };
}

SECTION("Filter objects including 'test_file_1' in key") {
file_filter = [](const std::string_view& path, uint64_t) {
if (path.find("test_file_1") != std::string::npos) {
return true;
}
return false;
};
}

SECTION("Scan for a single object") {
file_filter = [](const std::string_view& path, uint64_t) {
if (path.find("test_file_50") != std::string::npos) {
return true;
}
return false;
};
}

// Filter expected results to apply file_filter.
std::erase_if(expected, [&file_filter](const auto& a) {
return !file_filter(a.first, a.second);
});

auto scan = s3_test.get_s3().scanner(
s3_test.temp_dir_, file_filter, accept_all_dirs, recursive, max_keys);
std::vector results_vector(scan.begin(), scan.end());

CHECK(results_vector.size() == expected.size());
for (size_t i = 0; i < expected.size(); i++) {
auto s3_object = results_vector[i];
CHECK(file_filter(s3_object.GetKey(), s3_object.GetSize()));
auto full_uri = s3_test.temp_dir_.to_string() + "/" + s3_object.GetKey();
CHECK(full_uri == expected[i].first);
CHECK(static_cast<uint64_t>(s3_object.GetSize()) == expected[i].second);
}
}
}

TEST_CASE("S3: S3Scanner iterator", "[s3][ls-scan-iterator]") {
S3Test s3_test({10, 50, 7});
bool recursive = true;
int max_keys = GENERATE(1000, 11);

std::vector<Aws::S3::Model::Object> results_vector;
DYNAMIC_SECTION("Testing with " << max_keys << " max keys from S3") {
auto scan = s3_test.get_s3().scanner(
s3_test.temp_dir_,
VFSTest::accept_all_files,
accept_all_dirs,
recursive,
max_keys);

SECTION("for loop") {
SECTION("range based for") {
for (const auto& result : scan) {
results_vector.push_back(result);
}
}
SECTION("prefix operator") {
for (auto it = scan.begin(); it != scan.end(); ++it) {
results_vector.push_back(*it);
}
}
SECTION("postfix operator") {
for (auto it = scan.begin(); it != scan.end(); it++) {
results_vector.push_back(*it);
}
}
}

SECTION("vector::assign") {
results_vector.assign(scan.begin(), scan.end());
}

SECTION("std::move") {
std::move(scan.begin(), scan.end(), std::back_inserter(results_vector));
}
}

auto expected = s3_test.expected_results();
CHECK(results_vector.size() == expected.size());
for (size_t i = 0; i < expected.size(); i++) {
auto s3_object = results_vector[i];
auto full_uri = s3_test.temp_dir_.to_string() + "/" + s3_object.GetKey();
CHECK(full_uri == expected[i].first);
CHECK(static_cast<uint64_t>(s3_object.GetSize()) == expected[i].second);
}
}

#endif
91 changes: 91 additions & 0 deletions test/src/unit-vfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#ifdef _WIN32
#include "tiledb/sm/filesystem/path_win.h"
#endif
#include "test/support/src/vfs_helpers.h"
#include "tiledb/sm/tile/tile.h"

using namespace tiledb::common;
Expand Down Expand Up @@ -344,3 +345,93 @@ TEST_CASE("VFS: test ls_with_sizes", "[vfs][ls-with-sizes]") {
// Clean up
REQUIRE(vfs_ls.remove_dir(URI(path)).ok());
}

// Currently only S3 is supported for VFS::ls_recursive.
using TestBackends = std::tuple<S3Test>;
TEMPLATE_LIST_TEST_CASE(
"VFS: Test internal ls_filtered recursion argument",
"[vfs][ls_filtered][recursion]",
TestBackends) {
TestType fs({10, 50});
if (!fs.is_supported()) {
return;
shaunrd0 marked this conversation as resolved.
Show resolved Hide resolved
}

bool recursive = GENERATE(true, false);
DYNAMIC_SECTION(
fs.temp_dir_.backend_name()
<< " ls_filtered with recursion: " << (recursive ? "true" : "false")) {
#ifdef HAVE_S3
// If testing with recursion use the root directory, otherwise use a subdir.
auto path = recursive ? fs.temp_dir_ : fs.temp_dir_.join_path("subdir_1");
auto ls_objects = fs.get_s3().ls_filtered(
path, VFSTestBase::accept_all_files, accept_all_dirs, recursive);

auto expected = fs.expected_results();
if (!recursive) {
// If non-recursive, all objects in the first directory should be
// returned.
std::erase_if(expected, [](const auto& p) {
return p.first.find("subdir_1") == std::string::npos;
});
}

CHECK(ls_objects.size() == expected.size());
CHECK(ls_objects == expected);
#endif
}
}

TEST_CASE(
"VFS: ls_recursive throws for unsupported backends",
"[vfs][ls_recursive]") {
// Local and mem fs tests are in tiledb/sm/filesystem/test/unit_ls_filtered.cc
std::string prefix = GENERATE("s3://", "hdfs://", "azure://", "gcs://");
VFSTest vfs_test({1}, prefix);
if (!vfs_test.is_supported()) {
return;
}
std::string backend = vfs_test.temp_dir_.backend_name();

if (vfs_test.temp_dir_.is_s3()) {
DYNAMIC_SECTION(backend << " supported backend should not throw") {
CHECK_NOTHROW(vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, VFSTestBase::accept_all_files));
}
} else {
DYNAMIC_SECTION(backend << " unsupported backend should throw") {
CHECK_THROWS_WITH(
vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, VFSTestBase::accept_all_files),
Catch::Matchers::ContainsSubstring(
"storage backend is not supported"));
}
}
}

TEST_CASE(
"VFS: Throwing FileFilter for ls_recursive",
"[vfs][ls_recursive][file-filter]") {
std::string prefix = "s3://";
VFSTest vfs_test({0}, prefix);
if (!vfs_test.is_supported()) {
return;
}

auto file_filter = [](const std::string_view&, uint64_t) -> bool {
throw std::logic_error("Throwing FileFilter");
};
SECTION("Throwing FileFilter with 0 objects should not throw") {
CHECK_NOTHROW(vfs_test.vfs_.ls_recursive(
vfs_test.temp_dir_, file_filter, tiledb::sm::accept_all_dirs));
}
SECTION("Throwing FileFilter with N objects should throw") {
vfs_test.vfs_.touch(vfs_test.temp_dir_.join_path("file")).ok();
CHECK_THROWS_AS(
vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter),
std::logic_error);
CHECK_THROWS_WITH(
vfs_test.vfs_.ls_recursive(vfs_test.temp_dir_, file_filter),
Catch::Matchers::ContainsSubstring("Throwing FileFilter"));
}
}
87 changes: 86 additions & 1 deletion test/support/src/vfs_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@
// after tdb_catch.h.
#include "test/support/src/serialization_wrappers.h"

#include <test/support/tdb_catch.h>
#include "test/support/src/helpers.h"
#include "test/support/src/vfs_helpers.h"

namespace tiledb::test {

tiledb::sm::URI test_dir(const std::string& prefix) {
return tiledb::sm::URI(prefix + "tiledb-" + std::to_string(PRNG::get()()));
}

std::vector<std::unique_ptr<SupportedFs>> vfs_test_get_fs_vec() {
std::vector<std::unique_ptr<SupportedFs>> fs_vec;

Expand Down Expand Up @@ -414,4 +417,86 @@ std::string TemporaryDirectoryFixture::create_temporary_array(
return array_uri;
}

VFSTestBase::VFSTestBase(
const std::vector<size_t>& test_tree, const std::string& prefix)
: test_tree_(test_tree)
, compute_(4)
, io_(4)
, vfs_(&tiledb::test::g_helper_stats, &io_, &compute_, create_test_config())
, prefix_(prefix)
, temp_dir_(tiledb::test::test_dir(prefix_))
, is_supported_(vfs_.supports_uri_scheme(temp_dir_)) {
// TODO: Throw when we can provide a list of supported filesystems to Catch2.
}

VFSTestBase::~VFSTestBase() {
if (vfs_.supports_uri_scheme(temp_dir_)) {
bool is_dir = false;
vfs_.is_dir(temp_dir_, &is_dir).ok();
if (is_dir) {
vfs_.remove_dir(temp_dir_).ok();
}
}
}

tiledb::sm::Config VFSTestBase::create_test_config() {
tiledb::sm::Config cfg;
if constexpr (!tiledb::test::aws_s3_config) {
// Set up connection to minio backend emulator.
cfg.set("vfs.s3.endpoint_override", "localhost:9999").ok();
cfg.set("vfs.s3.scheme", "https").ok();
cfg.set("vfs.s3.use_virtual_addressing", "false").ok();
cfg.set("vfs.s3.verify_ssl", "false").ok();
}
cfg.set("vfs.azure.storage_account_name", "devstoreaccount1").ok();
cfg.set(
"vfs.azure.storage_account_key",
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/"
"K1SZFPTOtr/KBHBeksoGMGw==")
.ok();
cfg.set("vfs.azure.blob_endpoint", "http://127.0.0.1:10000/devstoreaccount1")
.ok();
return cfg;
}

VFSTest::VFSTest(
const std::vector<size_t>& test_tree, const std::string& prefix)
: VFSTestBase(test_tree, prefix) {
if (!is_supported()) {
return;
}

if (temp_dir_.is_file() || temp_dir_.is_memfs() || temp_dir_.is_hdfs()) {
vfs_.create_dir(temp_dir_).ok();
} else {
vfs_.create_bucket(temp_dir_).ok();
}
for (size_t i = 1; i <= test_tree_.size(); i++) {
sm::URI path = temp_dir_.join_path("subdir_" + std::to_string(i));
// VFS::create_dir is a no-op for S3.
vfs_.create_dir(path).ok();
for (size_t j = 1; j <= test_tree_[i - 1]; j++) {
auto object_uri = path.join_path("test_file_" + std::to_string(j));
vfs_.touch(object_uri).ok();
std::string data(j * 10, 'a');
vfs_.open_file(object_uri, sm::VFSMode::VFS_WRITE).ok();
vfs_.write(object_uri, data.data(), data.size()).ok();
vfs_.close_file(object_uri).ok();
expected_results().emplace_back(object_uri.to_string(), data.size());
}
}
std::sort(expected_results().begin(), expected_results().end());
}

LocalFsTest::LocalFsTest(const std::vector<size_t>& test_tree)
: VFSTestBase(test_tree, "file://") {
#ifdef _WIN32
temp_dir_ =
tiledb::test::test_dir(prefix_ + tiledb::sm::Win::current_dir() + "/");
#else
temp_dir_ =
tiledb::test::test_dir(prefix_ + tiledb::sm::Posix::current_dir() + "/");
#endif
eric-hughes-tiledb marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace tiledb::test
Loading
Loading