Skip to content

Commit

Permalink
Keep pending operation counter disabled on tablet shutdown
Browse files Browse the repository at this point in the history
Summary: Keep pending operation counter disabled on tablet shutdown

Test Plan: Jenkins

Reviewers: robert, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4644
  • Loading branch information
mbautin committed Apr 19, 2018
1 parent bd39517 commit 92ac433
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 27 deletions.
22 changes: 19 additions & 3 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ TAG_FLAG(tablet_bloom_target_fp_rate, advanced);

METRIC_DEFINE_entity(tablet);

// TODO: use a lower default for truncate / snapshot restore Raft operations. The one-minute timeout
// is probably OK for shutdown.
DEFINE_int32(tablet_rocksdb_ops_quiet_down_timeout_ms, 60000,
"Max amount of time we can wait for read/write operations on RocksDB to finish "
"so that we can perform exclusive-ownership operations on RocksDB, such as removing "
"all data in the tablet by replacing the RocksDB instance with an empty one.");

using namespace std::placeholders;

using std::shared_ptr;
Expand Down Expand Up @@ -464,6 +471,12 @@ void Tablet::Shutdown() {
// Shutdown the RocksDB instance for this table, if present.
rocksdb_.reset();
state_ = kShutdown;

// Release the mutex that prevents snapshot restore / truncate operations from running. Such
// operations are no longer possible because the tablet has shut down. When we start the
// "read/write operation pause", we incremented the "exclusive operation" counter. This will
// prevent us from decrementing that counter back, disabling read/write operations permanently.
op_pause.ReleaseMutexButKeepDisabled();
}

