diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 7daa0037e519..3a71d68b3321 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -124,6 +124,13 @@ TAG_FLAG(tablet_bloom_target_fp_rate, advanced); METRIC_DEFINE_entity(tablet); +// TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout +// is probably OK for shutdown. +DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000, + "Max amount of time we can wait for read/write operations on RocksDB to finish " + "so that we can perform exclusive-ownership operations on RocksDB, such as removing " + "all data in the tablet by replacing the RocksDB instance with an empty one."); + using namespace std::placeholders; using std::shared_ptr; @@ -464,6 +471,12 @@ void Tablet::Shutdown() { // Shutdown the RocksDB instance for this table, if present. rocksdb_.reset(); state_ = kShutdown; + + // Release the mutex that prevents snapshot restore / truncate operations from running. Such + // operations are no longer possible because the tablet has shut down. When we started the + // "read/write operation pause", we incremented the "exclusive operation" counter. The call below + // prevents us from decrementing that counter back, disabling read/write operations permanently. + op_pause.ReleaseMutexButKeepDisabled(); } Result> Tablet::NewRowIterator( @@ -665,6 +678,7 @@ Status Tablet::KeyValueBatchFromRedisWriteBatch(const WriteOperationData& data) Status Tablet::HandleRedisReadRequest(const ReadHybridTime& read_time, const RedisReadRequestPB& redis_read_request, RedisResponsePB* response) { + // TODO: move this locking to the top-level read request handler in TabletService. 
ScopedPendingOperation scoped_read_operation(&pending_op_counter_); RETURN_NOT_OK(scoped_read_operation); @@ -1080,12 +1094,14 @@ Status Tablet::AlterSchema(AlterSchemaOperationState *operation_state) { return Status::OK(); } -Result Tablet::PauseReadWriteOperations() { +ScopedPendingOperationPause Tablet::PauseReadWriteOperations() { LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("Tablet $0: Waiting for pending ops to complete", tablet_id())) { - return ScopedPendingOperationPause(&pending_op_counter_, 60s); + return ScopedPendingOperationPause( + &pending_op_counter_, + MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms)); } - return STATUS(InternalError, "unexpected return"); // should not happen + FATAL_ERROR("Unreachable code -- the previous block must always return"); } Status Tablet::SetFlushedFrontier(const docdb::ConsensusFrontier& frontier) { diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 3914ed6ffcbf..2256bc5e8b0a 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -470,7 +470,7 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { const boost::optional& transaction_id) const; // Pause any new read/write operations and wait for all pending read/write operations to finish. - Result PauseReadWriteOperations(); + util::ScopedPendingOperationPause PauseReadWriteOperations(); // Initialize RocksDB's max persistent op id and hybrid time to that of the operation state. // Necessary for cases like truncate or restore snapshot when RocksDB is reset. 
diff --git a/src/yb/tserver/ts_tablet_manager.cc b/src/yb/tserver/ts_tablet_manager.cc index 92be76c401d4..e04254ea2f11 100644 --- a/src/yb/tserver/ts_tablet_manager.cc +++ b/src/yb/tserver/ts_tablet_manager.cc @@ -842,7 +842,7 @@ Status TSTabletManager::DeleteTablet( meta->table_type(), meta->data_root_dir(), meta->wal_root_dir()); -} + } return Status::OK(); } diff --git a/src/yb/util/pending_op_counter.cc b/src/yb/util/pending_op_counter.cc index a50e80f06a6e..6f77ca60b93a 100644 --- a/src/yb/util/pending_op_counter.cc +++ b/src/yb/util/pending_op_counter.cc @@ -26,7 +26,7 @@ uint64_t PendingOperationCounter::Update(uint64_t delta) { const uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_release); VLOG(2) << "[" << this << "] Update(" << static_cast(delta) << "), result = " << result; // Ensure that there is no underflow in either counter. - DCHECK_EQ((result & (1ull << 63)), 0); // Counter of Disable() calls. + DCHECK_EQ((result & (1ull << 63)), 0); // Counter of DisableAndWaitForOps() calls. DCHECK_EQ((result & (kDisabledDelta >> 1)), 0); // Counter of pending operations. return result; } diff --git a/src/yb/util/pending_op_counter.h b/src/yb/util/pending_op_counter.h index 257d6c958d48..63a59fbcf3a4 100644 --- a/src/yb/util/pending_op_counter.h +++ b/src/yb/util/pending_op_counter.h @@ -23,32 +23,36 @@ namespace yb { namespace util { -// This is used to track the number of pending operations using a certain resource (e.g. -// a RocksDB database) so we can safely wait for all operations to complete and destroy the -// resource. +// This is used to track the number of pending operations using a certain resource (as of Apr 2018 +// just the RocksDB database within a tablet) so we can safely wait for all operations to complete +// and destroy or replace the resource. This is similar to a shared mutex, but allows fine-grained +// control, such as preventing new operations from being started. 
class PendingOperationCounter { public: // Using upper bits of counter as special flags. - static constexpr uint64_t kDisabledDelta = 0x0001000000000000; + static constexpr uint64_t kDisabledDelta = 1ull << 48; static constexpr uint64_t kOpCounterMask = kDisabledDelta - 1; static constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask; - PendingOperationCounter() : counters_(0) {} - CHECKED_STATUS DisableAndWaitForOps(const MonoDelta& timeout) { Update(kDisabledDelta); return WaitForOpsToFinish(timeout); } - // Due to the thread restriction of "timed_mutex::.unlock()", this Unlock method must be called - // in the same thread that invoked DisableAndWaitForOps(). + // Due to the thread restriction of "timed_mutex::unlock()", this Unlock method must be called + // in the same thread that invoked DisableAndWaitForOps(). This is fine for truncate, snapshot + // restore, and tablet shutdown operations. void Enable(const bool unlock) { Update(-kDisabledDelta); if (unlock) { - disable_.unlock(); + UnlockExclusiveOpMutex(); } } + void UnlockExclusiveOpMutex() { + disable_.unlock(); + } + uint64_t Increment() { return Update(1); } void Decrement() { Update(-1); } uint64_t Get() const { @@ -69,14 +73,22 @@ class PendingOperationCounter { uint64_t Update(uint64_t delta); - // Upper bits are used for storing number of Disable() calls. - std::atomic counters_; + // The upper 16 bits are used for storing the number of separate operations that have disabled the + // resource. E.g. tablet shutdown running at the same time with Truncate/RestoreSnapshot. + // The lower 48 bits are used to keep track of the number of concurrent read/write operations. + std::atomic counters_{0}; - // Mutex to disable the resource exclusively. + // Mutex to disable the resource exclusively. This mutex is locked by DisableAndWaitForOps after + // waiting for all shared-ownership operations to complete. 
We need this to avoid a race condition + // between Raft operations that replace RocksDB (apply snapshot / truncate) and tablet shutdown. std::timed_mutex disable_; }; -// A convenience class to automatically increment/decrement a PendingOperationCounter. +// A convenience class to automatically increment/decrement a PendingOperationCounter. This is used +// for regular RocksDB read/write operations that are allowed to proceed in parallel. Constructing +// a ScopedPendingOperation might fail because the counter is in the disabled state. An instance +// of this class resembles a Result or a Status, because it can be used with the RETURN_NOT_OK +// macro. class ScopedPendingOperation { public: // Object is not copyable, but movable. @@ -84,20 +96,22 @@ class ScopedPendingOperation { ScopedPendingOperation(const ScopedPendingOperation&) = delete; explicit ScopedPendingOperation(PendingOperationCounter* counter) - : counter_(counter), - orig_counter_value_(0) { + : counter_(counter), ok_(false) { if (counter != nullptr) { if (counter_->IsReady()) { - orig_counter_value_ = counter->Increment(); + // The race condition between IsReady() and Increment() is OK, because we check whether + // anyone had started an exclusive operation before we did the increment, and don't proceed + // with this shared-ownership operation in that case. + ok_ = (counter->Increment() & PendingOperationCounter::kDisabledCounterMask) == 0; } else { - orig_counter_value_ = PendingOperationCounter::kDisabledDelta; + ok_ = false; counter_ = nullptr; // Avoid decrementing the counter. } } } ScopedPendingOperation(ScopedPendingOperation&& op) - : counter_(op.counter_), orig_counter_value_(op.orig_counter_value_) { + : counter_(op.counter_), ok_(op.ok_) { op.counter_ = nullptr; // Moved ownership. 
} @@ -108,16 +122,18 @@ class ScopedPendingOperation { } bool ok() const { - return (orig_counter_value_ & PendingOperationCounter::kDisabledCounterMask) == 0; + return ok_; } private: PendingOperationCounter* counter_; - // Store in constructor original counter value to be able checking it later in ok(). - uint64_t orig_counter_value_; + + bool ok_; }; // RETURN_NOT_OK macro support. +// The error message currently mentions RocksDB because that is the only type of resource that +// this framework is used to protect as of Apr 2018. inline Status MoveStatus(const ScopedPendingOperation& scoped) { return scoped.ok() ? Status::OK() : STATUS(Busy, "RocksDB store is busy"); } @@ -141,6 +157,18 @@ class ScopedPendingOperationPause { p.counter_ = nullptr; // Moved ownership. } + // This is called during tablet shutdown to release the mutex that we took to prevent concurrent + // exclusive-ownership operations on the RocksDB instance, such as truncation and snapshot + // restoration. It is fine to release the mutex because these exclusive operations are not allowed + // to happen after tablet shutdown anyway. + void ReleaseMutexButKeepDisabled() { + CHECK_OK(status_); + CHECK_NOTNULL(counter_); + counter_->UnlockExclusiveOpMutex(); + // Make sure the destructor has no effect when it runs. + counter_ = nullptr; + } + // See PendingOperationCounter::Enable() for the thread restriction. ~ScopedPendingOperationPause() { if (counter_ != nullptr) {