Skip to content

Commit

Permalink
ENG-5188: #1240: Preparation diff. Use single writer skiplist in DocDB.
Browse files Browse the repository at this point in the history
Summary:
RocksDB contains 2 implementations of skip list.
`InlineSkipList`, which we've been using prior to this revision, allows concurrent writes and stores keys of variable length in Node.
`SkipList` allows concurrent reads, but only one writer at a time, so it should really
be called `SingleWriterSkipList`.

Since we write to RocksDB only from a single thread, we could benefit from using `SkipList`.
Also, it is easier to implement erase in such a list.

This diff adapts the interface of `SkipList` (the single-writer skip list) so that it can be used by SkipListRep.
It also adds the ability to store keys of variable length in Node.

Test Plan: Jenkins

Reviewers: timur, mikhail

Reviewed By: mikhail

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D6595
  • Loading branch information
spolitov committed Jun 13, 2019
1 parent 5e2bc11 commit 10190af
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 145 deletions.
4 changes: 4 additions & 0 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "yb/common/transaction.h"

#include "yb/rocksdb/memtablerep.h"
#include "yb/rocksdb/rate_limiter.h"
#include "yb/rocksdb/table.h"
#include "yb/rocksdb/util/compression.h"
Expand Down Expand Up @@ -531,6 +532,9 @@ void InitRocksDBOptions(
if (max_file_size_for_compaction != 0) {
options->max_file_size_for_compaction = max_file_size_for_compaction;
}

options->memtable_factory = std::make_shared<rocksdb::SkipListFactory>(
0 /* lookahead */, rocksdb::ConcurrentWrites::kFalse);
}

} // namespace docdb
Expand Down
65 changes: 64 additions & 1 deletion src/yb/rocksdb/db/inlineskiplist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "yb/rocksdb/db/inlineskiplist.h"
#include <set>

#include "yb/rocksdb/db/inlineskiplist.h"
#include "yb/rocksdb/db/skiplist.h"

#include "yb/rocksdb/env.h"
#include "yb/rocksdb/util/concurrent_arena.h"
#include "yb/rocksdb/util/hash.h"
#include "yb/rocksdb/util/random.h"
#include "yb/rocksdb/util/testharness.h"

#include "yb/util/countdown_latch.h"
#include "yb/util/random_util.h"
#include "yb/util/tsan_util.h"

namespace rocksdb {

// Our test skip list stores 8-byte unsigned integers
Expand Down Expand Up @@ -481,6 +488,62 @@ TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }

// Measures how many random-key seeks concurrent readers complete while this
// thread inserts random keys into a skip list of type `List`.
//
// `List` must be constructible from (TestComparator, Arena*), provide
// AllocateKey(size) and Insert(key), and expose a nested `Iterator` type that
// is constructible from `const List*`/`List*` and supports Seek(key).
//
// The result is written to the log as the elapsed insert time and the reader
// throughput in thousands of seeks per second.
template <class List>
void Benchmark() {
  Arena arena;
  TestComparator cmp;
  List list(cmp, &arena);
  // NOTE(review): presumably selects the smaller write count for sanitizer
  // builds; confirm against yb/util/tsan_util.h.
  constexpr int kWrites = yb::RegularBuildVsSanitizers(5000000, 50000);
  constexpr int kReaders = 10;
  // One count per reader plus one for this (writer) thread, so that nobody
  // starts working before every participant has reached the latch.
  yb::CountDownLatch latch(kReaders + 1);
  std::atomic<bool> stop(false);
  std::atomic<uint64_t> reads(0);

  std::vector<std::thread> threads;
  threads.reserve(kReaders);
  for (int reader = 0; reader != kReaders; ++reader) {
    threads.emplace_back([&list, &latch, &stop, &reads] {
      latch.CountDown();
      latch.Wait();
      // Keep seeking to random keys until the writer signals completion.
      while (!stop.load(std::memory_order_acquire)) {
        auto key = yb::RandomUniformInt<uint64_t>();
        typename List::Iterator iter(&list);
        iter.Seek(Encode(&key));
        reads.fetch_add(1, std::memory_order_acq_rel);
      }
    });
  }

  latch.CountDown();
  latch.Wait();

  auto start = yb::MonoTime::Now();
  for (int i = 0; i != kWrites; ++i) {
    auto key = yb::RandomUniformInt<uint64_t>();
    char* buf = list.AllocateKey(sizeof(Key));
    memcpy(buf, &key, sizeof(Key));
    list.Insert(buf);
  }
  auto finish = yb::MonoTime::Now();
  // Snapshot the counter together with the finish timestamp: readers keep
  // running until they observe `stop`, and those extra seeks must not be
  // attributed to the timed interval.
  const uint64_t reads_done = reads.load(std::memory_order_acquire);

  stop.store(true, std::memory_order_release);
  for (auto& thread : threads) {
    thread.join();
  }

  auto passed = finish - start;
  // Convert to floating point before scaling so that short runs (e.g. under
  // sanitizers) do not lose the sub-thousand part of the read count.
  const double kilo_reads = static_cast<double>(reads_done) / 1000.0;
  LOG(INFO) << "Passed: " << passed << ", reads: "
            << kilo_reads / passed.ToSeconds() << "kops/sec";
}

// Runs the read-while-writing benchmark against InlineSkipList.
TEST_F(InlineSkipTest, Benchmark) {
  using ListUnderTest = InlineSkipList<TestComparator>;
  Benchmark<ListUnderTest>();
}

// Runs the read-while-writing benchmark against SingleWriterInlineSkipList.
TEST_F(InlineSkipTest, SWBenchmark) {
  using ListUnderTest = SingleWriterInlineSkipList<TestComparator>;
  Benchmark<ListUnderTest>();
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down
Loading

0 comments on commit 10190af

Please sign in to comment.