Skip to content

Commit

Permalink
[#5669] Periodically check statuses of running transactions to clean …
Browse files Browse the repository at this point in the history
…up aborted ones

Summary:
This diff adds a periodic status check of each running transaction to the transaction participant. This is needed to more proactively detect transactions that have been aborted and abandoned. Such cases can happen when the transaction client has crashed, so that there is no one left to send a cleanup RPC to the transaction participant. Previously, we would have to wait for a compaction for those transactions' intents to be cleaned up.

The cleanup mechanism works as follows. Every running transaction now has an associated scheduled abort check hybrid time, abort_check_ht, which we set to start time + FLAGS_transaction_abort_check_interval_ms when the transaction starts. We keep resetting it to current time + the same interval FLAGS_transaction_abort_check_interval_ms whenever we receive a response saying the transaction is still pending. As a result, under normal conditions (no network disconnections or slowness), we check the status of each pending transaction once per FLAGS_transaction_abort_check_interval_ms milliseconds on average. In case of slow status request processing, we wait for the previous status request to time out (as per the FLAGS_transaction_abort_check_timeout_ms flag) before scheduling a new status check for the same transaction.

To implement the above polling mechanism efficiently, we use rpc::Poller and rpc::Scheduler to invoke a Poll function every FLAGS_transactions_status_poll_interval_ms milliseconds. This polling interval is much smaller than the per-transaction status check interval FLAGS_transaction_abort_check_interval_ms. The Poll function uses the new sequential index on abort_check_ht, which is being added to the transactions_ multi-index container in TransactionParticipant, to obtain the set of transactions that are due for a status check in this iteration.

Also, this diff moves the TransactionParticipant code that loads data into memory (transaction metadata from the intents RocksDB and large-transaction "apply metadata" from the regular RocksDB) into a new class, TransactionLoader.

Test Plan: ybd --gtest_filter CqlIndexTest.TxnPollCleanup

Reviewers: mikhail

Reviewed By: mikhail

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D9427
  • Loading branch information
spolitov committed Oct 7, 2020
1 parent bbcf51d commit ff5d528
Show file tree
Hide file tree
Showing 26 changed files with 819 additions and 375 deletions.
6 changes: 3 additions & 3 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ DECLARE_bool(TEST_transaction_allow_rerequest_status);
DECLARE_uint64(TEST_transaction_delay_status_reply_usec_in_tests);
DECLARE_bool(enable_load_balancing);
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(transaction_disable_proactive_cleanup_in_tests);
DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort);
DECLARE_uint64(aborted_intent_cleanup_ms);
DECLARE_int32(remote_bootstrap_max_chunk_size);
DECLARE_bool(TEST_master_fail_transactional_tablet_lookups);
Expand Down Expand Up @@ -821,7 +821,7 @@ TEST_F(QLTransactionTest, ResolveIntentsWriteReadWithinTransactionAndRollback) {

TEST_F(QLTransactionTest, CheckCompactionAbortCleanup) {
SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
FLAGS_transaction_disable_proactive_cleanup_in_tests = true;
FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec

// Write { 1 -> 1, 2 -> 2 }.
Expand Down Expand Up @@ -874,7 +874,7 @@ class QLTransactionTestWithDisabledCompactions : public QLTransactionTest {

TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDisabledCompactions) {
SetAtomicFlag(0ULL, &FLAGS_max_clock_skew_usec); // To avoid read restart in this test.
FLAGS_transaction_disable_proactive_cleanup_in_tests = true;
FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
FLAGS_aborted_intent_cleanup_ms = 1000; // 1 sec
FLAGS_delete_intents_sst_files = false;

Expand Down
6 changes: 6 additions & 0 deletions src/yb/client/tablet_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ bool TabletInvoker::Done(Status* status) {
assign_new_leader_ = false;

if (status->IsAborted() || retrier_->finished()) {
if (status->ok()) {
*status = retrier_->controller().status();
if (status->ok()) {
*status = STATUS(Aborted, "Retrier finished");
}
}
return true;
}

Expand Down
7 changes: 4 additions & 3 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ using namespace std::placeholders;
DEFINE_uint64(transaction_heartbeat_usec, 500000 * yb::kTimeMultiplier,
"Interval of transaction heartbeat in usec.");
DEFINE_bool(transaction_disable_heartbeat_in_tests, false, "Disable heartbeat during test.");
DEFINE_bool(transaction_disable_proactive_cleanup_in_tests, false,
"Disable cleanup of intents in abort path.");
DECLARE_uint64(max_clock_skew_usec);

DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0,
"Inject delay before processing flushed operations by transaction.");

DEFINE_test_flag(bool, disable_proactive_txn_cleanup_on_abort, false,
"Disable cleanup of intents in abort path.");

namespace yb {
namespace client {

Expand Down Expand Up @@ -671,7 +672,7 @@ class YBTransaction::Impl final {
}

void DoAbortCleanup(const YBTransactionPtr& transaction) {
if (FLAGS_transaction_disable_proactive_cleanup_in_tests) {
if (FLAGS_TEST_disable_proactive_txn_cleanup_on_abort) {
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/yb/common/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ std::ostream& operator<<(std::ostream& out, const TransactionMetadata& metadata)
}

MonoDelta TransactionRpcTimeout() {
return FLAGS_transaction_rpc_timeout_ms * 1ms;
return FLAGS_transaction_rpc_timeout_ms * 1ms * kTimeMultiplier;
}

// TODO(dtxn) correct deadline should be calculated and propagated.
Expand Down
32 changes: 28 additions & 4 deletions src/yb/integration-tests/cql-index-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@
using namespace std::literals;

DECLARE_bool(disable_index_backfill);
DECLARE_bool(transactions_poll_check_aborted);
DECLARE_bool(TEST_disable_proactive_txn_cleanup_on_abort);
DECLARE_int32(client_read_write_timeout_ms);
DECLARE_int32(rpc_workers_limit);
DECLARE_uint64(transaction_manager_workers_limit);
DECLARE_uint64(TEST_inject_txn_get_status_delay_ms);
DECLARE_int64(transaction_abort_check_interval_ms);

namespace yb {

// Test fixture for the CQL index tests in this file; derives from CqlTestBase.
class CqlIndexTest : public CqlTestBase {
public:
virtual ~CqlIndexTest() = default;

// Shared body of the transaction cleanup tests. max_remaining_txns_per_tablet is the limit
// handed to AssertRunningTransactionsCountLessOrEqualTo once the inserts have been issued.
void TestTxnCleanup(size_t max_remaining_txns_per_tablet);
};

YB_STRONGLY_TYPED_BOOL(UniqueIndex);
Expand Down Expand Up @@ -107,6 +112,7 @@ TEST_F_EX(CqlIndexTest, ConcurrentIndexUpdate, CqlIndexSmallWorkersTest) {

FLAGS_client_read_write_timeout_ms = 10000;
SetAtomicFlag(1000, &FLAGS_TEST_inject_txn_get_status_delay_ms);
FLAGS_transaction_abort_check_interval_ms = 100000;

auto session = ASSERT_RESULT(EstablishSession(driver_.get()));

Expand Down Expand Up @@ -146,9 +152,12 @@ TEST_F_EX(CqlIndexTest, ConcurrentIndexUpdate, CqlIndexSmallWorkersTest) {
}

while (!thread_holder.stop_flag().load(std::memory_order_acquire)) {
if (inserts.load(std::memory_order_acquire) >= kNumInserts) {
auto num_inserts = inserts.load(std::memory_order_acquire);
if (num_inserts >= kNumInserts) {
break;
}
YB_LOG_EVERY_N_SECS(INFO, 5) << "Num inserts " << num_inserts << " of " << kNumInserts;
std::this_thread::sleep_for(100ms);
}

thread_holder.Stop();
Expand All @@ -171,15 +180,15 @@ void CleanFutures(std::deque<CassandraFuture>* futures, CheckReady check_ready)
}
}

TEST_F(CqlIndexTest, TxnCleanup) {
void CqlIndexTest::TestTxnCleanup(size_t max_remaining_txns_per_tablet) {
auto session = ASSERT_RESULT(EstablishSession(driver_.get()));

ASSERT_OK(CreateIndexedTable(&session, UniqueIndex::kTrue));
std::deque<CassandraFuture> futures;

auto prepared = ASSERT_RESULT(session.Prepare("INSERT INTO t (key, value) VALUES (?, ?)"));

for (int i = 0; i != 100; ++i) {
for (int i = 0; i != RegularBuildVsSanitizers(100, 30); ++i) {
CleanFutures(&futures, CheckReady::kTrue);

auto stmt = prepared.Bind();
Expand All @@ -190,7 +199,22 @@ TEST_F(CqlIndexTest, TxnCleanup) {

CleanFutures(&futures, CheckReady::kFalse);

AssertRunningTransactionsCountLessOrEqualTo(cluster_.get(), 5);
AssertRunningTransactionsCountLessOrEqualTo(cluster_.get(), max_remaining_txns_per_tablet);
}

// Test proactive aborted transactions cleanup.
// NOTE(review): clearing FLAGS_transactions_poll_check_aborted presumably turns off the
// poll-based status check so that only the proactive cleanup path is exercised -- confirm
// against the flag definition.
TEST_F(CqlIndexTest, TxnCleanup) {
FLAGS_transactions_poll_check_aborted = false;

// Up to 5 transactions per tablet are tolerated as still running.
TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 5);
}

// Test poll based aborted transactions cleanup.
TEST_F(CqlIndexTest, TxnPollCleanup) {
// Disable cleanup of intents in the client abort path, so the remaining transactions have to
// be cleaned up by some other mechanism (here, the periodic status check).
FLAGS_TEST_disable_proactive_txn_cleanup_on_abort = true;
// Check the status of each running transaction roughly once per second.
FLAGS_transaction_abort_check_interval_ms = 1000;

// No transaction may be left running on any tablet.
TestTxnCleanup(/* max_remaining_txns_per_tablet= */ 0);
}

} // namespace yb
8 changes: 4 additions & 4 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1766,13 +1766,13 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
RETURN_NOT_OK_PREPEND(p->Start(),
Substitute("Failed to start subprocess $0", exe_));

stdout_tailer_thread_ = unique_ptr<LogTailerThread>(new LogTailerThread(
Substitute("[$0 stdout]", daemon_id_), p->ReleaseChildStdoutFd(), &std::cout));
stdout_tailer_thread_ = std::make_unique<LogTailerThread>(
Substitute("[$0 stdout]", daemon_id_), p->ReleaseChildStdoutFd(), &std::cout);

// We will mostly see stderr output from the child process (because of --logtostderr), so we'll
// assume that by default in the output prefix.
stderr_tailer_thread_ = unique_ptr<LogTailerThread>(new LogTailerThread(
default_output_prefix, p->ReleaseChildStderrFd(), &std::cerr));
stderr_tailer_thread_ = std::make_unique<LogTailerThread>(
default_output_prefix, p->ReleaseChildStderrFd(), &std::cerr);

// The process is now starting -- wait for the bound port info to show up.
Stopwatch sw;
Expand Down
9 changes: 5 additions & 4 deletions src/yb/integration-tests/mini_cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ size_t CountRunningTransactions(MiniCluster* cluster) {
return result;
}

void AssertRunningTransactionsCountLessOrEqualTo(MiniCluster* cluster, size_t limit_per_tablet) {
MonoTime deadline = MonoTime::Now() + 7s * kTimeMultiplier;
void AssertRunningTransactionsCountLessOrEqualTo(MiniCluster* cluster,
size_t max_remaining_txns_per_tablet) {
MonoTime deadline = MonoTime::Now() + 15s * kTimeMultiplier;
bool has_bad = false;
for (int i = 0; i != cluster->num_tablet_servers(); ++i) {
auto server = cluster->mini_tablet_server(i)->server();
Expand Down Expand Up @@ -67,8 +68,8 @@ void AssertRunningTransactionsCountLessOrEqualTo(MiniCluster* cluster, size_t li
for (const auto& peer : tablets) {
auto participant = peer->tablet()->transaction_participant();
if (participant) {
auto status = Wait([participant, limit_per_tablet] {
return participant->TEST_GetNumRunningTransactions() <= limit_per_tablet;
auto status = Wait([participant, max_remaining_txns_per_tablet] {
return participant->TEST_GetNumRunningTransactions() <= max_remaining_txns_per_tablet;
},
deadline,
"Wait until no transactions are running");
Expand Down
3 changes: 2 additions & 1 deletion src/yb/integration-tests/mini_cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class MiniCluster;

size_t CountRunningTransactions(MiniCluster* cluster);
void AssertNoRunningTransactions(MiniCluster* cluster);
void AssertRunningTransactionsCountLessOrEqualTo(MiniCluster* cluster, size_t limit_per_tablet);
void AssertRunningTransactionsCountLessOrEqualTo(
MiniCluster* cluster, size_t max_remaining_txns_per_tablet);

} // namespace yb

Expand Down
2 changes: 2 additions & 0 deletions src/yb/integration-tests/yb_mini_cluster_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ namespace yb {
template <class T>
void YBMiniClusterTestBase<T>::SetUp() {
YBTest::SetUp();
HybridTime::TEST_SetPrettyToString(true);

FLAGS_use_priority_thread_pool_for_flushes = true;
FLAGS_allow_preempting_compactions = true;

Expand Down
7 changes: 7 additions & 0 deletions src/yb/rpc/rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,12 @@ MonoDelta RpcController::timeout() const {
return timeout_;
}

// Returns the id reported by the call currently held in call_, or -1 when call_ is not set.
int32_t RpcController::call_id() const {
if (call_) {
return call_->call_id();
}
// No call is associated with this controller; -1 serves as the "no call" value.
return -1;
}

} // namespace rpc
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/rpc/rpc_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class RpcController {
// May fail if index is invalid.
Result<Slice> GetSidecar(int idx) const;

int32_t call_id() const;

private:
friend class OutboundCall;
friend class Proxy;
Expand Down
1 change: 1 addition & 0 deletions src/yb/tablet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ set(TABLET_SRCS
tablet_peer_mm_ops.cc
tablet_peer.cc
transaction_coordinator.cc
transaction_loader.cc
transaction_participant.cc
transaction_status_resolver.cc
operation_order_verifier.cc
Expand Down
Loading

0 comments on commit ff5d528

Please sign in to comment.