Skip to content

Commit

Permalink
first implementation of algorithm and test
Browse files Browse the repository at this point in the history
  • Loading branch information
DimitrisStaratzis committed Aug 22, 2024
1 parent 51ae9a9 commit 5e182e4
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 18 deletions.
39 changes: 28 additions & 11 deletions test/src/unit-cppapi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ void create_array(const std::string& array_name) {
Array::create(array_name, schema);
}

void create_array_2d(const std::string& array_name) {
Context ctx;
Domain domain(ctx);
auto d1 = Dimension::create<int>(ctx, "d1", {{1, 10}}, 2);
auto d2 = Dimension::create<int>(ctx, "d2", {{1, 10}}, 2);
domain.add_dimensions(d1);
domain.add_dimensions(d2);
auto a = Attribute::create<int>(ctx, "a");
ArraySchema schema(ctx, TILEDB_DENSE);
schema.set_domain(domain);
schema.add_attributes(a);
Array::create(array_name, schema);
}

void write_array(
const std::string& array_name,
const std::vector<int>& subarray,
Expand Down Expand Up @@ -255,21 +269,24 @@ TEST_CASE(
std::string array_name = "cppapi_consolidation";
remove_array(array_name);

create_array(array_name);
write_array(array_name, {1, 2}, {1, 2});
write_array(array_name, {3, 3}, {3});
CHECK(tiledb::test::num_fragments(array_name) == 2);
create_array_2d(array_name);
// order matters
write_array(array_name, {1, 3, 7, 9}, {1, 2, 3, 4, 5, 6, 7, 8, 9});
write_array(array_name, {2, 4, 2, 3}, {10, 11, 12, 13, 14, 15});
write_array(array_name, {3, 5, 4, 5}, {16, 17, 18, 19, 20, 21});
write_array(array_name, {7, 9, 6, 8}, {22, 23, 24, 25, 26, 27, 28, 29, 30});
// write_array(array_name, {7, 8, 3, 4}, {25, 26, 27, 28}); //this is ok

read_array(array_name, {1, 3}, {1, 2, 3});
CHECK(tiledb::test::num_fragments(array_name) == 4);

Context ctx;
Config config;
config.set("sm.consolidation.buffer_size", "1000");
// config.set("sm.consolidation.buffer_size", "1000");

FragmentInfo fragment_info(ctx, array_name);
fragment_info.load();
std::string fragment_name1 = fragment_info.fragment_uri(0);
std::string fragment_name2 = fragment_info.fragment_uri(1);
std::string fragment_name1 = fragment_info.fragment_uri(1);
std::string fragment_name2 = fragment_info.fragment_uri(3);
std::string short_fragment_name1 =
fragment_name1.substr(fragment_name1.find_last_of('/') + 1);
std::string short_fragment_name2 =
Expand All @@ -278,11 +295,11 @@ TEST_CASE(
const char* fragment_uris[2] = {
short_fragment_name1.c_str(), short_fragment_name2.c_str()};

REQUIRE_NOTHROW(
REQUIRE_THROWS(
Array::consolidate(ctx, array_name, fragment_uris, 2, &config));
CHECK(tiledb::test::num_fragments(array_name) == 3);
// CHECK(tiledb::test::num_fragments(array_name) == 3);

read_array(array_name, {1, 3}, {1, 2, 3});
// read_array(array_name, {1, 3}, {1, 2, 3});

remove_array(array_name);
}
Expand Down
47 changes: 41 additions & 6 deletions tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,48 @@ Status FragmentConsolidator::consolidate_fragments(
std::to_string(fragment_uris.size()) + " required fragments.");
}

// In case we have a dense Array check that the fragments can be consolidated without data loss
// Get timestamp bounds for the fragments we want to consolidate
uint64_t min_timestamp = std::numeric_limits<uint64_t>::max();
uint64_t max_timestamp = std::numeric_limits<uint64_t>::min();

// Iterate over the vector
for (const auto& item : to_consolidate) {
const auto& range = item.timestamp_range();

// Update min and max values
min_timestamp = std::min(min_timestamp, range.first);
max_timestamp = std::max(max_timestamp, range.second);
}

// Output the results
std::cout << "Minimum timestamp: " << min_timestamp << "\n";
std::cout << "Maximum timestamp: " << max_timestamp << "\n";

// In case we have a dense Array check that the fragments can be consolidated
// without data loss
if (array_for_reads->array_schema_latest().array_type() == ArrayType::DENSE) {
// pseudocode
// search every other fragment in this array
// if any of them overlaps in ranges AND its timestamp range falls between the range of the fragments to consolidate
// throw descriptive error
}
// Search every other fragment in this array if any of them overlaps in
// ranges and its timestamp range falls between the range of the fragments
// to consolidate throw error
for (auto& frag_info : frag_info_vec) {
// Ignore the fragments that are requested to be consolidated
auto uri = frag_info.uri().last_path_part();
if (to_consolidate_set.count(uri) != 0) {
continue;
}

// Check domain and timestamp overlap
if (domain.overlap(
union_non_empty_domains, frag_info.non_empty_domain())) {
auto timestamp_range{frag_info.timestamp_range()};
bool timestamp_before = !(timestamp_range.first > max_timestamp);
if (timestamp_before) {
throw FragmentConsolidatorException(
"Cannot consolidate; Invalid fragments");
}
}
}
}

FragmentConsolidationWorkspace cw(consolidator_memory_tracker_);

Expand Down
2 changes: 1 addition & 1 deletion tiledb/sm/consolidator/fragment_consolidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
#include "tiledb/common/status.h"
#include "tiledb/sm/array/array.h"
#include "tiledb/sm/consolidator/consolidator.h"
#include "tiledb/sm/misc/types.h"
#include "tiledb/sm/enums/array_type.h"
#include "tiledb/sm/misc/types.h"
#include "tiledb/sm/storage_manager/context_resources.h"
#include "tiledb/sm/storage_manager/storage_manager_declaration.h"

Expand Down

0 comments on commit 5e182e4

Please sign in to comment.