Skip to content

Commit

Permalink
Avoid opening table files and reading table properties under mutex (#…
Browse files Browse the repository at this point in the history
…12879)

Summary:
InitInputTableProperties() can open and do IOs and is called under mutex_. This PR removes it from FinalizeInputInfo(). It is now called in CompactionJob::Run() and BuildCompactionJobInfo() (called in NotifyOnCompactionBegin()) without holding mutex_.

Pull Request resolved: #12879

Test Plan: existing unit tests. Added assert in GetInputTableProperties() to ensure that input_table_properties_ is initialized whenever it's called.

Reviewed By: hx235

Differential Revision: D59933195

Pulled By: cbi42

fbshipit-source-id: c8089e13af8567fa3ab4b94d9ec384ae98ab2ec8
  • Loading branch information
cbi42 authored and facebook-github-bot committed Jul 20, 2024
1 parent 4384dd5 commit c064ac3
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 11 deletions.
4 changes: 3 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}

void Compaction::SetInputVersion(Version* _input_version) {
// TODO(hx235): consider making this function part of the construction so we
// don't forget to call it
void Compaction::FinalizeInputInfo(Version* _input_version) {
input_version_ = _input_version;
cfd_ = input_version_->cfd();

Expand Down
20 changes: 11 additions & 9 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,7 @@ class Compaction {
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize() const;

// TODO(hx235): eventually we should consider `InitInputTableProperties()`'s
// status and fail the compaction if needed
// TODO(hx235): consider making this function part of the construction so we
// don't forget to call it
void FinalizeInputInfo(Version* input_version) {
SetInputVersion(input_version);
InitInputTableProperties().PermitUncheckedError();
}
void FinalizeInputInfo(Version* input_version);

struct InputLevelSummaryBuffer {
char buffer[128];
Expand Down Expand Up @@ -333,6 +326,16 @@ class Compaction {
int output_level, VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);

// TODO(hx235): eventually we should consider `InitInputTableProperties()`'s
// status and fail the compaction if needed
//
// May open and read table files for table property.
// Should not be called while holding mutex_.
const TablePropertiesCollection& GetOrInitInputTableProperties() {
InitInputTableProperties().PermitUncheckedError();
return input_table_properties_;
}

const TablePropertiesCollection& GetInputTableProperties() const {
return input_table_properties_;
}
Expand Down Expand Up @@ -433,7 +436,6 @@ class Compaction {
const int output_level);

private:
void SetInputVersion(Version* input_version);

Status InitInputTableProperties();

Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ Status CompactionJob::Run() {
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
const uint64_t start_micros = db_options_.clock->NowMicros();
compact_->compaction->GetOrInitInputTableProperties();

// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
Expand Down
2 changes: 2 additions & 0 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ Status CompactionServiceCompactionJob::Run() {
log_buffer_->FlushBufferToLog();
LogCompaction();
const uint64_t start_micros = db_options_.clock->NowMicros();
c->GetOrInitInputTableProperties();

// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2348,6 +2348,9 @@ class DBImpl : public DB {
bool HaveManualCompaction(ColumnFamilyData* cfd);
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
void UpdateDeletionCompactionStats(const std::unique_ptr<Compaction>& c);

// May open and read table files for table property.
// Should not be called while holding mutex_.
void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
const Status& st,
const CompactionJobStats& compaction_job_stats,
Expand Down
4 changes: 3 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1595,10 +1595,12 @@ Status DBImpl::CompactFilesImpl(

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

mutex_.Unlock();
if (compaction_job_info != nullptr) {
BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
job_context->job_id, compaction_job_info);
}
mutex_.Lock();

if (status.ok()) {
// Done
Expand Down Expand Up @@ -4161,7 +4163,7 @@ void DBImpl::BuildCompactionJobInfo(
compaction_job_info->base_input_level = c->start_level();
compaction_job_info->output_level = c->output_level();
compaction_job_info->stats = compaction_job_stats;
const auto& input_table_properties = c->GetInputTableProperties();
const auto& input_table_properties = c->GetOrInitInputTableProperties();
const auto& output_table_properties = c->GetOutputTableProperties();
compaction_job_info->table_properties.insert(input_table_properties.begin(),
input_table_properties.end());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Fix an issue where compactions were opening table files and reading table properties while holding db mutex_.

0 comments on commit c064ac3

Please sign in to comment.