[Backport release-2.26] Dense reader fails early when tile offsets are too large. (#5310) #5311

Merged: 3 commits, Sep 16, 2024

Changes from all commits

test/src/unit-dense-reader.cc (38 additions, 2 deletions)
@@ -889,6 +889,42 @@ TEST_CASE_METHOD(
"DenseReader: Memory budget is too small to open array");
}

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: budget too small for tile offsets",
"[dense-reader][budget-too-small-tile-offsets]") {
// Create default array.
reset_config();
create_default_array_1d();

uint64_t num_frags = GENERATE(1, 2);

// Write some fragments.
int subarray[] = {1, NUM_CELLS};
std::vector<int> data(NUM_CELLS);
std::iota(data.begin(), data.end(), 1);
uint64_t data_size = data.size() * sizeof(int);
for (uint64_t f = 0; f < num_frags; f++) {
write_1d_fragment(subarray, data.data(), &data_size);
}

// The footer for a fragment is 390 bytes.
// The tile offsets for a fragment are 400 bytes.
// The tile upper memory limit is more than enough to load 40-byte tiles.
total_budget_ = std::to_string(390 * num_frags + 200);
update_config();

// Try to read.
int data_r[NUM_CELLS] = {0};
uint64_t data_r_size = sizeof(data_r);
read(
subarray,
data_r,
&data_r_size,
0,
"DenseReader: Cannot load tile offsets, increase memory budget");
}

TEST_CASE_METHOD(
CDenseFx,
"Dense reader: tile budget exceeded, fixed attribute",
@@ -942,7 +978,7 @@ TEST_CASE_METHOD(
uint64_t data_size = data.size() * sizeof(int);
write_1d_fragment(subarray, data.data(), &data_size);

- total_budget_ = "420";
+ total_budget_ = "804";
tile_upper_memory_limit_ = "50";
update_config();

@@ -1025,7 +1061,7 @@ TEST_CASE_METHOD(

// The two tiles are 91 and 100 bytes respectively; this will only allow
// loading one, as the budget is split across two potential reads.
- total_budget_ = "460";
+ total_budget_ = "840";
tile_upper_memory_limit_ = "210";
update_config();

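The budget arithmetic in the new test can be checked by hand. Below is a minimal sketch, assuming the 390-byte footer and 400-byte tile offsets stated in the test's comments (the exact sizes are fragment-format details, not part of any public API):

```cpp
// Worked check of the test's budget: enough for every fragment footer plus
// 200 bytes of headroom, but never enough for the tile offsets.
#include <cassert>
#include <cstdint>

int main() {
  for (uint64_t num_frags : {1ull, 2ull}) {
    const uint64_t budget = 390 * num_frags + 200;  // total_budget_ in the test
    const uint64_t footers = 390 * num_frags;       // loaded to open the array
    const uint64_t offsets = 400 * num_frags;       // per-fragment tile offsets
    assert(budget - footers == 200);    // fixed headroom after the footers
    assert(offsets > budget - footers); // offsets never fit: must fail early
  }
  return 0;
}
```

With num_frags = 2, the budget is 980 bytes, the footers consume 780, and the 800 bytes of tile offsets cannot fit in the remaining 200, so the read fails with the expected error message.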
tiledb/sm/query/readers/dense_reader.cc (22 additions, 0 deletions)
@@ -98,6 +98,9 @@ DenseReader::DenseReader(

// Check the validity buffer sizes.
check_validity_buffer_sizes();

// No dense dimensions can be var sized.
is_dim_var_size_.resize(array_schema_.dim_num(), false);
}

/* ****************************** */
@@ -310,6 +313,25 @@ Status DenseReader::dense_read() {
if (condition_.has_value()) {
qc_loaded_attr_names_set_ = condition_->field_names();
}
qc_loaded_attr_names_.clear();
qc_loaded_attr_names_.reserve(qc_loaded_attr_names_set_.size());
for (auto& name : qc_loaded_attr_names_set_) {
qc_loaded_attr_names_.emplace_back(name);
}

// Load per fragment tile offsets memory usage.
per_frag_tile_offsets_usage_ = tile_offset_sizes();

// Compute total tile offsets sizes.
auto total_tile_offsets_sizes = std::accumulate(
per_frag_tile_offsets_usage_.begin(),
per_frag_tile_offsets_usage_.end(),
static_cast<uint64_t>(0));
if (total_tile_offsets_sizes >
memory_budget_ - array_memory_tracker_->get_memory_usage()) {
throw DenseReaderException(
"Cannot load tile offsets, increase memory budget");
}

auto&& [names, var_names] = field_names_to_process(qc_loaded_attr_names_set_);

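The new check in dense_read() is a fold over the per-fragment usage vector followed by a comparison against what remains of the budget. Here is a standalone sketch of the same pattern; the free function and its parameters are hypothetical stand-ins for the reader's members:

```cpp
#include <cstdint>
#include <numeric>
#include <stdexcept>
#include <vector>

// Sum the per-fragment tile offsets usage and fail before loading anything
// if the total cannot fit in what is left of the memory budget. Assumes
// memory_used <= memory_budget, as in the reader.
void check_tile_offsets_budget(
    const std::vector<uint64_t>& per_frag_usage,
    uint64_t memory_budget,
    uint64_t memory_used) {
  const uint64_t total = std::accumulate(
      per_frag_usage.begin(), per_frag_usage.end(), static_cast<uint64_t>(0));
  if (total > memory_budget - memory_used) {
    throw std::runtime_error(
        "Cannot load tile offsets, increase memory budget");
  }
}
```

Throwing here, before any offsets are materialized, is what the PR title means by failing early: the reader reports a clear, actionable error up front instead of exhausting the budget partway through the load.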
tiledb/sm/query/readers/reader_base.cc (130 additions, 0 deletions)
@@ -77,6 +77,8 @@ ReaderBase::ReaderBase(
, memory_tracker_(params.query_memory_tracker())
, condition_(params.condition())
, user_requested_timestamps_(false)
, deletes_consolidation_no_purge_(
buffers_.count(constants::delete_timestamps) != 0)
, use_timestamps_(false)
, initial_data_loaded_(false)
, max_batch_size_(config_.get<uint64_t>("vfs.max_batch_size").value())
@@ -145,6 +147,134 @@ bool ReaderBase::skip_field(
/* PROTECTED METHODS */
/* ****************************** */

std::vector<uint64_t> ReaderBase::tile_offset_sizes() {
auto timer_se = stats_->start_timer("tile_offset_sizes");

// For easy reference.
std::vector<uint64_t> ret(fragment_metadata_.size());
const auto dim_num = array_schema_.dim_num();

// Compute the size of tile offsets per fragment.
const auto relevant_fragments = subarray_.relevant_fragments();
throw_if_not_ok(parallel_for(
&resources_.compute_tp(), 0, relevant_fragments.size(), [&](uint64_t i) {
// For easy reference.
auto frag_idx = relevant_fragments[i];
auto& fragment = fragment_metadata_[frag_idx];
const auto& schema = fragment->array_schema();
const auto tile_num = fragment->tile_num();
const auto dense = schema->dense();

// Compute the number of dimensions/attributes requiring offsets.
uint64_t num = 0;

// For fragments with version smaller than 5 we have zipped coords.
// Otherwise we load each dimension independently.
if (!dense) {
if (fragment->version() < 5) {
num = 1;
} else {
for (unsigned d = 0; d < dim_num; ++d) {
// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
if (is_dim_var_size_[d]) {
num += 2;
}
}
}
}

// Process everything loaded for query condition.
for (auto& name : qc_loaded_attr_names_) {
// A field that is not a member of the array schema was added by
// array schema evolution; ignore it for this fragment's tile
// offsets. Also skip dimensions.
if (!schema->is_field(name) || schema->is_dim(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

// Process everything loaded for user requested data.
for (auto& it : buffers_) {
const auto& name = it.first;

// Skip dimensions and attributes loaded by query condition as they
// are processed above. Special attributes (timestamps, delete
// timestamps, etc.) are processed below.
if (array_schema_.is_dim(name) || !schema->is_field(name) ||
qc_loaded_attr_names_set_.count(name) != 0 ||
schema->is_special_attribute(name)) {
continue;
}

// Fixed tile (offsets or fixed data).
num++;

// If var size, we load var offsets and var tile sizes.
const auto attr = schema->attribute(name);
num += 2 * attr->var_size();

// If nullable, we load nullable offsets.
num += attr->nullable();
}

if (!dense) {
// Add timestamps if required.
if (!timestamps_not_present(constants::timestamps, frag_idx)) {
num++;
}

// Add delete metadata if required.
if (!delete_meta_not_present(
constants::delete_timestamps, frag_idx)) {
num++;
num += deletes_consolidation_no_purge_;
}
}

// Finally set the size of the loaded data.

// The expected size of the tile offsets.
unsigned offsets_size = num * tile_num * sizeof(uint64_t);

// Other than the offsets themselves, there is also memory used for the
// initialization of the vectors that hold them. This initialization
// takes place in LoadedFragmentMetadata::resize_offsets().

// Calculate the number of fields
unsigned num_fields = schema->attribute_num() + 1 +
fragment->has_timestamps() +
fragment->has_delete_meta() * 2;

// If version < 5 we use zipped coordinates, otherwise separate
num_fields += (fragment->version() >= 5) ? schema->dim_num() : 0;

// The additional memory required for the vectors that store the tile
// offsets. The number of fields is calculated above. Each vector
// requires 32 bytes and each field requires 4 vectors. These are
// tile_offsets_, tile_var_offsets_, tile_var_sizes_, and
// tile_validity_offsets_, located in loaded_fragment_metadata.h.
unsigned offsets_init_size = num_fields * 4 * 32;

ret[frag_idx] = offsets_size + offsets_init_size;
return Status::Ok();
}));

return ret;
}

bool ReaderBase::process_partial_timestamps(FragmentMetadata& frag_meta) const {
return frag_meta.has_timestamps() &&
frag_meta.partial_time_overlap(
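To make the estimate in tile_offset_sizes() concrete, the sketch below applies the same formula to a small hypothetical fragment: dense, format version >= 5, one dimension, one fixed-size non-nullable attribute, ten tiles. All numbers are illustrative, not taken from the PR:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // One fixed-size, non-nullable attribute contributes one set of offsets;
  // a dense fragment loads no dimension offsets.
  const uint64_t num = 1;
  const uint64_t tile_num = 10;
  const uint64_t offsets_size = num * tile_num * sizeof(uint64_t);  // 80

  // num_fields = attribute_num + 1 + has_timestamps + 2 * has_delete_meta,
  // plus dim_num when version >= 5: here 1 + 1 + 0 + 0 + 1 = 3.
  const uint64_t num_fields = 3;
  const uint64_t offsets_init_size = num_fields * 4 * 32;  // 384

  std::cout << offsets_size + offsets_init_size << "\n";  // prints 464
  return 0;
}
```

The num_fields * 4 * 32 term mirrors the comment in the diff: each field owns four offset vectors (tile_offsets_, tile_var_offsets_, tile_var_sizes_, tile_validity_offsets_), and each vector costs 32 bytes of bookkeeping even when empty.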
tiledb/sm/query/readers/reader_base.h (19 additions, 0 deletions)
@@ -269,6 +269,9 @@ class ReaderBase : public StrategyBase {
/** If the user requested timestamps attribute in the query */
bool user_requested_timestamps_;

/** Are we doing deletes consolidation (without purge option). */
bool deletes_consolidation_no_purge_;

/**
* If the special timestamps attribute should be loaded to memory for
* this query
@@ -281,6 +284,12 @@
*/
std::vector<bool> timestamps_needed_for_deletes_and_updates_;

/** Are dimensions var sized. */
std::vector<bool> is_dim_var_size_;

/** Names of dim/attr loaded for query condition. */
std::vector<std::string> qc_loaded_attr_names_;

/** Names of dim/attr loaded for query condition. */
std::unordered_set<std::string> qc_loaded_attr_names_set_;

@@ -305,6 +314,9 @@
* */
std::unordered_map<std::string, QueryBuffer>& aggregate_buffers_;

/** Per fragment tile offsets memory usage. */
std::vector<uint64_t> per_frag_tile_offsets_usage_;

/* ********************************* */
/* PROTECTED METHODS */
/* ********************************* */
@@ -338,6 +350,13 @@
return true;
}

/**
 * Computes the required size for loading tile offsets, per fragment.
 *
 * @return Required memory for loading tile offsets, per fragment.
 */
std::vector<uint64_t> tile_offset_sizes();

/**
* Returns if we need to process partial timestamp condition for this
* fragment.