From abe90b142d79dd129a21ac651f88391cc690ed78 Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Mon, 20 Nov 2017 15:23:30 -0800 Subject: [PATCH] Make the appenders use a thread pool Summary: This diff makes the appender use a task stream which uses threads from the append thread pool. This ensures that we use a thread only when a tablet is processing consensus batches. We will now typically use fewer threads than the number of tablets for appending to the log. Test Plan: Jenkins Reviewers: bogdan, sergei, mikhail Reviewed By: mikhail Subscribers: ybase, bogdan, bharat Differential Revision: https://phabricator.dev.yugabyte.com/D4232 --- src/yb/consensus/README | 4 +- src/yb/consensus/consensus_peers-test.cc | 13 +- src/yb/consensus/consensus_queue-test.cc | 5 +- src/yb/consensus/log-test-base.h | 10 +- src/yb/consensus/log.cc | 208 ++++++++++-------- src/yb/consensus/log.h | 14 +- src/yb/consensus/log_cache-test.cc | 7 +- src/yb/consensus/raft_consensus-test.cc | 17 +- .../consensus/raft_consensus_quorum-test.cc | 49 +++-- src/yb/master/sys_catalog.cc | 5 +- src/yb/master/sys_catalog.h | 4 + src/yb/tablet/tablet_bootstrap-test.cc | 4 +- src/yb/tablet/tablet_bootstrap.cc | 5 +- src/yb/tablet/tablet_bootstrap.h | 4 + src/yb/tablet/tablet_bootstrap_if.h | 3 + src/yb/tablet/tablet_peer-test.cc | 9 +- .../tserver/remote_bootstrap_session-test.h | 32 +-- src/yb/tserver/ts_tablet_manager.cc | 15 +- src/yb/tserver/ts_tablet_manager.h | 4 + src/yb/util/CMakeLists.txt | 1 - src/yb/util/taskstream.h | 3 + .../util/{taskstream.cc => taskstream_impl.h} | 10 +- src/yb/util/threadpool.cc | 5 + src/yb/util/threadpool.h | 1 + 24 files changed, 258 insertions(+), 174 deletions(-) rename src/yb/util/{taskstream.cc => taskstream_impl.h} (95%) diff --git a/src/yb/consensus/README b/src/yb/consensus/README index 70986fed4c2e..85e8add8d542 100644 --- a/src/yb/consensus/README +++ b/src/yb/consensus/README @@ -63,8 +63,8 @@ Group commit implementation details ============================================================= Currently, the group implementation uses a blocking queue (see -Log::entry_queue_ in log.h) and a separate long-running thread (see -Log::AppendThread in log.cc). Since access to the queue is +Log::entry_queue_ in log.h) and a separate TaskStream (see +Log::Appender in log.cc). Since access to the queue is synchronized via a lock and only a single thread removes the queue, the order in which the elements are added to the queue will be the same as the order in which the elements are removed from the queue. diff --git a/src/yb/consensus/consensus_peers-test.cc b/src/yb/consensus/consensus_peers-test.cc index e655a7c818c3..19c3e7b644fe 100644 --- a/src/yb/consensus/consensus_peers-test.cc +++ b/src/yb/consensus/consensus_peers-test.cc @@ -74,22 +74,24 @@ class ConsensusPeersTest : public YBTest { ConsensusPeersTest() : metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "peer-test")), schema_(GetSimpleTestSchema()) { - CHECK_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_)); - raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); } void SetUp() override { YBTest::SetUp(); + ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_)); + raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); + ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); - CHECK_OK(fs_manager_->CreateInitialFileSystemLayout()); - CHECK_OK(Log::Open(options_, + ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); + ASSERT_OK(Log::Open(options_, fs_manager_.get(), kTabletId, fs_manager_->GetFirstTabletWalDirOrDie(kTableId, kTabletId), schema_, 0, // schema_version NULL, + append_pool_.get(), &log_)); clock_.reset(new server::HybridClock()); ASSERT_OK(clock_->Init()); @@ -111,7 +113,7 @@ class ConsensusPeersTest : public YBTest { } void TearDown() override { - CHECK_OK(log_->WaitUntilAllFlushed()); + ASSERT_OK(log_->WaitUntilAllFlushed()); } DelayablePeerProxy* NewRemotePeer( @@ -159,6 +161,7 @@ class ConsensusPeersTest : public YBTest { MetricRegistry metric_registry_; scoped_refptr metric_entity_; gscoped_ptr fs_manager_; + unique_ptr append_pool_; scoped_refptr log_; gscoped_ptr message_queue_; const Schema schema_; diff --git a/src/yb/consensus/consensus_queue-test.cc b/src/yb/consensus/consensus_queue-test.cc index 20bf34c86a71..722c7ec5c1df 100644 --- a/src/yb/consensus/consensus_queue-test.cc +++ b/src/yb/consensus/consensus_queue-test.cc @@ -79,13 +79,15 @@ class ConsensusQueueTest : public YBTest { fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); ASSERT_OK(fs_manager_->Open()); - CHECK_OK(log::Log::Open(log::LogOptions(), + ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); + ASSERT_OK(log::Log::Open(log::LogOptions(), fs_manager_.get(), kTestTablet, fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), schema_, 0, // schema_version NULL, + append_pool_.get(), &log_)); clock_.reset(new server::HybridClock()); ASSERT_OK(clock_->Init()); @@ -213,6 +215,7 @@ class ConsensusQueueTest : public YBTest { gscoped_ptr fs_manager_; MetricRegistry metric_registry_; scoped_refptr metric_entity_; + std::unique_ptr append_pool_; scoped_refptr log_; std::unique_ptr raft_pool_; gscoped_ptr queue_; diff --git a/src/yb/consensus/log-test-base.h b/src/yb/consensus/log-test-base.h index b940b5ed0583..027ce9162532 100644 --- a/src/yb/consensus/log-test-base.h +++ b/src/yb/consensus/log-test-base.h @@ -166,19 +166,22 @@ class LogTestBase : public YBTest { tablet_wal_path_ = fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet); clock_.reset(new server::HybridClock()); ASSERT_OK(clock_->Init()); - FLAGS_log_min_seconds_to_retain = 0; + ASSERT_OK(ThreadPoolBuilder("append") + .unlimited_threads() + .Build(&append_pool_)); } void BuildLog() { Schema schema_with_ids = SchemaBuilder(schema_).Build(); - CHECK_OK(Log::Open(options_, + ASSERT_OK(Log::Open(options_, fs_manager_.get(), kTestTablet, tablet_wal_path_, schema_with_ids, 0, // schema_version metric_entity_.get(), + append_pool_.get(), &log_)); } @@ -206,7 +209,7 @@ class LogTestBase : public YBTest { } static void CheckReplicateResult(const consensus::ReplicateMsgPtr& msg, const Status& s) { - CHECK_OK(s); + ASSERT_OK(s); } // Appends a batch with size 2, or the given set of writes. @@ -301,6 +304,7 @@ class LogTestBase : public YBTest { gscoped_ptr fs_manager_; gscoped_ptr metric_registry_; scoped_refptr metric_entity_; + std::unique_ptr append_pool_; scoped_refptr log_; int32_t current_index_; LogOptions options_; diff --git a/src/yb/consensus/log.cc b/src/yb/consensus/log.cc index 80e108cf7243..41aeac727012 100644 --- a/src/yb/consensus/log.cc +++ b/src/yb/consensus/log.cc @@ -37,6 +37,7 @@ #include #include +#include #include "yb/common/wire_protocol.h" #include "yb/consensus/log_index.h" #include "yb/consensus/log_metrics.h" @@ -63,12 +64,14 @@ #include "yb/util/random.h" #include "yb/util/size_literals.h" #include "yb/util/stopwatch.h" +#include "yb/util/taskstream.h" #include "yb/util/thread.h" #include "yb/util/threadpool.h" #include "yb/util/trace.h" using namespace yb::size_literals; using namespace std::literals; +using namespace std::placeholders; // Log retention configuration. // ----------------------------- @@ -135,136 +138,146 @@ namespace log { using consensus::OpId; using env_util::OpenFileForRandom; using std::shared_ptr; +using std::unique_ptr; using strings::Substitute; -// This class is responsible for managing the thread that appends to the log file. -class Log::AppendThread { +// This class is responsible for managing the task in a threadpool that appends to the log file. +class Log::Appender { public: - explicit AppendThread(Log* log); + explicit Appender(Log* log, ThreadPool* append_thread_pool); - // Initializes the objects and starts the thread. + // Initializes the objects and starts the task. Status Init(); - // Waits until the last enqueued elements are processed, sets the Appender thread to closing + CHECKED_STATUS Submit(LogEntryBatch* item) { + return task_stream_->Submit(item); + } + + // Waits until the last enqueued elements are processed, sets the appender_ to closing // state. If any entries are added to the queue during the process, invoke their callbacks' // 'OnFailure()' method. void Shutdown(); private: - void RunThread(); + // Process the given log entry batch or does a sync if a null is passed. + void ProcessBatch(LogEntryBatch* entry_batch); + void SyncWork(); Log* const log_; // Lock to protect access to thread_ during shutdown. mutable std::mutex lock_; - scoped_refptr thread_; + unique_ptr> task_stream_; + std::vector> sync_batch_; + MonoTime time_started_; }; -Log::AppendThread::AppendThread(Log *log) - : log_(log) { +Log::Appender::Appender(Log *log, ThreadPool* append_thread_pool) + : log_(log), task_stream_(new TaskStream( + std::bind(&Log::Appender::ProcessBatch, this, _1), append_thread_pool)) { DCHECK(dummy); } -Status Log::AppendThread::Init() { - DCHECK(!thread_) << "Already initialized"; - VLOG(1) << "Starting log append thread for tablet " << log_->tablet_id(); - RETURN_NOT_OK(yb::Thread::Create("log", "appender", - &AppendThread::RunThread, this, &thread_)); +Status Log::Appender::Init() { + VLOG(1) << "Starting log task stream for tablet " << log_->tablet_id(); return Status::OK(); } -void Log::AppendThread::RunThread() { - bool shutting_down = false; - - while (PREDICT_TRUE(!shutting_down)) { - std::vector entry_batches; - ElementDeleter d(&entry_batches); - - MonoTime wait_timeout_deadline = MonoTime::kMax; - if ((log_->interval_durable_wal_write_) - && log_->periodic_sync_needed_.load()) { - wait_timeout_deadline = log_->periodic_sync_earliest_unsync_entry_time_ - + log_->interval_durable_wal_write_; - } - - // We shut down the entry_queue when it's time to shut down the append thread, which causes this - // call to return false, while still populating the entry_batches vector with the final set of - // log entry batches that were enqueued. We finish processing this last bunch of log entry - // batches before exiting the main RunThread() loop. - if (PREDICT_FALSE(!log_->entry_queue()->BlockingDrainTo(&entry_batches, - wait_timeout_deadline))) { - shutting_down = true; - } +void Log::Appender::ProcessBatch(LogEntryBatch* entry_batch) { + // A callback function to TaskStream is expected to process the accumulated batch of entries. + // Here, we do a sync. + if (entry_batch == nullptr) { + SyncWork(); + return; + } + if (sync_batch_.empty()) { // Start of batch. + // Used in tests to delay writing log entries. auto sleep_duration = log_->sleep_duration_.load(std::memory_order_acquire); if (sleep_duration.count() > 0) { std::this_thread::sleep_for(sleep_duration); } - - if (log_->metrics_) { - log_->metrics_->entry_batches_per_group->Increment(entry_batches.size()); + time_started_ = MonoTime::Now(); + } + TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch); + Status s = log_->DoAppend(entry_batch); + + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Error appending to the log: " << s.ToString(); + DLOG(FATAL) << "Aborting: " << s.ToString(); + entry_batch->set_failed_to_append(); + // TODO If a single operation fails to append, should we abort all subsequent operations + // in this batch or allow them to be appended? What about operations in future batches? + if (!entry_batch->callback().is_null()) { + entry_batch->callback().Run(s); } - TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size()); - - SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency); - - for (LogEntryBatch* entry_batch : entry_batches) { - TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch); - Status s = log_->DoAppend(entry_batch); - - if (PREDICT_FALSE(!s.ok())) { - LOG(ERROR) << "Error appending to the log: " << s.ToString(); - DLOG(FATAL) << "Aborting: " << s.ToString(); - entry_batch->set_failed_to_append(); - // TODO If a single operation fails to append, should we abort all subsequent operations - // in this batch or allow them to be appended? What about operations in future batches? - if (!entry_batch->callback().is_null()) { - entry_batch->callback().Run(s); - } - } else if (!log_->sync_disabled_) { - if (!log_->periodic_sync_needed_.load()) { - log_->periodic_sync_needed_.store(true); - log_->periodic_sync_earliest_unsync_entry_time_ = MonoTime::Now(); - } - log_->periodic_sync_unsynced_bytes_ += entry_batch->total_size_bytes(); - } + return; + } + if (!log_->sync_disabled_) { + if (!log_->periodic_sync_needed_.load()) { + log_->periodic_sync_needed_.store(true); + log_->periodic_sync_earliest_unsync_entry_time_ = MonoTime::Now(); } + log_->periodic_sync_unsynced_bytes_ += entry_batch->total_size_bytes(); + sync_batch_.emplace_back(entry_batch); + } +} - Status s = log_->Sync(); - if (PREDICT_FALSE(!s.ok())) { - LOG(ERROR) << "Error syncing log" << s.ToString(); - DLOG(FATAL) << "Aborting: " << s.ToString(); - for (LogEntryBatch* entry_batch : entry_batches) { - if (!entry_batch->callback().is_null()) { - entry_batch->callback().Run(s); - } +void Log::Appender::SyncWork() { + MonoTime wait_timeout_deadline = MonoTime::kMax; + if (log_->interval_durable_wal_write_ && log_->periodic_sync_needed_.load()) { + wait_timeout_deadline = log_->periodic_sync_earliest_unsync_entry_time_ + + log_->interval_durable_wal_write_; + } + + if (log_->metrics_) { + log_->metrics_->entry_batches_per_group->Increment(sync_batch_.size()); + } + TRACE_EVENT1("log", "batch", "batch_size", sync_batch_.size()); + + BOOST_SCOPE_EXIT(this_) { + MonoTime time_now = MonoTime::Now(); + if (this_->log_->metrics_) { + this_->log_->metrics_->group_commit_latency->Increment( + time_now.GetDeltaSince(this_->time_started_).ToMicroseconds()); + } + this_->sync_batch_.clear(); + } BOOST_SCOPE_EXIT_END; + + Status s = log_->Sync(); + if (PREDICT_FALSE(!s.ok())) { + LOG(ERROR) << "Error syncing log" << s.ToString(); + DLOG(FATAL) << "Aborting: " << s.ToString(); + for (std::unique_ptr& entry_batch : sync_batch_) { + if (!entry_batch->callback().is_null()) { + entry_batch->callback().Run(s); } - } else { - TRACE_EVENT0("log", "Callbacks"); - VLOG(2) << "Synchronized " << entry_batches.size() << " entry batches"; - SCOPED_WATCH_STACK(100); - for (LogEntryBatch* entry_batch : entry_batches) { - if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) { - entry_batch->callback().Run(Status::OK()); - } - // It's important to delete each batch as we see it, because deleting it may free up memory - // from memory trackers, and the callback of a later batch may want to use that memory. - delete entry_batch; + } + } else { + TRACE_EVENT0("log", "Callbacks"); + VLOG(2) << "Synchronized " << sync_batch_.size() << " entry batches"; + SCOPED_WATCH_STACK(100); + for (std::unique_ptr& entry_batch : sync_batch_) { + if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) { + entry_batch->callback().Run(Status::OK()); } - entry_batches.clear(); + // It's important to delete each batch as we see it, because deleting it may free up memory + // from memory trackers, and the callback of a later batch may want to use that memory. + entry_batch.reset(); } + sync_batch_.clear(); } - VLOG(1) << "Exiting AppendThread for tablet " << log_->tablet_id(); + VLOG(1) << "Exiting AppendTask for tablet " << log_->tablet_id(); } -void Log::AppendThread::Shutdown() { +void Log::Appender::Shutdown() { log_->entry_queue()->Shutdown(); std::lock_guard lock_guard(lock_); - if (thread_) { - VLOG(1) << "Shutting down log append thread for tablet " << log_->tablet_id(); - CHECK_OK(ThreadJoiner(thread_.get()).Join()); - VLOG(1) << "Log append thread for tablet " << log_->tablet_id() << " is shut down"; - thread_.reset(); + if (task_stream_) { + VLOG(1) << "Shutting down log task stream for tablet " << log_->tablet_id(); + task_stream_->Stop(); + VLOG(1) << "Log append task stream for tablet " << log_->tablet_id() << " is shut down"; + task_stream_.reset(); } } @@ -284,6 +297,7 @@ Status Log::Open(const LogOptions &options, const Schema& schema, uint32_t schema_version, const scoped_refptr& metric_entity, + ThreadPool* append_thread_pool, scoped_refptr* log) { RETURN_NOT_OK_PREPEND(fs_manager->CreateDirIfMissing(DirName(tablet_wal_path)), @@ -299,7 +313,8 @@ Status Log::Open(const LogOptions &options, tablet_wal_path, schema, schema_version, - metric_entity)); + metric_entity, + append_thread_pool)); RETURN_NOT_OK(new_log->Init()); log->swap(new_log); return Status::OK(); @@ -307,7 +322,8 @@ Status Log::Open(const LogOptions &options, Log::Log(LogOptions options, FsManager* fs_manager, string log_path, string tablet_id, string tablet_wal_path, const Schema& schema, uint32_t schema_version, - const scoped_refptr& metric_entity) + const scoped_refptr& metric_entity, + ThreadPool* append_thread_pool) : options_(std::move(options)), fs_manager_(fs_manager), log_dir_(std::move(log_path)), @@ -319,7 +335,7 @@ Log::Log(LogOptions options, FsManager* fs_manager, string log_path, log_state_(kLogInitialized), max_segment_size_(options_.segment_size_bytes), entry_batch_queue_(FLAGS_group_commit_queue_size_bytes), - append_thread_(new AppendThread(this)), + appender_(new Appender(this, append_thread_pool)), durable_wal_write_(options_.durable_wal_write), interval_durable_wal_write_(options_.interval_durable_wal_write), bytes_durable_wal_write_mb_(options_.bytes_durable_wal_write_mb), @@ -374,7 +390,7 @@ Status Log::Init() { RETURN_NOT_OK(allocation_status_.Get()); RETURN_NOT_OK(SwitchToAllocatedSegment()); - RETURN_NOT_OK(append_thread_->Init()); + RETURN_NOT_OK(appender_->Init()); log_state_ = kLogWriting; return Status::OK(); } @@ -456,7 +472,7 @@ Status Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callba entry_batch->set_callback(callback); entry_batch->MarkReady(); - if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch))) { + if (PREDICT_FALSE(!appender_->Submit(entry_batch).ok())) { delete entry_batch; return kLogShutdownStatus; } @@ -864,7 +880,7 @@ void Log::SetSchemaForNextLogSegment(const Schema& schema, Status Log::Close() { allocation_pool_->Shutdown(); - append_thread_->Shutdown(); + appender_->Shutdown(); std::lock_guard l(state_lock_); switch (log_state_) { diff --git a/src/yb/consensus/log.h b/src/yb/consensus/log.h index a3d69a4f2e7b..e25c881f4bc1 100644 --- a/src/yb/consensus/log.h +++ b/src/yb/consensus/log.h @@ -53,6 +53,7 @@ #include "yb/util/opid.h" #include "yb/util/promise.h" #include "yb/util/status.h" +#include "yb/util/threadpool.h" namespace yb { @@ -107,6 +108,7 @@ class Log : public RefCountedThreadSafe { const Schema& schema, uint32_t schema_version, const scoped_refptr& metric_entity, + ThreadPool *append_thread_pool, scoped_refptr *log); ~Log(); @@ -255,7 +257,7 @@ class Log : public RefCountedThreadSafe { FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates); FRIEND_TEST(LogTest, TestWriteAndReadToAndFromInProgressSegment); - class AppendThread; + class Appender; // Log state. enum LogState { @@ -273,7 +275,8 @@ class Log : public RefCountedThreadSafe { Log(LogOptions options, FsManager* fs_manager, std::string log_path, std::string tablet_id, std::string tablet_wal_path, const Schema& schema, - uint32_t schema_version, const scoped_refptr& metric_entity); + uint32_t schema_version, const scoped_refptr& metric_entity, + ThreadPool* append_thread_pool); // Initializes a new one or continues an existing log. CHECKED_STATUS Init(); @@ -396,11 +399,12 @@ class Log : public RefCountedThreadSafe { // Note: The first WAL segment will start off as twice of this value. uint64_t cur_max_segment_size_ = 512 * 1024; - // The queue used to communicate between the thread calling Reserve() and the Log Appender thread. + // The queue used to communicate between the thread calling Reserve() and the thread appending to + // the Log. LogEntryBatchQueue entry_batch_queue_; - // Thread writing to the log. - gscoped_ptr append_thread_; + // Appender manages a TaskStream writing to the log. We will use one taskstream per tablet. + std::unique_ptr appender_; // A thread pool for asynchronously pre-allocating new log segments. gscoped_ptr allocation_pool_; diff --git a/src/yb/consensus/log_cache-test.cc b/src/yb/consensus/log_cache-test.cc index 892a7f072445..a29376ff8fcd 100644 --- a/src/yb/consensus/log_cache-test.cc +++ b/src/yb/consensus/log_cache-test.cc @@ -73,13 +73,15 @@ class LogCacheTest : public YBTest { fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); ASSERT_OK(fs_manager_->Open()); - CHECK_OK(log::Log::Open(log::LogOptions(), + ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); + ASSERT_OK(log::Log::Open(log::LogOptions(), fs_manager_.get(), kTestTablet, fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), schema_, 0, // schema_version NULL, + append_pool_.get(), &log_)); CloseAndReopenCache(MinimumOpId()); @@ -104,7 +106,7 @@ class LogCacheTest : public YBTest { protected: static void FatalOnError(const Status& s) { - CHECK_OK(s); + ASSERT_OK(s); } Status AppendReplicateMessagesToCache( @@ -125,6 +127,7 @@ class LogCacheTest : public YBTest { MetricRegistry metric_registry_; scoped_refptr metric_entity_; gscoped_ptr fs_manager_; + std::unique_ptr append_pool_; gscoped_ptr cache_; scoped_refptr log_; scoped_refptr clock_; diff --git a/src/yb/consensus/raft_consensus-test.cc b/src/yb/consensus/raft_consensus-test.cc index 833700a15054..f774b919eb75 100644 --- a/src/yb/consensus/raft_consensus-test.cc +++ b/src/yb/consensus/raft_consensus-test.cc @@ -212,20 +212,22 @@ class RaftConsensusTest : public YBTest { // TODO mock the Log too, since we're gonna mock the queue // monitors and pretty much everything else. fs_manager_.reset(new FsManager(env_.get(), test_path, "tserver_test")); - CHECK_OK(fs_manager_->CreateInitialFileSystemLayout()); - CHECK_OK(fs_manager_->Open()); - CHECK_OK(Log::Open(LogOptions(), + ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); + ASSERT_OK(fs_manager_->Open()); + ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); + ASSERT_OK(Log::Open(LogOptions(), fs_manager_.get(), kTestTablet, fs_manager_->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), schema_, 0, // schema_version - NULL, + nullptr, // metric_entity + append_pool_.get(), &log_)); log_->TEST_SetAllOpIdsSafe(true); - CHECK_OK(ThreadPoolBuilder("raft-pool").Build(&raft_pool_)); + ASSERT_OK(ThreadPoolBuilder("raft-pool").Build(&raft_pool_)); std::unique_ptr raft_pool_token = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); queue_ = new MockQueue(metric_entity_, log_.get(), clock_, std::move(raft_pool_token)); @@ -245,7 +247,7 @@ class RaftConsensusTest : public YBTest { string peer_uuid = config_.peers(num_peers - 1).permanent_uuid(); std::unique_ptr cmeta; - CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid, + ASSERT_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid, config_, initial_term, &cmeta)); std::unique_ptr raft_pool_token = @@ -279,7 +281,7 @@ class RaftConsensusTest : public YBTest { static void LogAppendCallback(const StatusCallback& callback, const Status& s) { - CHECK_OK(s); + ASSERT_OK(s); callback.Run(s); } @@ -347,6 +349,7 @@ class RaftConsensusTest : public YBTest { RaftConfigPB config_; OpId initial_id_; gscoped_ptr fs_manager_; + std::unique_ptr append_pool_; scoped_refptr log_; gscoped_ptr proxy_factory_; scoped_refptr clock_; diff --git a/src/yb/consensus/raft_consensus_quorum-test.cc b/src/yb/consensus/raft_consensus_quorum-test.cc index 117f93f865e3..d4f53175af8c 100644 --- a/src/yb/consensus/raft_consensus_quorum-test.cc +++ b/src/yb/consensus/raft_consensus_quorum-test.cc @@ -99,7 +99,6 @@ class RaftConsensusQuorumTest : public YBTest { schema_(GetSimpleTestSchema()) { options_.tablet_id = kTestTablet; FLAGS_enable_leader_failure_detection = false; - CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); } // Builds an initial configuration of 'num' elements. @@ -133,7 +132,8 @@ class RaftConsensusQuorumTest : public YBTest { fs_manager->GetFirstTabletWalDirOrDie(kTestTable, kTestTablet), schema_, 0, // schema_version - NULL, + nullptr, // metric_entity + append_pool_.get(), &log)); logs_.push_back(log.get()); fs_managers_.push_back(fs_manager.release()); @@ -152,11 +152,11 @@ class RaftConsensusQuorumTest : public YBTest { string peer_uuid = Substitute("peer-$0", i); std::unique_ptr cmeta; - CHECK_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_, + ASSERT_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_, kMinimumTerm, &cmeta)); RaftPeerPB local_peer_pb; - CHECK_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb)); + ASSERT_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb)); gscoped_ptr queue( new PeerMessageQueue(metric_entity_, logs_[i], @@ -210,6 +210,8 @@ class RaftConsensusQuorumTest : public YBTest { } Status BuildConfig(int num) { + RETURN_NOT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); + RETURN_NOT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); BuildInitialRaftConfigPB(num); RETURN_NOT_OK(BuildFsManagersAndLogs()); BuildPeers(); @@ -273,12 +275,12 @@ class RaftConsensusQuorumTest : public YBTest { void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) { scoped_refptr peer; - CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); + ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer)); ReplicaState* state = peer->GetReplicaStateForTests(); while (true) { { ReplicaState::UniqueLock lock; - CHECK_OK(state->LockForRead(&lock)); + ASSERT_OK(state->LockForRead(&lock)); if (OpIdCompare(state->GetLastReceivedOpIdUnlocked(), to_wait_for) >= 0) { return; } @@ -296,7 +298,7 @@ class RaftConsensusQuorumTest : public YBTest { MonoTime start(MonoTime::Now()); scoped_refptr peer; - CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); + ASSERT_OK(peers_->GetPeerByIdx(peer_idx, &peer)); ReplicaState* state = peer->GetReplicaStateForTests(); int backoff_exp = 0; @@ -305,7 +307,7 @@ class RaftConsensusQuorumTest : public YBTest { while (true) { { ReplicaState::UniqueLock lock; - CHECK_OK(state->LockForRead(&lock)); + ASSERT_OK(state->LockForRead(&lock)); committed_op_id = state->GetCommittedOpIdUnlocked(); if (OpIdCompare(committed_op_id, to_wait_for) >= 0) { return; @@ -324,7 +326,7 @@ class RaftConsensusQuorumTest : public YBTest { << committed_op_id << ". Dumping state and quitting."; vector lines; scoped_refptr leader; - CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); + ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (const string& line : lines) { LOG(ERROR) << line; } @@ -372,7 +374,7 @@ class RaftConsensusQuorumTest : public YBTest { if (wait_mode == WAIT_FOR_ALL_REPLICAS) { scoped_refptr leader; - CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); + ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); TestPeerMap all_peers = peers_->GetPeerMapCopy(); int i = 0; @@ -425,13 +427,13 @@ class RaftConsensusQuorumTest : public YBTest { log::LogEntries leader_entries = GatherLogEntries(leader_idx, logs_[leader_idx]); scoped_refptr leader; - CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); + ASSERT_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) { log::LogEntries replica_entries = GatherLogEntries(replica_idx, logs_[replica_idx]); scoped_refptr replica; - CHECK_OK(peers_->GetPeerByIdx(replica_idx, &replica)); + ASSERT_OK(peers_->GetPeerByIdx(replica_idx, &replica)); VerifyReplica(leader_entries, replica_entries, leader->peer_uuid(), @@ -541,6 +543,7 @@ class RaftConsensusQuorumTest : public YBTest { vector fs_managers_; vector > logs_; unique_ptr raft_pool_; + unique_ptr append_pool_; gscoped_ptr peers_; std::vector> operation_factories_; scoped_refptr clock_; @@ -565,7 +568,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { // lock one of the replicas down by obtaining the state lock // and never letting it go. scoped_refptr follower0; - CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); + ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); ReplicaState::UniqueLock lock; @@ -610,13 +613,13 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { // lock two of the replicas down by obtaining the state locks // and never letting them go. scoped_refptr follower0; - CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); + ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); ReplicaState::UniqueLock lock0; ASSERT_OK(follower0_rs->LockForRead(&lock0)); scoped_refptr follower1; - CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); + ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); ReplicaState* follower1_rs = follower1->GetReplicaStateForTests(); ReplicaState::UniqueLock lock1; ASSERT_OK(follower1_rs->LockForRead(&lock1)); @@ -716,9 +719,9 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { ASSERT_OK(BuildConfig(3)); scoped_refptr follower0; - CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); + ASSERT_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); scoped_refptr follower1; - CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); + ASSERT_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); shared_ptr counter_hook_rpl0( new CounterHooks(follower0->GetFaultHooks())); @@ -733,7 +736,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { ASSERT_OK(StartPeers()); scoped_refptr leader; - CHECK_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); + ASSERT_OK(peers_->GetPeerByIdx(kLeaderIdx, &leader)); ASSERT_OK(leader->EmulateElection()); // Wait for the config round to get committed and count the number @@ -793,13 +796,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // Now shutdown the current leader. LOG(INFO) << "Shutting down current leader with index " << (current_config_size - 1); scoped_refptr current_leader; - CHECK_OK(peers_->GetPeerByIdx(current_config_size - 1, ¤t_leader)); + ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 1, ¤t_leader)); current_leader->Shutdown(); peers_->RemovePeer(current_leader->peer_uuid()); // ... and make the peer before it become leader. scoped_refptr new_leader; - CHECK_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader)); + ASSERT_OK(peers_->GetPeerByIdx(current_config_size - 2, &new_leader)); // This will force an election in which we expect to make the last // non-shutdown peer in the list become leader. @@ -848,10 +851,10 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) { ConsensusResponsePB resp; scoped_refptr leader; - CHECK_OK(peers_->GetPeerByIdx(2, &leader)); + ASSERT_OK(peers_->GetPeerByIdx(2, &leader)); scoped_refptr follower; - CHECK_OK(peers_->GetPeerByIdx(0, &follower)); + ASSERT_OK(peers_->GetPeerByIdx(0, &follower)); req.set_caller_uuid(leader->peer_uuid()); req.set_caller_term(last_op_id.term()); @@ -906,7 +909,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { const int kPeerIndex = 1; scoped_refptr peer; - CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer)); + ASSERT_OK(peers_->GetPeerByIdx(kPeerIndex, &peer)); VoteRequestPB request; request.set_tablet_id(kTestTablet); diff --git a/src/yb/master/sys_catalog.cc b/src/yb/master/sys_catalog.cc index f461ae762023..be18cecc2c09 100644 --- a/src/yb/master/sys_catalog.cc +++ b/src/yb/master/sys_catalog.cc @@ -119,6 +119,7 @@ SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics, CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_)); CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); CHECK_OK(ThreadPoolBuilder("prepare").set_min_threads(1).Build(&tablet_prepare_pool_)); + CHECK_OK(ThreadPoolBuilder("append").set_min_threads(1).Build(&append_pool_)); } SysCatalogTable::~SysCatalogTable() { @@ -473,7 +474,9 @@ Status SysCatalogTable::OpenTablet(const scoped_refptr& tablet_peer_->status_listener(), tablet_peer_->log_anchor_registry(), tablet_options, - nullptr /* transaction_coordinator_context */ }; + nullptr, // transaction_participant_context + nullptr, // transaction_coordinator_context + append_pool()}; RETURN_NOT_OK(BootstrapTablet(data, &tablet, &log, &consensus_info)); // TODO: Do we have a setSplittable(false) or something from the outside is diff --git a/src/yb/master/sys_catalog.h b/src/yb/master/sys_catalog.h index 0b93ef2605bc..2c885ae65263 100644 --- a/src/yb/master/sys_catalog.h +++ b/src/yb/master/sys_catalog.h @@ -135,6 +135,7 @@ class SysCatalogTable { ThreadPool* raft_pool() const { return raft_pool_.get(); } ThreadPool* tablet_prepare_pool() const { return tablet_prepare_pool_.get(); } + ThreadPool* append_pool() const { return append_pool_.get(); } const scoped_refptr& tablet_peer() const { return tablet_peer_; @@ -222,6 +223,9 @@ class SysCatalogTable { // Thread pool for preparing transactions, shared between all tablets. gscoped_ptr tablet_prepare_pool_; + // Thread pool for appender tasks + gscoped_ptr append_pool_; + scoped_refptr tablet_peer_; Master* master_; diff --git a/src/yb/tablet/tablet_bootstrap-test.cc b/src/yb/tablet/tablet_bootstrap-test.cc index de78c4c6197c..55b4af3107e3 100644 --- a/src/yb/tablet/tablet_bootstrap-test.cc +++ b/src/yb/tablet/tablet_bootstrap-test.cc @@ -126,7 +126,9 @@ class BootstrapTest : public LogTestBase { listener.get(), log_anchor_registry, tablet_options, - nullptr /* transaction_coordinator_context */}; + nullptr, // transaction_participant_context + nullptr, // transaction_coordinator_context + append_pool_.get()}; RETURN_NOT_OK(BootstrapTablet(data, tablet, &log_, boot_info)); return Status::OK(); } diff --git a/src/yb/tablet/tablet_bootstrap.cc b/src/yb/tablet/tablet_bootstrap.cc index ced5b9ddde18..e94802b20e82 100644 --- a/src/yb/tablet/tablet_bootstrap.cc +++ b/src/yb/tablet/tablet_bootstrap.cc @@ -255,7 +255,9 @@ TabletBootstrap::TabletBootstrap(const BootstrapTabletData& data) metric_registry_(data.metric_registry), listener_(data.listener), log_anchor_registry_(data.log_anchor_registry), - tablet_options_(data.tablet_options) {} + tablet_options_(data.tablet_options), + append_pool_(data.append_pool) { +} TabletBootstrap::~TabletBootstrap() {} @@ -479,6 +481,7 @@ Status TabletBootstrap::OpenNewLog() { *tablet_->schema(), tablet_->metadata()->schema_version(), tablet_->GetMetricEntity(), + append_pool_, &log_)); // Disable sync temporarily in order to speed up appends during the bootstrap process. log_->DisableSync(); diff --git a/src/yb/tablet/tablet_bootstrap.h b/src/yb/tablet/tablet_bootstrap.h index b0f2b2a67db8..cbda71d181fc 100644 --- a/src/yb/tablet/tablet_bootstrap.h +++ b/src/yb/tablet/tablet_bootstrap.h @@ -36,6 +36,7 @@ #include "yb/consensus/consensus_meta.h" #include "yb/consensus/opid_util.h" #include "yb/consensus/log_reader.h" +#include "yb/util/threadpool.h" namespace yb { namespace tablet { @@ -153,6 +154,9 @@ class TabletBootstrap { std::unique_ptr cmeta_; TabletOptions tablet_options_; + // Thread pool for append task for bootstrap. + ThreadPool* append_pool_; + // Statistics on the replay of entries in the log. struct Stats { Stats() diff --git a/src/yb/tablet/tablet_bootstrap_if.h b/src/yb/tablet/tablet_bootstrap_if.h index e409e254dd5b..a5dfba8205c5 100644 --- a/src/yb/tablet/tablet_bootstrap_if.h +++ b/src/yb/tablet/tablet_bootstrap_if.h @@ -49,6 +49,8 @@ #include "yb/util/status.h" #include "yb/tablet/tablet_options.h" #include "yb/tablet/tablet_fwd.h" +#include "yb/util/threadpool.h" + namespace yb { @@ -118,6 +120,7 @@ struct BootstrapTabletData { TabletOptions tablet_options; TransactionParticipantContext* transaction_participant_context; TransactionCoordinatorContext* transaction_coordinator_context; + ThreadPool* append_pool; }; // Bootstraps a tablet, initializing it with the provided metadata. If the tablet diff --git a/src/yb/tablet/tablet_peer-test.cc b/src/yb/tablet/tablet_peer-test.cc index 1e459d01ab71..c41e997b139e 100644 --- a/src/yb/tablet/tablet_peer-test.cc +++ b/src/yb/tablet/tablet_peer-test.cc @@ -151,10 +151,14 @@ class TabletPeerTest : public YBTabletTest, consensus::kMinimumTerm, &cmeta)); + ASSERT_OK(ThreadPoolBuilder("append") + .unlimited_threads() + .Build(&append_pool_)); scoped_refptr log; ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(), tablet()->metadata()->wal_dir(), *tablet()->schema(), - tablet()->metadata()->schema_version(), metric_entity_.get(), &log)); + tablet()->metadata()->schema_version(), metric_entity_.get(), + append_pool_.get(), &log)); tablet_peer_->SetBootstrapping(); ASSERT_OK(tablet_peer_->InitTabletPeer(tablet(), @@ -252,7 +256,7 @@ class TabletPeerTest : public YBTabletTest, // Assert that the Log GC() anchor is earlier than the latest OpId in the Log. void AssertLogAnchorEarlierThanLogLatest() { int64_t earliest_index = -1; - CHECK_OK(tablet_peer_->GetEarliestNeededLogIndex(&earliest_index)); + ASSERT_OK(tablet_peer_->GetEarliestNeededLogIndex(&earliest_index)); auto last_log_opid = tablet_peer_->log_->GetLatestEntryOpId(); CHECK_LT(earliest_index, last_log_opid.index) << "Expected valid log anchor, got earliest opid: " << earliest_index @@ -276,6 +280,7 @@ class TabletPeerTest : public YBTabletTest, gscoped_ptr apply_pool_; std::unique_ptr raft_pool_; std::unique_ptr tablet_prepare_pool_; + std::unique_ptr append_pool_; scoped_refptr tablet_peer_; TableType table_type_; }; diff --git a/src/yb/tserver/remote_bootstrap_session-test.h b/src/yb/tserver/remote_bootstrap_session-test.h index 7c2d2d86a07d..80ab584bf671 100644 --- a/src/yb/tserver/remote_bootstrap_session-test.h +++ b/src/yb/tserver/remote_bootstrap_session-test.h @@ -90,12 +90,13 @@ class RemoteBootstrapTest : public YBTabletTest { public: explicit RemoteBootstrapTest(TableType table_type) : YBTabletTest(GetSimpleTestSchema(), table_type) { - CHECK_OK(ThreadPoolBuilder("test-exec").Build(&apply_pool_)); - CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); - CHECK_OK(ThreadPoolBuilder("prepare").Build(&tablet_prepare_pool_)); } virtual void SetUp() override { + ASSERT_OK(ThreadPoolBuilder("test-exec").Build(&apply_pool_)); + ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_)); + ASSERT_OK(ThreadPoolBuilder("prepare").Build(&tablet_prepare_pool_)); + ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); YBTabletTest::SetUp(); SetUpTabletPeer(); ASSERT_NO_FATALS(PopulateTablet()); @@ -111,12 +112,14 @@ class RemoteBootstrapTest : public YBTabletTest { protected: void SetUpTabletPeer() { scoped_refptr log; - CHECK_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(), + ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(), fs_manager()->GetFirstTabletWalDirOrDie(tablet()->metadata()->table_id(), tablet()->tablet_id()), *tablet()->schema(), 0, // schema_version - NULL, &log)); + nullptr, // metric_entity + append_pool_.get(), + &log)); scoped_refptr metric_entity = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME()); @@ -141,7 +144,7 @@ class RemoteBootstrapTest : public YBTabletTest { config.set_opid_index(consensus::kInvalidOpIdIndex); std::unique_ptr cmeta; - CHECK_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(), + ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(), tablet()->tablet_id(), fs_manager()->uuid(), config, consensus::kMinimumTerm, &cmeta)); @@ -151,7 +154,7 @@ class RemoteBootstrapTest : public YBTabletTest { log_anchor_registry_.reset(new LogAnchorRegistry()); tablet_peer_->SetBootstrapping(); - CHECK_OK(tablet_peer_->InitTabletPeer(tablet(), + ASSERT_OK(tablet_peer_->InitTabletPeer(tablet(), std::shared_future(), clock(), *messenger, @@ -160,7 +163,7 @@ class RemoteBootstrapTest : public YBTabletTest { raft_pool_.get(), tablet_prepare_pool_.get())); consensus::ConsensusBootstrapInfo boot_info; - CHECK_OK(tablet_peer_->Start(boot_info)); + ASSERT_OK(tablet_peer_->Start(boot_info)); ASSERT_OK(tablet_peer_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(2))); @@ -197,7 +200,7 @@ class RemoteBootstrapTest : public YBTabletTest { virtual void InitSession() { session_.reset(new YB_EDITION_NS_PREFIX RemoteBootstrapSession( tablet_peer_.get(), "TestSession", "FakeUUID", fs_manager())); - CHECK_OK(session_->Init()); + ASSERT_OK(session_->Init()); } // Read the specified BlockId, via the RemoteBootstrapSession, into a file. @@ -209,7 +212,7 @@ class RemoteBootstrapTest : public YBTabletTest { string data; int64_t block_file_size = 0; RemoteBootstrapErrorPB::Code error_code; - CHECK_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code)); + ASSERT_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code)); if (block_file_size > 0) { CHECK_GT(data.size(), 0); } @@ -218,11 +221,11 @@ class RemoteBootstrapTest : public YBTabletTest { WritableFileOptions opts; string path_template = GetTestPath(Substitute("test_block_$0.tmp.XXXXXX", block_id.ToString())); gscoped_ptr writable_file; - CHECK_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file)); - CHECK_OK(writable_file->Append(Slice(data.data(), data.size()))); - CHECK_OK(writable_file->Close()); + ASSERT_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file)); + ASSERT_OK(writable_file->Append(Slice(data.data(), data.size()))); + ASSERT_OK(writable_file->Close()); - CHECK_OK(Env::Default()->NewSequentialFile(*path, file)); + ASSERT_OK(Env::Default()->NewSequentialFile(*path, file)); } MetricRegistry metric_registry_; @@ -230,6 +233,7 @@ class RemoteBootstrapTest : public YBTabletTest { gscoped_ptr apply_pool_; unique_ptr raft_pool_; unique_ptr tablet_prepare_pool_; + unique_ptr append_pool_; scoped_refptr tablet_peer_; scoped_refptr session_; }; diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index ef0b3ba86cc7..9cada472f673 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -313,11 +313,14 @@ TSTabletManager::TSTabletManager(FsManager* fs_manager, // However, the effective upper bound is the number of replicas as each will // submit its own tasks via a dedicated token. CHECK_OK(ThreadPoolBuilder("raft") - .set_max_threads(std::numeric_limits::max()) + .unlimited_threads() .Build(&raft_pool_)); CHECK_OK(ThreadPoolBuilder("prepare") - .set_max_threads(std::numeric_limits::max()) + .unlimited_threads() .Build(&tablet_prepare_pool_)); + CHECK_OK(ThreadPoolBuilder("append") + .unlimited_threads() + .Build(&append_pool_)); ThreadPoolMetrics read_metrics = { METRIC_op_read_queue_length.Instantiate(server_->metric_entity()), METRIC_op_read_queue_time.Instantiate(server_->metric_entity()), @@ -910,7 +913,8 @@ void TSTabletManager::OpenTablet(const scoped_refptr& meta, tablet_peer->log_anchor_registry(), tablet_options_, tablet_peer.get(), - tablet_peer.get()}; + tablet_peer.get(), + append_pool()}; s = BootstrapTablet(data, &tablet, &log, &bootstrap_info); if (!s.ok()) { LOG(ERROR) << kLogPrefix << "Tablet failed to bootstrap: " @@ -930,7 +934,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr& meta, log, tablet->GetMetricEntity(), raft_pool(), - tablet_prepare_pool()); + append_pool()); if (!s.ok()) { LOG(ERROR) << kLogPrefix << "Tablet failed to init: " @@ -1013,6 +1017,9 @@ void TSTabletManager::Shutdown() { if (tablet_prepare_pool_) { tablet_prepare_pool_->Shutdown(); } + if (append_pool_) { + append_pool_->Shutdown(); + } { std::lock_guard l(lock_); diff --git a/src/yb/tserver/ts_tablet_manager.h b/src/yb/tserver/ts_tablet_manager.h index 474f53be4f09..2ad3cb405a63 100644 --- a/src/yb/tserver/ts_tablet_manager.h +++ b/src/yb/tserver/ts_tablet_manager.h @@ -147,6 +147,7 @@ class TSTabletManager : public tserver::TabletPeerLookupIf { ThreadPool* tablet_prepare_pool() const { return tablet_prepare_pool_.get(); } ThreadPool* raft_pool() const { return raft_pool_.get(); } ThreadPool* read_pool() const { return read_pool_.get(); } + ThreadPool* append_pool() const { return append_pool_.get(); } // Create a new tablet and register it with the tablet manager. The new tablet // is persisted on disk and opened before this method returns. @@ -434,6 +435,9 @@ class TSTabletManager : public tserver::TabletPeerLookupIf { // Thread pool for Raft-related operations, shared between all tablets. std::unique_ptr raft_pool_; + // Thread pool for appender threads, shared between all tablets. + std::unique_ptr append_pool_; + // Thread pool for read ops, that are run in parallel, shared between all tablets. std::unique_ptr read_pool_; diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index 161ce64018fa..121ffe61f346 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -200,7 +200,6 @@ set(UTIL_SRCS subprocess.cc sync_point.cc test_graph.cc - taskstream.cc thread.cc thread_restrictions.cc threadlocal.cc diff --git a/src/yb/util/taskstream.h b/src/yb/util/taskstream.h index f3a7a6a8d6bb..86af5946a882 100644 --- a/src/yb/util/taskstream.h +++ b/src/yb/util/taskstream.h @@ -49,4 +49,7 @@ class TaskStream { }; } // namespace yb + +#include "yb/util/taskstream_impl.h" + #endif // YB_UTIL_TASKSTREAM_H diff --git a/src/yb/util/taskstream.cc b/src/yb/util/taskstream_impl.h similarity index 95% rename from src/yb/util/taskstream.cc rename to src/yb/util/taskstream_impl.h index 4c4ef550bae1..5d7042bc08f1 100644 --- a/src/yb/util/taskstream.cc +++ b/src/yb/util/taskstream_impl.h @@ -11,6 +11,9 @@ // under the License. // +#ifndef YB_UTIL_TASKSTREAM_IMPL_H +#define YB_UTIL_TASKSTREAM_IMPL_H + #include #include #include @@ -193,11 +196,6 @@ template Status TaskStream::Submit(T* item) { return impl_->Submit(item); } -template TaskStream::TaskStream(std::function process_item, - ThreadPool* thread_pool); -template TaskStream::~TaskStream(); -template Status TaskStream::Start(); -template void TaskStream::Stop(); -template Status TaskStream::Submit(int* item); } // namespace yb +#endif // YB_UTIL_TASKSTREAM_IMPL_H diff --git a/src/yb/util/threadpool.cc b/src/yb/util/threadpool.cc index 4994350bad83..1896ccb53324 100644 --- a/src/yb/util/threadpool.cc +++ b/src/yb/util/threadpool.cc @@ -95,6 +95,11 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { return *this; } +ThreadPoolBuilder& ThreadPoolBuilder::unlimited_threads() { + max_threads_ = std::numeric_limits::max(); + return *this; +} + ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { CHECK_GT(max_queue_size, 0); max_queue_size_ = max_queue_size; diff --git a/src/yb/util/threadpool.h b/src/yb/util/threadpool.h index af586ddc93e4..accb5af9d555 100644 --- a/src/yb/util/threadpool.h +++ b/src/yb/util/threadpool.h @@ -111,6 +111,7 @@ class ThreadPoolBuilder { // in order to provide traditional Builder pattern conveniences. ThreadPoolBuilder& set_min_threads(int min_threads); ThreadPoolBuilder& set_max_threads(int max_threads); + ThreadPoolBuilder& unlimited_threads(); ThreadPoolBuilder& set_max_queue_size(int max_queue_size); ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);