Skip to content

Commit

Permalink
ENG-4717: #891: Not submitted write operation should block tablet pee…
Browse files Browse the repository at this point in the history
…r shutdown

Summary:
When a write operation completes, even with failure, it releases acquired resources.
So the tablet peer should exist while the operation is alive, otherwise the tablet server could crash when the operation releases locks.

Fixed by adding a counter of preparing operations.

Test Plan: ybd --java-test org.yb.cql.TestIndex#testDropDuringWrite

Reviewers: bogdan, mikhail

Reviewed By: mikhail

Subscribers: bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D6221
  • Loading branch information
spolitov committed Feb 27, 2019
1 parent e90080b commit 3f4e225
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 9 deletions.
9 changes: 8 additions & 1 deletion java/yb-client/src/test/java/org/yb/util/CoreFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

public final class CoreFileUtil {

public static final boolean IS_MAC =
System.getProperty("os.name").toLowerCase().startsWith("mac os x");

private CoreFileUtil() {
}

Expand Down Expand Up @@ -63,7 +66,11 @@ public static void processCoreFile(int pid,
List<String> coreFileBasenames = new ArrayList<>();

if (coreFileDir == null) {
coreFileDir = new File(System.getProperty("user.dir"));
if (IS_MAC) {
coreFileDir = new File("/cores");
} else {
coreFileDir = new File(System.getProperty("user.dir"));
}
}

if (matchMode == CoreFileMatchMode.ANY_CORE_FILE) {
Expand Down
38 changes: 38 additions & 0 deletions java/yb-cql/src/test/java/org/yb/cql/TestIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -939,4 +939,42 @@ public void testPagingSelect() throws Exception {
"Row[2, 2, 4, d]",
"Row[3, 1, 5, e]")));
}