Result<std::unique_ptr<common::YQLRowwiseIteratorIf>> Tablet::NewRowIterator(
Expand Down Expand Up @@ -665,6 +678,7 @@ Status Tablet::KeyValueBatchFromRedisWriteBatch(const WriteOperationData& data)
Status Tablet::HandleRedisReadRequest(const ReadHybridTime& read_time,
const RedisReadRequestPB& redis_read_request,
RedisResponsePB* response) {
// TODO: move this locking to the top-level read request handler in TabletService.
ScopedPendingOperation scoped_read_operation(&pending_op_counter_);
RETURN_NOT_OK(scoped_read_operation);

Expand Down Expand Up @@ -1080,12 +1094,14 @@ Status Tablet::AlterSchema(AlterSchemaOperationState *operation_state) {
return Status::OK();
}

Result<ScopedPendingOperationPause> Tablet::PauseReadWriteOperations() {
ScopedPendingOperationPause Tablet::PauseReadWriteOperations() {
LOG_SLOW_EXECUTION(WARNING, 1000,
Substitute("Tablet $0: Waiting for pending ops to complete", tablet_id())) {
return ScopedPendingOperationPause(&pending_op_counter_, 60s);
return ScopedPendingOperationPause(
&pending_op_counter_,
MonoDelta::FromMilliseconds(FLAGS_tablet_rocksdb_ops_quiet_down_timeout_ms));
}
return STATUS(InternalError, "unexpected return"); // should not happen
FATAL_ERROR("Unreachable code -- the previous block must always return");
}

Status Tablet::SetFlushedFrontier(const docdb::ConsensusFrontier& frontier) {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier {
const boost::optional<TransactionId>& transaction_id) const;

// Pause any new read/write operations and wait for all pending read/write operations to finish.
Result<util::ScopedPendingOperationPause> PauseReadWriteOperations();
util::ScopedPendingOperationPause PauseReadWriteOperations();

// Initialize RocksDB's max persistent op id and hybrid time to that of the operation state.
// Necessary for cases like truncate or restore snapshot when RocksDB is reset.
Expand Down
2 changes: 1 addition & 1 deletion src/yb/tserver/ts_tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ Status TSTabletManager::DeleteTablet(
meta->table_type(),
meta->data_root_dir(),
meta->wal_root_dir());
}
}

return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/yb/util/pending_op_counter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ uint64_t PendingOperationCounter::Update(uint64_t delta) {
const uint64_t result = counters_.fetch_add(delta, std::memory_order::memory_order_release);
VLOG(2) << "[" << this << "] Update(" << static_cast<int64_t>(delta) << "), result = " << result;
// Ensure that there is no underflow in either counter.
DCHECK_EQ((result & (1ull << 63)), 0); // Counter of Disable() calls.
DCHECK_EQ((result & (1ull << 63)), 0); // Counter of DisableAndWaitForOps() calls.
DCHECK_EQ((result & (kDisabledDelta >> 1)), 0); // Counter of pending operations.
return result;
}
Expand Down
70 changes: 49 additions & 21 deletions src/yb/util/pending_op_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,36 @@
namespace yb {
namespace util {

// This is used to track the number of pending operations using a certain resource (e.g.
// a RocksDB database) so we can safely wait for all operations to complete and destroy the
// resource.
// This is used to track the number of pending operations using a certain resource (as of Apr 2018
// just the RocksDB database within a tablet) so we can safely wait for all operations to complete
// and destroy or replace the resource. This is similar to a shared mutex, but allows fine-grained
// control, such as preventing new operations from being started.
class PendingOperationCounter {
public:
// Using upper bits of counter as special flags.
static constexpr uint64_t kDisabledDelta = 0x0001000000000000;
static constexpr uint64_t kDisabledDelta = 1ull << 48;
static constexpr uint64_t kOpCounterMask = kDisabledDelta - 1;
static constexpr uint64_t kDisabledCounterMask = ~kOpCounterMask;

PendingOperationCounter() : counters_(0) {}

CHECKED_STATUS DisableAndWaitForOps(const MonoDelta& timeout) {
Update(kDisabledDelta);
return WaitForOpsToFinish(timeout);
}

// Due to the thread restriction of "timed_mutex::.unlock()", this Unlock method must be called
// in the same thread that invoked DisableAndWaitForOps().
// Due to the thread restriction of "timed_mutex::unlock()", this Unlock method must be called
// in the same thread that invoked DisableAndWaitForOps(). This is fine for truncate, snapshot
// restore, and tablet shutdown operations.
void Enable(const bool unlock) {
Update(-kDisabledDelta);
if (unlock) {
disable_.unlock();
UnlockExclusiveOpMutex();
}
}

void UnlockExclusiveOpMutex() {
disable_.unlock();
}

uint64_t Increment() { return Update(1); }
void Decrement() { Update(-1); }
uint64_t Get() const {
Expand All @@ -69,35 +73,45 @@ class PendingOperationCounter {

uint64_t Update(uint64_t delta);

// Upper bits are used for storing number of Disable() calls.
std::atomic<uint64_t> counters_;
// The upper 16 bits are used for storing the number of separate operations that have disabled the
// resource. E.g. tablet shutdown running at the same time with Truncate/RestoreSnapshot.
// The lower 48 bits are used to keep track of the number of concurrent read/write operations.
std::atomic<uint64_t> counters_{0};

// Mutex to disable the resource exclusively.
// Mutex to disable the resource exclusively. This mutex is locked by DisableAndWaitForOps after
// waiting for all shared-ownership operations to complete. We need this to avoid a race condition
// between Raft operations that replace RocksDB (apply snapshot / truncate) and tablet shutdown.
std::timed_mutex disable_;
};

// A convenience class to automatically increment/decrement a PendingOperationCounter.
// A convenience class to automatically increment/decrement a PendingOperationCounter. This is used
// for regular RocksDB read/write operations that are allowed to proceed in parallel. Constructing
// a ScopedPendingOperation might fail because the counter is in the disabled state. An instance
// of this class resembles a Result or a Status, because it can be used with the RETURN_NOT_OK
// macro.
class ScopedPendingOperation {
public:
// Object is not copyable, but movable.
void operator=(const ScopedPendingOperation&) = delete;
ScopedPendingOperation(const ScopedPendingOperation&) = delete;

explicit ScopedPendingOperation(PendingOperationCounter* counter)
: counter_(counter),
orig_counter_value_(0) {
: counter_(counter), ok_(false) {
if (counter != nullptr) {
if (counter_->IsReady()) {
orig_counter_value_ = counter->Increment();
// The race condition between IsReady() and Increment() is OK, because we are checking if
// anyone has started an exclusive operation since we did the increment, and don't proceed
// with this shared-ownership operation in that case.
ok_ = (counter->Increment() & PendingOperationCounter::kDisabledCounterMask) == 0;
} else {
orig_counter_value_ = PendingOperationCounter::kDisabledDelta;
ok_ = false;
counter_ = nullptr; // Avoid decrementing the counter.
}
}
}

ScopedPendingOperation(ScopedPendingOperation&& op)
: counter_(op.counter_), orig_counter_value_(op.orig_counter_value_) {
: counter_(op.counter_), ok_(op.ok_) {
op.counter_ = nullptr; // Moved ownership.
}

Expand All @@ -108,16 +122,18 @@ class ScopedPendingOperation {
}

bool ok() const {
return (orig_counter_value_ & PendingOperationCounter::kDisabledCounterMask) == 0;
return ok_;
}

private:
PendingOperationCounter* counter_;
// Store in constructor original counter value to be able checking it later in ok().
uint64_t orig_counter_value_;

bool ok_;
};

// RETURN_NOT_OK macro support.
// The error message currently mentions RocksDB because that is the only type of resource that
// this framework is used to protect as of Apr 2018.
inline Status MoveStatus(const ScopedPendingOperation& scoped) {
return scoped.ok() ? Status::OK() : STATUS(Busy, "RocksDB store is busy");
}
Expand All @@ -141,6 +157,18 @@ class ScopedPendingOperationPause {
p.counter_ = nullptr; // Moved ownership.
}

// This is called during tablet shutdown to release the mutex that we took to prevent concurrent
// exclusive-ownership operations on the RocksDB instance, such as truncation and snapshot
// restoration. It is fine to release the mutex because these exclusive operations are not allowed
// to happen after tablet shutdown anyway.
void ReleaseMutexButKeepDisabled() {
CHECK_OK(status_);
CHECK_NOTNULL(counter_);
counter_->UnlockExclusiveOpMutex();
// Make sure the destructor has no effect when it runs.
counter_ = nullptr;
}

// See PendingOperationCounter::Enable() for the thread restriction.
~ScopedPendingOperationPause() {
if (counter_ != nullptr) {
Expand Down

0 comments on commit 92ac433

Please sign in to comment.