Skip to content

Commit

Permalink
Make the appenders use a thread pool
Browse files Browse the repository at this point in the history
Summary:
This diff makes the appender use a task stream which uses
threads from the append thread pool. This ensures that we use a thread
only when a tablet is processing consensus batches. We will now
typically use fewer threads than the number of tablets for appending to
the log.

Test Plan: Jenkins

Reviewers: bogdan, sergei, mikhail

Reviewed By: mikhail

Subscribers: ybase, bogdan, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D4232
  • Loading branch information
rven1 committed Mar 7, 2018
1 parent 6b33e8d commit abe90b1
Show file tree
Hide file tree
Showing 24 changed files with 258 additions and 174 deletions.
4 changes: 2 additions & 2 deletions src/yb/consensus/README
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ Group commit implementation details
=============================================================

Currently, the group implementation uses a blocking queue (see
Log::entry_queue_ in log.h) and a separate long-running thread (see
Log::AppendThread in log.cc). Since access to the queue is
Log::entry_queue_ in log.h) and a separate TaskStream (see
Log::Appender in log.cc). Since access to the queue is
synchronized via a lock and only a single thread removes the queue,
the order in which the elements are added to the queue will be the
same as the order in which the elements are removed from the queue.
Expand Down
13 changes: 8 additions & 5 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,24 @@ class ConsensusPeersTest : public YBTest {
ConsensusPeersTest()
: metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")),
schema_(GetSimpleTestSchema()) {
CHECK_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
}

void SetUp() override {
YBTest::SetUp();
ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));

CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
CHECK_OK(Log::Open(options_,
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(Log::Open(options_,
fs_manager_.get(),
kTabletId,
fs_manager_->GetFirstTabletWalDirOrDie(kTableId, kTabletId),
schema_,
0, // schema_version
NULL,
append_pool_.get(),
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
Expand All @@ -111,7 +113,7 @@ class ConsensusPeersTest : public YBTest {
}

void TearDown() override {
CHECK_OK(log_->WaitUntilAllFlushed());
ASSERT_OK(log_->WaitUntilAllFlushed());
}

DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
Expand Down Expand Up @@ -159,6 +161,7 @@ class ConsensusPeersTest : public YBTest {
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
gscoped_ptr<FsManager> fs_manager_;
unique_ptr<ThreadPool> append_pool_;
scoped_refptr<Log> log_;
gscoped_ptr<PeerMessageQueue> message_queue_;
const Schema schema_;
Expand Down
5 changes: 4 additions & 1 deletion src/yb/consensus/consensus_queue-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ class ConsensusQueueTest : public YBTest {
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
CHECK_OK(log::Log::Open(log::LogOptions(),
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
ASSERT_OK(log::Log::Open(log::LogOptions(),
fs_manager_.get(),
kTestTablet,
fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
schema_,
0, // schema_version
NULL,
append_pool_.get(),
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
Expand Down Expand Up @@ -213,6 +215,7 @@ class ConsensusQueueTest : public YBTest {
gscoped_ptr<FsManager> fs_manager_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<ThreadPool> append_pool_;
scoped_refptr<log::Log> log_;
std::unique_ptr<ThreadPool> raft_pool_;
gscoped_ptr<PeerMessageQueue> queue_;
Expand Down
10 changes: 7 additions & 3 deletions src/yb/consensus/log-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,22 @@ class LogTestBase : public YBTest {
tablet_wal_path_ = fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet);
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());

FLAGS_log_min_seconds_to_retain = 0;
ASSERT_OK(ThreadPoolBuilder("append")
.unlimited_threads()
.Build(&append_pool_));
}

void BuildLog() {
Schema schema_with_ids = SchemaBuilder(schema_).Build();
CHECK_OK(Log::Open(options_,
ASSERT_OK(Log::Open(options_,
fs_manager_.get(),
kTestTablet,
tablet_wal_path_,
schema_with_ids,
0, // schema_version
metric_entity_.get(),
append_pool_.get(),
&log_));
}

Expand Down Expand Up @@ -206,7 +209,7 @@ class LogTestBase : public YBTest {
}

static void CheckReplicateResult(const consensus::ReplicateMsgPtr& msg, const Status& s) {
CHECK_OK(s);
ASSERT_OK(s);
}

// Appends a batch with size 2, or the given set of writes.
Expand Down Expand Up @@ -301,6 +304,7 @@ class LogTestBase : public YBTest {
gscoped_ptr<FsManager> fs_manager_;
gscoped_ptr<MetricRegistry> metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<ThreadPool> append_pool_;
scoped_refptr<Log> log_;
int32_t current_index_;
LogOptions options_;
Expand Down
Loading

0 comments on commit abe90b1

Please sign in to comment.