@Test
public void testDropDuringWrite() throws Exception {
for (int i = 0; i != 5; ++i) {
String table_name = "index_test_" + i;
String index_name = "index_" + i;
session.execute(String.format(
"create table %s (h int, c int, primary key ((h))) " +
"with transactions = { 'enabled' : true };", table_name));
session.execute(String.format("create index %s on %s (c);", index_name, table_name));
final PreparedStatement statement = session.prepare(String.format(
"insert into %s (h, c) values (?, ?);", table_name));

List<Thread> threads = new ArrayList<Thread>();
while (threads.size() != 10) {
Thread thread = new Thread(() -> {
int key = 0;
while (!Thread.interrupted()) {
session.execute(statement.bind(Integer.valueOf(key), Integer.valueOf(-key)));
++key;
}
});
thread.start();
threads.add(thread);
}
try {
Thread.sleep(5000);
session.execute(String.format("drop table %s;", table_name));
} finally {
for (Thread thread : threads) {
thread.interrupt();
}
for (Thread thread : threads) {
thread.join();
}
}
}
}
}
4 changes: 4 additions & 0 deletions src/yb/client/client-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,10 @@ Status YBClient::Data::IsDeleteTableInProgress(YBClient* client,
// compiler complains.
RETURN_NOT_OK(s);
if (resp.has_error()) {
if (resp.error().code() == MasterErrorPB::TABLE_NOT_FOUND) {
*delete_in_progress = false;
return Status::OK();
}
return StatusFromPB(resp.error().status());
}

Expand Down
7 changes: 7 additions & 0 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3092,6 +3092,13 @@ Status CatalogManager::IsDeleteTableDone(const IsDeleteTableDoneRequestPB* req,
} else {
LOG(INFO) << "Servicing IsDeleteTableDone request for table id "
<< req->table_id() << ": deleting tablets";
std::vector<std::shared_ptr<TSDescriptor>> descs;
master_->ts_manager()->GetAllDescriptors(&descs);
for (auto& ts_desc : descs) {
LOG(INFO) << "Deleting on " << ts_desc->permanent_uuid() << ": "
<< ts_desc->PendingTabletDeleteToString();
}

resp->set_done(false);
}

Expand Down
5 changes: 5 additions & 0 deletions src/yb/master/ts_descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ bool TSDescriptor::IsTabletDeletePending(const std::string& tablet_id) const {
return tablets_pending_delete_.count(tablet_id);
}

std::string TSDescriptor::PendingTabletDeleteToString() const {
std::lock_guard<simple_spinlock> l(lock_);
return yb::ToString(tablets_pending_delete_);
}

void TSDescriptor::AddPendingTabletDelete(const std::string& tablet_id) {
std::lock_guard<simple_spinlock> l(lock_);
tablets_pending_delete_.insert(tablet_id);
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/ts_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ class TSDescriptor {
bool IsTabletDeletePending(const std::string& tablet_id) const;
void AddPendingTabletDelete(const std::string& tablet_id);
void ClearPendingTabletDelete(const std::string& tablet_id);
std::string PendingTabletDeleteToString() const;

std::string ToString() const;

Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/local_tablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class LocalTabletWriter : public WriteOperationContext {

private:
void Submit(std::unique_ptr<Operation> operation, int64_t term) override;
void Aborted(Operation* operation) override {}
HybridTime ReportReadRestart() override;

Tablet* const tablet_;
Expand Down
14 changes: 13 additions & 1 deletion src/yb/tablet/operations/write_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ WriteOperation::WriteOperation(
std::unique_ptr<WriteOperationState> state, int64_t term, CoarseTimePoint deadline,
WriteOperationContext* context)
: Operation(std::move(state), OperationType::kWrite),
context_(*context), term_(term), deadline_(deadline), start_time_(MonoTime::Now()) {
context_(*context), term_(term), deadline_(deadline), start_time_(MonoTime::Now()),
// If term is unknown, it means that we are creating operation at replica.
// So it was already submitted at leader.
submitted_(term == yb::OpId::kUnknownTerm) {
}

consensus::ReplicateMsgPtr WriteOperation::NewReplicateMsg() {
Expand Down Expand Up @@ -152,6 +155,14 @@ string WriteOperation::ToString() const {
abs_time_formatted, state()->ToString());
}

WriteOperation::~WriteOperation() {
// If operation was submitted, then it's lifetime is controlled by operation tracker and we
// don't have to do it here.
if (!submitted_) {
context_.Aborted(this);
}
}

void WriteOperation::DoStartSynchronization(const Status& status) {
std::unique_ptr<WriteOperation> self(this);
// If a restart read is required, then we return this fact to caller and don't perform the write
Expand All @@ -171,6 +182,7 @@ void WriteOperation::DoStartSynchronization(const Status& status) {
return;
}

self->submitted_ = true;
context_.Submit(std::move(self), term_);
}

Expand Down
6 changes: 6 additions & 0 deletions src/yb/tablet/operations/write_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class WriteOperationContext {
public:
// When operation completes, its callback is executed.
virtual void Submit(std::unique_ptr<Operation> operation, int64_t term) = 0;
virtual void Aborted(Operation* operation) = 0;
virtual HybridTime ReportReadRestart() = 0;

virtual ~WriteOperationContext() {}
Expand All @@ -203,6 +204,8 @@ class WriteOperation : public Operation {
WriteOperation(std::unique_ptr<WriteOperationState> operation_state, int64_t term,
CoarseTimePoint deadline, WriteOperationContext* context);

~WriteOperation();

WriteOperationState* state() override {
return down_cast<WriteOperationState*>(Operation::state());
}
Expand Down Expand Up @@ -295,6 +298,9 @@ class WriteOperation : public Operation {

docdb::DocOperations doc_ops_;

// True if operation was submitted, i.e. context_.Submit(this) was invoked.
bool submitted_;

Tablet* tablet() { return state()->tablet(); }

DISALLOW_COPY_AND_ASSIGN(WriteOperation);
Expand Down
4 changes: 2 additions & 2 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ void Tablet::UpdateQLIndexes(std::unique_ptr<WriteOperation> operation) {

if (index_response->status() != QLResponsePB::YQL_STATUS_OK) {
response->set_status(index_response->status());
response->set_error_message(std::move(index_response->error_message()));
response->set_error_message(std::move(*index_response->mutable_error_message()));
}
if (txn) {
*response->mutable_child_transaction_result() = child_result;
Expand Down Expand Up @@ -1247,7 +1247,7 @@ void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr<WriteOperation>
return;
}

WriteRequestPB* key_value_write_request = operation->state()->mutable_request();
const WriteRequestPB* key_value_write_request = operation->state()->request();

if (!key_value_write_request->redis_write_batch().empty()) {
auto status = KeyValueBatchFromRedisWriteBatch(operation.get());
Expand Down
31 changes: 27 additions & 4 deletions src/yb/tablet/tablet_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#include "yb/util/threadpool.h"
#include "yb/util/trace.h"

using namespace std::literals;
using namespace std::placeholders;
using std::shared_ptr;
using std::string;
Expand Down Expand Up @@ -356,6 +357,18 @@ bool TabletPeer::StartShutdown() {
}

void TabletPeer::CompleteShutdown() {
auto wait_start = CoarseMonoClock::now();
auto last_report = wait_start;
while (preparing_operations_.load(std::memory_order_acquire) != 0) {
auto now = CoarseMonoClock::now();
if (now > last_report + 10s) {
LOG_WITH_PREFIX(WARNING)
<< "Long wait for finish of preparing operations: " << yb::ToString(now - wait_start);
last_report = now;
}
std::this_thread::sleep_for(100ms);
}

// TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
LOG_SLOW_EXECUTION(WARNING, 1000,
Substitute("TabletPeer: tablet $0: Waiting for Operations to complete", tablet_id())) {
Expand All @@ -367,12 +380,10 @@ void TabletPeer::CompleteShutdown() {
}

if (log_) {
WARN_NOT_OK(log_->Close(), "Error closing the Log.");
WARN_NOT_OK(log_->Close(), LogPrefix() + "Error closing the Log");
}

if (VLOG_IS_ON(1)) {
VLOG_WITH_PREFIX(1) << "Shut down!";
}
VLOG_WITH_PREFIX(1) << "Shut down!";

if (tablet_) {
tablet_->Shutdown();
Expand Down Expand Up @@ -461,11 +472,14 @@ void TabletPeer::WriteAsync(
return;
}

preparing_operations_.fetch_add(1, std::memory_order_acq_rel);
auto status = CheckRunning();
if (!status.ok()) {
preparing_operations_.fetch_sub(1, std::memory_order_acq_rel);
state->CompleteWithStatus(status);
return;
}

auto operation = std::make_unique<WriteOperation>(std::move(state), term, deadline, this);
tablet_->AcquireLocksAndPerformDocOperations(std::move(operation));
}
Expand All @@ -475,9 +489,14 @@ HybridTime TabletPeer::ReportReadRestart() {
return tablet_->SafeTime(RequireLease::kTrue);
}

void TabletPeer::Aborted(Operation* operation) {
preparing_operations_.fetch_sub(1, std::memory_order_acq_rel);
}

void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) {
auto status = CheckRunning();

auto operation_type = operation->operation_type();
if (status.ok()) {
auto driver = NewLeaderOperationDriver(&operation, term);
if (driver.ok()) {
Expand All @@ -490,6 +509,10 @@ void TabletPeer::Submit(std::unique_ptr<Operation> operation, int64_t term) {
operation->Finish(Operation::ABORTED);
operation->state()->CompleteWithStatus(status);
}

if (operation_type == OperationType::kWrite) {
preparing_operations_.fetch_sub(1, std::memory_order_acq_rel);
}
}

void TabletPeer::SubmitUpdateTransaction(
Expand Down
4 changes: 4 additions & 0 deletions src/yb/tablet/tablet_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class TabletPeer : public consensus::ReplicaOperationFactory,

void Submit(std::unique_ptr<Operation> operation, int64_t term) override;

void Aborted(Operation* operation) override;

HybridTime Now() override;

void UpdateClock(HybridTime hybrid_time) override;
Expand Down Expand Up @@ -406,6 +408,8 @@ class TabletPeer : public consensus::ReplicaOperationFactory,

std::atomic<rpc::ThreadPool*> service_thread_pool_{nullptr};

std::atomic<size_t> preparing_operations_{0};

private:
HybridTime ReportReadRestart() override;

Expand Down
9 changes: 8 additions & 1 deletion src/yb/yql/cql/ql/exec/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ void Executor::ProcessAsyncResults(const bool rescheduled) {

// Go through each ExecContext and process async results.
bool has_buffered_ops = false;
bool has_restart = false;
const MonoTime now = (ql_metrics_ != nullptr) ? MonoTime::Now() : MonoTime();
for (auto exec_itr = exec_contexts_.begin(); exec_itr != exec_contexts_.end(); ) {

Expand All @@ -1397,6 +1398,7 @@ void Executor::ProcessAsyncResults(const bool rescheduled) {

// Restart a statement if necessary
if (exec_context_->restart()) {
has_restart = true;
const TreeNode *root = exec_context_->parse_tree().root().get();
// Clear partial rows accumulated from the SELECT statement.
if (root->opcode() == TreeNodeOpcode::kPTSelectStmt) {
Expand Down Expand Up @@ -1494,7 +1496,12 @@ void Executor::ProcessAsyncResults(const bool rescheduled) {
// recurse too deeply hitting the stack limitiation. Rescheduling can also avoid occupying the
// RPC worker thread for too long starving other CQL calls waiting in the queue. If there is no
// buffered ops to flush, just call FlushAsync() to commit the transactions if any.
if (has_buffered_ops && !rescheduled) {
//
// When restart is required we will reexecute the whole operation with new transaction.
// Since restart could happen multiple times, it is possible that we will do it recursively,
// when local call is enabled.
// So to avoid stack overflow we use reschedule in this case.
if ((has_buffered_ops || has_restart) && !rescheduled) {
rescheduler_->Reschedule(&flush_async_task_.Bind(this));
} else {
FlushAsync();
Expand Down

0 comments on commit 3f4e225

Please sign in to comment.