Skip to content

Commit

Permalink
Make the appenders use a thread pool
Browse files Browse the repository at this point in the history
Summary:
This diff makes the appender use a task stream which uses
threads from the append thread pool. This ensures that we use a thread
only when a tablet is processing consensus batches. We will now
typically use fewer threads than the number of tablets for appending to
the log.

Test Plan: Jenkins

Reviewers: hector, robert, kannan, mikhail

Reviewed By: mikhail

Subscribers: ybase, bogdan, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D4358
  • Loading branch information
rven1 committed Apr 4, 2018
1 parent c2555ed commit 17c3081
Show file tree
Hide file tree
Showing 23 changed files with 443 additions and 393 deletions.
13 changes: 8 additions & 5 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,24 @@ class ConsensusPeersTest : public YBTest {
ConsensusPeersTest()
: metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")),
schema_(GetSimpleTestSchema()) {
CHECK_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
}

void SetUp() override {
YBTest::SetUp();
ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));

CHECK_OK(fs_manager_->CreateInitialFileSystemLayout());
CHECK_OK(Log::Open(options_,
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(Log::Open(options_,
fs_manager_.get(),
kTabletId,
fs_manager_->GetFirstTabletWalDirOrDie(kTableId, kTabletId),
schema_,
0, // schema_version
NULL,
append_pool_.get(),
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
Expand All @@ -111,7 +113,7 @@ class ConsensusPeersTest : public YBTest {
}

void TearDown() override {
CHECK_OK(log_->WaitUntilAllFlushed());
ASSERT_OK(log_->WaitUntilAllFlushed());
}

DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
Expand Down Expand Up @@ -159,6 +161,7 @@ class ConsensusPeersTest : public YBTest {
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
gscoped_ptr<FsManager> fs_manager_;
unique_ptr<ThreadPool> append_pool_;
scoped_refptr<Log> log_;
gscoped_ptr<PeerMessageQueue> message_queue_;
const Schema schema_;
Expand Down
5 changes: 4 additions & 1 deletion src/yb/consensus/consensus_queue-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ class ConsensusQueueTest : public YBTest {
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
CHECK_OK(log::Log::Open(log::LogOptions(),
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
ASSERT_OK(log::Log::Open(log::LogOptions(),
fs_manager_.get(),
kTestTablet,
fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet),
schema_,
0, // schema_version
NULL,
append_pool_.get(),
&log_));
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());
Expand Down Expand Up @@ -213,6 +215,7 @@ class ConsensusQueueTest : public YBTest {
gscoped_ptr<FsManager> fs_manager_;
MetricRegistry metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<ThreadPool> append_pool_;
scoped_refptr<log::Log> log_;
std::unique_ptr<ThreadPool> raft_pool_;
gscoped_ptr<PeerMessageQueue> queue_;
Expand Down
10 changes: 7 additions & 3 deletions src/yb/consensus/log-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,22 @@ class LogTestBase : public YBTest {
tablet_wal_path_ = fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet);
clock_.reset(new server::HybridClock());
ASSERT_OK(clock_->Init());

FLAGS_log_min_seconds_to_retain = 0;
ASSERT_OK(ThreadPoolBuilder("append")
.unlimited_threads()
.Build(&append_pool_));
}

void BuildLog() {
Schema schema_with_ids = SchemaBuilder(schema_).Build();
CHECK_OK(Log::Open(options_,
ASSERT_OK(Log::Open(options_,
fs_manager_.get(),
kTestTablet,
tablet_wal_path_,
schema_with_ids,
0, // schema_version
metric_entity_.get(),
append_pool_.get(),
&log_));
}

Expand Down Expand Up @@ -206,7 +209,7 @@ class LogTestBase : public YBTest {
}

static void CheckReplicateResult(const consensus::ReplicateMsgPtr& msg, const Status& s) {
CHECK_OK(s);
ASSERT_OK(s);
}

// Appends a batch with size 2, or the given set of writes.
Expand Down Expand Up @@ -301,6 +304,7 @@ class LogTestBase : public YBTest {
gscoped_ptr<FsManager> fs_manager_;
gscoped_ptr<MetricRegistry> metric_registry_;
scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<ThreadPool> append_pool_;
scoped_refptr<Log> log_;
int32_t current_index_;
LogOptions options_;
Expand Down
Loading

0 comments on commit 17c3081

Please sign in to comment.