From 9f25912e0de9a873cc208405494d0836a705a32b Mon Sep 17 00:00:00 2001 From: "Paul J. Davis" Date: Wed, 10 Jan 2024 12:45:50 -0600 Subject: [PATCH] Don't reallocate buffers in fragment consolidation This is based on the work from @Shelnutt2 on branch `ss/sc-36372`. --- test/regression/CMakeLists.txt | 1 + test/regression/targets/sc-36372.cc | 193 ++++++++++++++++ .../sm/consolidator/fragment_consolidator.cc | 214 +++++++++--------- .../sm/consolidator/fragment_consolidator.h | 73 +++--- 4 files changed, 335 insertions(+), 146 deletions(-) create mode 100644 test/regression/targets/sc-36372.cc diff --git a/test/regression/CMakeLists.txt b/test/regression/CMakeLists.txt index 7545cda9b1b8..a14dd3198b48 100644 --- a/test/regression/CMakeLists.txt +++ b/test/regression/CMakeLists.txt @@ -48,6 +48,7 @@ if (TILEDB_CPP_API) list(APPEND SOURCES targets/sc-29682.cc) list(APPEND SOURCES targets/sc-33480.cc) list(APPEND SOURCES targets/sc-35424.cc) + list(APPEND SOURCES targets/sc-36372.cc) list(APPEND SOURCES targets/sc-38300.cc) endif() diff --git a/test/regression/targets/sc-36372.cc b/test/regression/targets/sc-36372.cc new file mode 100644 index 000000000000..7d805afdf351 --- /dev/null +++ b/test/regression/targets/sc-36372.cc @@ -0,0 +1,193 @@ +#include +#include + +#include + +#include + +TEST_CASE( + "C++ API: Consolidation slowness in create_buffer with large number of " + "attributes" + "[cppapi][consolidation][sc36372]") { + std::string array_name = "cpp_unit_array_36372"; + + tiledb::Config cfg; + cfg["sm.consolidation.step_min_frags"] = 2; + cfg["sm.consolidation.step_max_frags"] = 4; + // cfg["sm.compute_concurrency_level"] = 1; + // cfg["sm.io_concurrency_level"] = 1; + tiledb::Context ctx(cfg); + tiledb::VFS vfs(ctx); + + if (vfs.is_dir(array_name)) { + vfs.remove_dir(array_name); + } + + tiledb::Domain domain(ctx); + auto domain_lo = std::numeric_limits::min(); + auto domain_hi = std::numeric_limits::max() - 1; + + // Create and initialize dimension. + auto d0 = tiledb::Dimension::create( + ctx, "d0", {{domain_lo, domain_hi}}, 2); + auto d1 = tiledb::Dimension::create( + ctx, "d1", {{domain_lo, domain_hi}}, 4); + auto d2 = tiledb::Dimension::create( + ctx, "d2", {{domain_lo, domain_hi}}, 50); + auto d3 = tiledb::Dimension::create( + ctx, "d3", {{domain_lo, domain_hi}}, 200); + auto d4 = tiledb::Dimension::create( + ctx, "d4", {{domain_lo, domain_hi}}, 2); + auto d5 = tiledb::Dimension::create( + ctx, "d5", {{domain_lo, domain_hi}}, 2); + + domain.add_dimensions(d0, d1, d2, d3, d4, d5); + + auto a0 = tiledb::Attribute::create(ctx, "a0").set_cell_val_num( + TILEDB_VAR_NUM); + auto a1 = tiledb::Attribute::create(ctx, "a1").set_cell_val_num( + TILEDB_VAR_NUM); + auto a2 = tiledb::Attribute::create(ctx, "a2").set_cell_val_num( + TILEDB_VAR_NUM); + auto a3 = tiledb::Attribute::create(ctx, "a3").set_cell_val_num( + TILEDB_VAR_NUM); + auto a4 = tiledb::Attribute::create(ctx, "a4").set_cell_val_num( + TILEDB_VAR_NUM); + auto a5 = tiledb::Attribute::create(ctx, "a5").set_cell_val_num( + TILEDB_VAR_NUM); + auto a6 = tiledb::Attribute::create(ctx, "a6").set_cell_val_num( + TILEDB_VAR_NUM); + auto a7 = tiledb::Attribute::create(ctx, "a7").set_cell_val_num( + TILEDB_VAR_NUM); + auto a8 = tiledb::Attribute::create(ctx, "a8").set_cell_val_num( + TILEDB_VAR_NUM); + auto a9 = tiledb::Attribute::create(ctx, "a9").set_cell_val_num( + TILEDB_VAR_NUM); + auto a10 = tiledb::Attribute::create(ctx, "a10") + .set_cell_val_num(TILEDB_VAR_NUM); + auto a11 = tiledb::Attribute::create(ctx, "a11") + .set_cell_val_num(TILEDB_VAR_NUM); + auto a12 = tiledb::Attribute::create(ctx, "a12") + .set_cell_val_num(TILEDB_VAR_NUM); + auto a13 = tiledb::Attribute::create(ctx, "a13") + .set_cell_val_num(TILEDB_VAR_NUM); + auto a14 = tiledb::Attribute::create(ctx, "a14") + .set_cell_val_num(TILEDB_VAR_NUM); + + tiledb::ArraySchema schema(ctx, TILEDB_SPARSE); + schema.set_domain(domain); + schema.add_attributes( + a0, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14); + schema.set_capacity(10000000); + schema.set_cell_order(TILEDB_ROW_MAJOR); + schema.set_tile_order(TILEDB_ROW_MAJOR); + CHECK_NOTHROW(tiledb::Array::create(array_name, schema)); + + // Perform Write + std::vector d0_data(1, 0); + std::vector d1_data(1, 0); + std::vector d2_data(1, 0); + std::vector d3_data(1, 0); + std::vector d4_data(1, 0); + std::vector d5_data(1, 0); + std::vector a0_data(1, 0); + std::vector a0_offsets(1, 0); + std::vector a1_data(1, 0); + std::vector a1_offsets(1, 0); + std::vector a2_data(1, 0); + std::vector a2_offsets(1, 0); + std::vector a3_data(1, 0); + std::vector a3_offsets(1, 0); + std::vector a4_data(1, 0); + std::vector a4_offsets(1, 0); + std::vector a5_data(1, 0); + std::vector a5_offsets(1, 0); + std::vector a6_data(1, 0); + std::vector a6_offsets(1, 0); + std::vector a7_data(1, 0); + std::vector a7_offsets(1, 0); + std::vector a8_data(1, 0); + std::vector a8_offsets(1, 0); + std::vector a9_data(1, 0); + std::vector a9_offsets(1, 0); + std::vector a10_data(1, 0); + std::vector a10_offsets(1, 0); + std::vector a11_data(1, 0); + std::vector a11_offsets(1, 0); + std::vector a12_data(1, 0); + std::vector a12_offsets(1, 0); + std::vector a13_data(1, 0); + std::vector a13_offsets(1, 0); + std::vector a14_data(1, 0); + std::vector a14_offsets(1, 0); + + uint8_t fragments_to_create = 196; + tiledb::Array array(ctx, array_name, TILEDB_WRITE); + for (uint8_t i = 0; i < fragments_to_create; i++) { + d0_data[0] = i; + d1_data[0] = i; + d2_data[0] = i; + d3_data[0] = i; + d4_data[0] = i; + d5_data[0] = i; + a0_data[0] = i; + a1_data[0] = i; + a2_data[0] = i; + a3_data[0] = i; + a4_data[0] = i; + a5_data[0] = i; + a6_data[0] = i; + a7_data[0] = i; + a8_data[0] = i; + a9_data[0] = i; + a10_data[0] = i; + a11_data[0] = i; + a12_data[0] = i; + a13_data[0] = i; + a14_data[0] = i; + + tiledb::Query query(ctx, array); + query.set_data_buffer("d0", d0_data); + query.set_data_buffer("d1", d1_data); + query.set_data_buffer("d2", d2_data); + query.set_data_buffer("d3", d3_data); + query.set_data_buffer("d4", d4_data); + query.set_data_buffer("d5", d5_data); + + query.set_data_buffer("a0", a0_data).set_offsets_buffer("a0", a0_offsets); + query.set_data_buffer("a1", a1_data).set_offsets_buffer("a1", a1_offsets); + query.set_data_buffer("a2", a2_data).set_offsets_buffer("a2", a2_offsets); + query.set_data_buffer("a3", a3_data).set_offsets_buffer("a3", a3_offsets); + query.set_data_buffer("a4", a4_data).set_offsets_buffer("a4", a4_offsets); + query.set_data_buffer("a5", a5_data).set_offsets_buffer("a5", a5_offsets); + query.set_data_buffer("a6", a6_data).set_offsets_buffer("a6", a6_offsets); + query.set_data_buffer("a7", a7_data).set_offsets_buffer("a7", a7_offsets); + query.set_data_buffer("a8", a8_data).set_offsets_buffer("a8", a8_offsets); + query.set_data_buffer("a9", a9_data).set_offsets_buffer("a9", a9_offsets); + query.set_data_buffer("a10", a10_data) + .set_offsets_buffer("a10", a10_offsets); + query.set_data_buffer("a11", a11_data) + .set_offsets_buffer("a11", a11_offsets); + query.set_data_buffer("a12", a12_data) + .set_offsets_buffer("a12", a12_offsets); + query.set_data_buffer("a13", a13_data) + .set_offsets_buffer("a13", a13_offsets); + query.set_data_buffer("a14", a14_data) + .set_offsets_buffer("a14", a14_offsets); + + query.submit(); + } + + // Consolidate + tiledb::Stats::enable(); + tiledb::Array::consolidate(ctx, array_name); + tiledb::Stats::dump(); + + // Vacuum + tiledb::Array::vacuum(ctx, array_name); + + // Cleanup. + if (vfs.is_dir(array_name)) { + vfs.remove_dir(array_name); + } +} diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index 946889e670de..a494bc02444d 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -58,6 +58,107 @@ class FragmentConsolidatorException : public StatusException { } }; +void FragmentConsolidationWorkspace::resize_buffers( + stats::Stats* stats, + const FragmentConsolidationConfig& config, + const ArraySchema& array_schema, + std::unordered_map& avg_cell_sizes) { + auto timer_se = stats->start_timer("resize_buffers"); + + // For easy reference + auto attribute_num = array_schema.attribute_num(); + auto& domain{array_schema.domain()}; + auto dim_num = array_schema.dim_num(); + auto sparse = !array_schema.dense(); + + // Calculate buffer weights. We reserve the maximum possible number of buffers + // to make only one allocation. If an attribute is var size and nullable, it + // has 3 buffers, dimensions only have 2 as they cannot be nullable. Then one + // buffer for timestamps, and 2 for delete metadata. + std::vector buffer_weights; + buffer_weights.reserve(attribute_num * 3 + dim_num * 2 + 3); + for (unsigned i = 0; i < attribute_num; ++i) { + const auto attr = array_schema.attributes()[i]; + const auto var_size = attr->var_size(); + + // First buffer is either the var size offsets or the fixed size data. + buffer_weights.emplace_back( + var_size ? constants::cell_var_offset_size : attr->cell_size()); + + // For var size attributes, add the data buffer weight. + if (var_size) { + buffer_weights.emplace_back(avg_cell_sizes[attr->name()]); + } + + // For nullable attributes, add the validity buffer weight. + if (attr->nullable()) { + buffer_weights.emplace_back(constants::cell_validity_size); + } + } + + if (sparse) { + for (unsigned i = 0; i < dim_num; ++i) { + const auto dim = domain.dimension_ptr(i); + const auto var_size = dim->var_size(); + + // First buffer is either the var size offsets or the fixed size data. + buffer_weights.emplace_back( + var_size ? constants::cell_var_offset_size : dim->coord_size()); + + // For var size attributes, add the data buffer weight. + if (var_size) { + buffer_weights.emplace_back(avg_cell_sizes[dim->name()]); + } + } + } + + if (config.with_timestamps_ && sparse) { + buffer_weights.emplace_back(constants::timestamp_size); + } + + // Adding buffers for delete meta, one for timestamp and one for condition + // index. + if (config.with_delete_meta_) { + buffer_weights.emplace_back(constants::timestamp_size); + buffer_weights.emplace_back(sizeof(uint64_t)); + } + + // Use the old buffer size setting to see how much memory we would use. + auto buffer_num = buffer_weights.size(); + uint64_t total_budget = config.total_buffer_size_; + + // If a user set the per-attribute buffer size configuration, we override + // the use of the total_budget_size config setting for backwards + // compatible behavior. + if (config.buffer_size_ != 0) { + total_budget = config.buffer_size_ * buffer_num; + } + + if (buffer_num > buffers_.size()) { + buffers_.resize(buffer_num); + sizes_.resize(buffer_num); + } + + // Create buffers. + auto total_weights = std::accumulate( + buffer_weights.begin(), buffer_weights.end(), static_cast(0)); + + // Allocate space for each buffer. + uint64_t adjusted_budget = total_budget / total_weights * total_weights; + + if (adjusted_budget > backing_buffer_.size()) { + backing_buffer_.resize(adjusted_budget); + } + + size_t offset = 0; + for (unsigned i = 0; i < buffer_num; ++i) { + sizes_[i] = std::max( + 1, adjusted_budget * buffer_weights[i] / total_weights); + buffers_[i] = span(&(backing_buffer_[offset]), sizes_[i]); + offset += sizes_[i]; + } +} + /* ****************************** */ /* CONSTRUCTOR */ /* ****************************** */ @@ -122,6 +223,8 @@ Status FragmentConsolidator::consolidate( return st; } + FragmentConsolidationWorkspace cw; + uint32_t step = 0; std::vector to_consolidate; do { @@ -154,7 +257,8 @@ Status FragmentConsolidator::consolidate( array_for_writes, to_consolidate, union_non_empty_domains, - &new_fragment_uri); + &new_fragment_uri, + cw); if (!st.ok()) { throw_if_not_ok(array_for_reads->close()); throw_if_not_ok(array_for_writes->close()); @@ -260,6 +364,8 @@ Status FragmentConsolidator::consolidate_fragments( "Cannot consolidate; Not all fragments could be found")); } + FragmentConsolidationWorkspace cw; + // Consolidate the selected fragments URI new_fragment_uri; st = consolidate_internal( @@ -267,7 +373,8 @@ Status FragmentConsolidator::consolidate_fragments( array_for_writes, to_consolidate, union_non_empty_domains, - &new_fragment_uri); + &new_fragment_uri, + cw); if (!st.ok()) { throw_if_not_ok(array_for_reads->close()); throw_if_not_ok(array_for_writes->close()); @@ -376,7 +483,8 @@ Status FragmentConsolidator::consolidate_internal( shared_ptr array_for_writes, const std::vector& to_consolidate, const NDRange& union_non_empty_domains, - URI* new_fragment_uri) { + URI* new_fragment_uri, + FragmentConsolidationWorkspace& cw) { auto timer_se = stats_->start_timer("consolidate_internal"); array_for_reads->load_fragments(to_consolidate); @@ -419,8 +527,7 @@ Status FragmentConsolidator::consolidate_internal( // Prepare buffers auto average_var_cell_sizes = array_for_reads->get_average_var_cell_sizes(); - FragmentConsolidationWorkspace cw{ - create_buffers(stats_, config_, array_schema, average_var_cell_sizes)}; + cw.resize_buffers(stats_, config_, array_schema, average_var_cell_sizes); // Create queries auto query_r = (Query*)nullptr; @@ -528,101 +635,6 @@ void FragmentConsolidator::copy_array( } while (query_r->status() == QueryStatus::INCOMPLETE); } -FragmentConsolidationWorkspace FragmentConsolidator::create_buffers( - stats::Stats* stats, - const FragmentConsolidationConfig& config, - const ArraySchema& array_schema, - std::unordered_map& avg_cell_sizes) { - auto timer_se = stats->start_timer("consolidate_create_buffers"); - - // For easy reference - auto attribute_num = array_schema.attribute_num(); - auto& domain{array_schema.domain()}; - auto dim_num = array_schema.dim_num(); - auto sparse = !array_schema.dense(); - - // Calculate buffer weights. We reserve the maximum possible number of buffers - // to make only one allocation. If an attribute is var size and nullable, it - // has 3 buffers, dimensions only have 2 as they cannot be nullable. Then one - // buffer for timestamps, and 2 for delete metadata. - std::vector buffer_weights; - buffer_weights.reserve(attribute_num * 3 + dim_num * 2 + 3); - for (unsigned i = 0; i < attribute_num; ++i) { - const auto attr = array_schema.attributes()[i]; - const auto var_size = attr->var_size(); - - // First buffer is either the var size offsets or the fixed size data. - buffer_weights.emplace_back( - var_size ? constants::cell_var_offset_size : attr->cell_size()); - - // For var size attributes, add the data buffer weight. - if (var_size) { - buffer_weights.emplace_back(avg_cell_sizes[attr->name()]); - } - - // For nullable attributes, add the validity buffer weight. - if (attr->nullable()) { - buffer_weights.emplace_back(constants::cell_validity_size); - } - } - - if (sparse) { - for (unsigned i = 0; i < dim_num; ++i) { - const auto dim = domain.dimension_ptr(i); - const auto var_size = dim->var_size(); - - // First buffer is either the var size offsets or the fixed size data. - buffer_weights.emplace_back( - var_size ? constants::cell_var_offset_size : dim->coord_size()); - - // For var size attributes, add the data buffer weight. - if (var_size) { - buffer_weights.emplace_back(avg_cell_sizes[dim->name()]); - } - } - } - - if (config.with_timestamps_ && sparse) { - buffer_weights.emplace_back(constants::timestamp_size); - } - - // Adding buffers for delete meta, one for timestamp and one for condition - // index. - if (config.with_delete_meta_) { - buffer_weights.emplace_back(constants::timestamp_size); - buffer_weights.emplace_back(sizeof(uint64_t)); - } - - // Use the old buffer size setting to see how much memory we would use. - auto buffer_num = buffer_weights.size(); - uint64_t total_budget = config.total_buffer_size_; - - // If a user set the per-attribute buffer size configuration, we override - // the use of the total_budget_size config setting for backwards - // compatible behavior. - if (config.buffer_size_ != 0) { - total_budget = config.buffer_size_ * buffer_num; - } - - // Create buffers. - FragmentConsolidationWorkspace cw{buffer_num}; - std::vector& buffers{cw.buffers()}; - std::vector& buffer_sizes{cw.sizes()}; - auto total_weights = std::accumulate( - buffer_weights.begin(), buffer_weights.end(), static_cast(0)); - - // Allocate space for each buffer. - uint64_t adjusted_budget = total_budget / total_weights * total_weights; - for (unsigned i = 0; i < buffer_num; ++i) { - buffer_sizes[i] = std::max( - 1, adjusted_budget * buffer_weights[i] / total_weights); - buffers[i].resize(buffer_sizes[i]); - } - - // Success - return cw; -} - Status FragmentConsolidator::create_queries( shared_ptr array_for_reads, shared_ptr array_for_writes, @@ -808,7 +820,7 @@ Status FragmentConsolidator::compute_next_to_consolidate( void FragmentConsolidator::set_query_buffers( Query* query, FragmentConsolidationWorkspace& cw) const { - std::vector* buffers{&cw.buffers()}; + std::vector>* buffers{&cw.buffers()}; std::vector* buffer_sizes{&cw.sizes()}; const auto& array_schema = query->array_schema(); diff --git a/tiledb/sm/consolidator/fragment_consolidator.h b/tiledb/sm/consolidator/fragment_consolidator.h index 1087e71c9f7e..667c029c6a5f 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.h +++ b/tiledb/sm/consolidator/fragment_consolidator.h @@ -107,42 +107,35 @@ struct FragmentConsolidationConfig : Consolidator::ConsolidationConfigBase { * Consolidation workspace holds the large buffers used by the operation. */ class FragmentConsolidationWorkspace { - std::vector buffers_; - std::vector sizes_; - public: - FragmentConsolidationWorkspace() = delete; - explicit FragmentConsolidationWorkspace(size_t n) - : buffers_(n) - , sizes_(n) { - } - /** - * Copy constructor is deleted to avoid copying large amounts of memory - */ - FragmentConsolidationWorkspace(const FragmentConsolidationWorkspace&) = - delete; + FragmentConsolidationWorkspace() = default; - /** - * Move constructor - */ - FragmentConsolidationWorkspace(FragmentConsolidationWorkspace&&) = default; + // Disable copy and move construction/assignment so we don't have + // to think about it. + DISABLE_COPY_AND_COPY_ASSIGN(FragmentConsolidationWorkspace); + DISABLE_MOVE_AND_MOVE_ASSIGN(FragmentConsolidationWorkspace); /** - * Copy assignment is deleted to avoid copying large amounts of memory - */ - const FragmentConsolidationWorkspace& operator=( - const FragmentConsolidationWorkspace&) = delete; - - /** - * Move assignment is deleted for convenience + * Resize the buffers that will be used upon reading the input fragments and + * writing into the new fragment. It also retrieves the number of buffers + * created. + * + * @param stats The stats. + * @param config The consolidation config. + * @param array_schema The array schema. + * @param avg_cell_sizes The average cell sizes. + * @return a consolidation workspace containing the buffers */ - const FragmentConsolidationWorkspace& operator=( - FragmentConsolidationWorkspace&&) = delete; + void resize_buffers( + stats::Stats* stats, + const FragmentConsolidationConfig& config, + const ArraySchema& array_schema, + std::unordered_map& avg_cell_sizes); /** * Accessor for buffers */ - std::vector& buffers() { + std::vector>& buffers() { return buffers_; } @@ -152,6 +145,11 @@ class FragmentConsolidationWorkspace { std::vector& sizes() { return sizes_; }; + + private: + std::vector backing_buffer_; + std::vector> buffers_; + std::vector sizes_; }; /** Handles fragment consolidation. */ @@ -274,6 +272,7 @@ class FragmentConsolidator : public Consolidator { * fragments are *not* all sparse. * @param new_fragment_uri The URI of the fragment created after * consolidating the `to_consolidate` fragments. + * @param cw A workspace containing buffers for the queries * @return Status */ Status consolidate_internal( @@ -281,7 +280,8 @@ class FragmentConsolidator : public Consolidator { shared_ptr array_for_writes, const std::vector& to_consolidate, const NDRange& union_non_empty_domains, - URI* new_fragment_uri); + URI* new_fragment_uri, + FragmentConsolidationWorkspace& cw); /** * Copies the array by reading from the fragments to be consolidated @@ -295,23 +295,6 @@ class FragmentConsolidator : public Consolidator { void copy_array( Query* query_r, Query* query_w, FragmentConsolidationWorkspace& cw); - /** - * Creates the buffers that will be used upon reading the input fragments and - * writing into the new fragment. It also retrieves the number of buffers - * created. - * - * @param stats The stats. - * @param config The consolidation config. - * @param array_schema The array schema. - * @param avg_cell_sizes The average cell sizes. - * @return a consolidation workspace containing the buffers - */ - static FragmentConsolidationWorkspace create_buffers( - stats::Stats* stats, - const FragmentConsolidationConfig& config, - const ArraySchema& array_schema, - std::unordered_map& avg_cell_sizes); - /** * Creates the queries needed for consolidation. It also retrieves * the number of fragments to be consolidated and the URI of the