Revert "Optimize Commit pipeline performance (facebook#286)"
This reverts commit e4bfc11.
BusyJay committed Aug 8, 2022
1 parent fd90ce2 commit 9eb84af
Showing 23 changed files with 120 additions and 437 deletions.
2 changes: 1 addition & 1 deletion db/column_family.cc
@@ -1205,7 +1205,7 @@ Status ColumnFamilyData::ValidateOptions(
}
}

if (db_options.enable_multi_batch_write &&
if (db_options.enable_pipelined_commit &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
18 changes: 7 additions & 11 deletions db/db_impl/db_impl.h
@@ -155,10 +155,6 @@ class DBImpl : public DB {
WriteBatch* updates,
uint64_t* seq) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) override;

using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
@@ -1029,12 +1025,12 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void MultiBatchWriteCommit(CommitRequest* request);
Status PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
void PebbleWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
@@ -1380,7 +1376,7 @@ class DBImpl : public DB {
}

if (!immutable_db_options_.unordered_write &&
!immutable_db_options_.enable_multi_batch_write) {
!immutable_db_options_.enable_pipelined_commit) {
// Then the writes are finished before the next write group starts
return;
}
6 changes: 3 additions & 3 deletions db/db_impl/db_impl_open.cc
@@ -123,10 +123,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {

// Multi-thread write does not support two write queues or write in 2PC
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_batch_write = false;
result.enable_pipelined_commit = false;
}

if (result.enable_multi_batch_write) {
if (result.enable_pipelined_commit) {
result.enable_pipelined_write = false;
result.allow_concurrent_memtable_write = true;
}
@@ -873,7 +873,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, 0, this, false /* concurrent_memtable_writes */,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
MaybeIgnoreError(&status);
if (!status.ok()) {
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_secondary.cc
@@ -253,7 +253,7 @@ Status DBImplSecondary::RecoverLogFiles(
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(),
nullptr /* flush_scheduler */, true, log_number, 0, this,
nullptr /* flush_scheduler */, true, log_number, this,
false /* concurrent_memtable_writes */, next_sequence,
&has_valid_writes, seq_per_batch_, batch_per_txn_);
}
144 changes: 44 additions & 100 deletions db/db_impl/db_impl_write.cc
@@ -66,7 +66,8 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

void DBImpl::MultiBatchWriteCommit(CommitRequest* request) {
void DBImpl::PebbleWriteCommit(CommitRequest* request) {
request->applied.store(true, std::memory_order_release);
write_thread_.ExitWaitSequenceCommit(request, &versions_->last_sequence_);
size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1;
if (pending_cnt == 0) {
@@ -79,55 +80,14 @@ void DBImpl::MultiBatchWriteCommit(CommitRequest* request) {
}
}

Status DBImpl::MultiBatchWrite(const WriteOptions& options,
std::vector<WriteBatch*>&& updates) {
if (immutable_db_options_.enable_multi_batch_write) {
return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr);
} else {
return Status::NotSupported();
}
}

// In this way, RocksDB applies WriteBatches to the memtable out of order but
// commits them in order. (We borrow the idea from:
// https://github.com/cockroachdb/pebble/blob/master/docs/rocksdb.md#commit-pipeline.
// On this basis, we split the write into a vector of smaller-grained
// WriteBatches; when the WriteBatch sizes of multiple writers are not balanced,
// writers that finish first help the front writer write its remaining
// WriteBatches, which increases CPU usage and reduces overall latency.)
//
// More details:
//
// Request Queue WriteBatchVec
// +--------------+ +---------------------+
// | Front Writer | -> | WB1 | WB2 | WB3|... |
// +--------------+ +-----+ +---------------------+
// | Writer 2 | -> | WB1 |
// +--------------+ +-----+ +-----------+
// | Writer 3 | -> | WB1 | WB2 |
// +--------------+ +---+ +-----------+
// | ... | -> |...|
// +--------------+ +---+
//
// 1. Multiple Writers enter the `Request queue` to determine the commit order.
// 2. Then all writers write to the memtable in parallel (each thread iterates over
//    its own write batch vector).
// 3.1. If the Front Writer finishes writing and enters the commit phase first, it
//    pops itself from the `Request queue`, this function returns to its caller,
//    and Writer 2 becomes the new front.
// 3.2. If Writer 2 or 3 finishes writing and enters the commit phase first, it
//    helps the front writer complete its pending WBs one by one until all are done
//    and wakes up the Front Writer. The Front Writer then traverses and pops the
//    completed writers; the first unfinished writer encountered becomes the new front.
//
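[Editor's note] For readers unfamiliar with the pattern described in the comment above, here is a minimal, self-contained sketch of the in-order-commit idea. The types `Request` and `CommitQueue` are hypothetical simplifications, not the RocksDB/TiKV code, and the helping mechanism from steps 3.1/3.2 is omitted: writers apply batches to the memtable in parallel, but each publishes its sequence number only after all earlier writers have committed.

```cpp
// Conceptual sketch only -- not the implementation being reverted here.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

struct Request {
  uint64_t commit_lsn = 0;           // last sequence number owned by this writer
  std::atomic<bool> applied{false};  // mirrors CommitRequest::applied; not consumed in this sketch
};

class CommitQueue {
 public:
  // Writers enter in the order their sequence numbers were assigned.
  void Enter(Request* req) {
    std::lock_guard<std::mutex> lock(mu_);
    queue_.push_back(req);
  }

  // Called after the writer has applied its batch to the memtable (possibly
  // out of order). Blocks until every earlier writer has committed, then
  // publishes this writer's commit_lsn as the new last visible sequence.
  void ExitInOrder(Request* req, std::atomic<uint64_t>* last_sequence) {
    req->applied.store(true, std::memory_order_release);
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return queue_.front() == req; });
    queue_.pop_front();
    last_sequence->store(req->commit_lsn, std::memory_order_release);
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Request*> queue_;
};
```

A writer would call `Enter` when its sequence range is assigned and `ExitInOrder` after finishing its memtable apply; the reverted implementation additionally lets writers that reach the commit phase early help drain the front writer's remaining batches.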
Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback, uint64_t* log_used,
uint64_t log_ref, uint64_t* seq_used) {
Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer writer(write_options, std::move(my_batch), callback, log_ref, false);
CommitRequest request(&writer);
CommitRequest request;
WriteThread::Writer writer(write_options, my_batch, callback, log_ref, false);
writer.request = &request;
write_thread_.JoinBatchGroup(&writer);

@@ -162,15 +122,8 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
if (w->CheckCallback(this)) {
if (w->ShouldWriteToMemtable()) {
w->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(w->multi_batch.batches);
size_t count = WriteBatchInternal::Count(w->batch);
if (count > 0) {
auto sequence = w->sequence;
for (auto b: w->multi_batch.batches) {
WriteBatchInternal::SetSequence(b, sequence);
sequence += WriteBatchInternal::Count(b);
}
w->multi_batch.SetContext(versions_->GetColumnFamilySet(), &flush_scheduler_,
write_options.ignore_missing_column_families, this);
w->request->commit_lsn = next_sequence + count - 1;
write_thread_.EnterCommitQueue(w->request);
}
@@ -179,7 +132,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
memtable_write_cnt++;
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(w->multi_batch.batches));
total_byte_size, WriteBatchInternal::ByteSize(w->batch));
}
}
if (writer.disable_wal) {
@@ -219,10 +172,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
}
if (writer.status.ok()) {
pending_memtable_writes_ += memtable_write_cnt;
} else {
// The `pending_wb_cnt` must be reset to avoid other writers helping
// the front writer write its WBs after it failed to write the WAL.
writer.ResetPendingWBCnt();
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
}
@@ -240,20 +189,23 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

while (writer.ConsumeOne());
MultiBatchWriteCommit(writer.request);
ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet());
writer.status = WriteBatchInternal::InsertInto(
&writer, writer.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);

WriteStatusCheck(writer.status);

if (!writer.FinalStatus().ok()) {
writer.status = writer.FinalStatus();
}
PebbleWriteCommit(writer.request);
} else if (writer.request->commit_lsn != 0) {
// When the leader fails to write WAL, all writers in the group need to cancel
// the write to memtable.
writer.ResetPendingWBCnt();
MultiBatchWriteCommit(writer.request);
PebbleWriteCommit(writer.request);
} else {
writer.ResetPendingWBCnt();
writer.request->applied.store(true, std::memory_order_release);
}
return writer.status;
}
@@ -284,7 +236,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (two_write_queues_ && immutable_db_options_.enable_multi_batch_write) {
if (two_write_queues_ && immutable_db_options_.enable_pipelined_commit) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
@@ -348,11 +300,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}

if (immutable_db_options_.enable_multi_batch_write) {
std::vector<WriteBatch*> updates(1);
updates[0] = my_batch;
return MultiBatchWriteImpl(write_options, std::move(updates), callback, log_used, log_ref,
seq_used);
if (immutable_db_options_.enable_pipelined_commit) {
return PebbleWriteImpl(write_options, my_batch, callback, log_used, log_ref,
seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
@@ -480,12 +430,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (writer->CheckCallback(this)) {
valid_batches += writer->batch_cnt;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->multi_batch.batches[0]);
parallel = parallel && !writer->multi_batch.batches[0]->HasMerge();
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
@@ -576,7 +525,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(writer->batch_cnt);
next_sequence += writer->batch_cnt;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->multi_batch.batches[0]);
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
}
@@ -698,13 +647,12 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
size_t count = WriteBatchInternal::Count(writer->multi_batch.batches[0]);
size_t count = WriteBatchInternal::Count(writer->batch);
next_sequence += count;
total_count += count;
}
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
if (w.disable_wal) {
@@ -900,8 +848,7 @@ Status DBImpl::WriteImplWALOnly(
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size,
WriteBatchInternal::ByteSize(writer->multi_batch.batches[0]));
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
if (writer->pre_release_callback) {
pre_release_callback_cnt++;
}
@@ -1130,12 +1077,11 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
auto* leader = write_group.leader;
assert(!leader->disable_wal); // Same holds for all in the batch group
if (write_group.size == 1 && !leader->CallbackFailed() &&
leader->multi_batch.batches.size() == 1 &&
leader->multi_batch.batches[0]->GetWalTerminationPoint().is_cleared()) {
leader->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch is not wanting to be truncated
merged_batch = leader->multi_batch.batches[0];
merged_batch = leader->batch;
if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) {
*to_be_cached_state = merged_batch;
}
@@ -1147,17 +1093,15 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (!writer->CallbackFailed()) {
for (auto b : writer->multi_batch.batches) {
Status s = WriteBatchInternal::Append(merged_batch, b,
/*WAL_only*/ true);
// Always returns Status::OK.
assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(b)) {
// We only need to cache the last of such write batch
*to_be_cached_state = b;
}
(*write_with_wal)++;
Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
// Always returns Status::OK.
assert(s.ok());
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
// We only need to cache the last of such write batch
*to_be_cached_state = writer->batch;
}
(*write_with_wal)++;
}
}
}
@@ -1211,7 +1155,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
StopWatch write_sw(env_, stats_, DB_WRITE_WAL_TIME);
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
&write_with_wal, &to_be_cached_state);
if (merged_batch == write_group.leader->multi_batch.batches[0]) {
if (merged_batch == write_group.leader->batch) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
@@ -1286,7 +1230,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
// We need to lock log_write_mutex_ since logs_ and alive_log_files might be
// pushed back concurrently
log_write_mutex_.Lock();
if (merged_batch == write_group.leader->multi_batch.batches[0]) {
if (merged_batch == write_group.leader->batch) {
write_group.leader->log_used = logfile_number_;
} else if (write_with_wal > 1) {
for (auto writer : write_group) {
@@ -1336,7 +1280,7 @@ Status DBImpl::WriteRecoverableState() {
WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, true, 0 /*recovery_log_number*/, 0 /*log_ref*/, this,
&flush_scheduler_, true, 0 /*recovery_log_number*/, this,
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_);
auto last_seq = next_seq - 1;
2 changes: 1 addition & 1 deletion db/db_properties_test.cc
@@ -37,7 +37,7 @@ TEST_F(DBPropertiesTest, Empty) {
options.write_buffer_size = 100000; // Small write buffer
options.allow_concurrent_memtable_write = false;
options = CurrentOptions(options);
if (options.enable_multi_batch_write) {
if (options.enable_pipelined_commit) {
continue;
}
CreateAndReopenWithCF({"pikachu"}, options);
4 changes: 2 additions & 2 deletions db/db_test_util.cc
@@ -555,8 +555,8 @@ Options DBTestBase::GetOptions(
options.enable_pipelined_write = true;
break;
}
case kMultiBatchWrite: {
options.enable_multi_batch_write = true;
case kCommitPipeline: {
options.enable_pipelined_commit = true;
options.enable_pipelined_write = false;
options.two_write_queues = false;
break;
2 changes: 1 addition & 1 deletion db/db_test_util.h
@@ -684,7 +684,7 @@ class DBTestBase : public testing::Test {
kConcurrentSkipList = 28,
kPipelinedWrite = 29,
kConcurrentWALWrites = 30,
kMultiBatchWrite = 31,
kCommitPipeline = 31,
kDirectIO,
kLevelSubcompactions,
kBlockBasedTableWithIndexRestartInterval,