From bbd5946bb083b864815fc92f22513d8a8e35bf29 Mon Sep 17 00:00:00 2001 From: rsami Date: Wed, 20 Mar 2024 12:59:52 -0400 Subject: [PATCH] [BACKPORT 2.20][#14165][#19257] DocDB: Refactor recently deadlocked transaction tracking to transaction coordinator Summary: Original commit: 916da9e6e35dcff2eb4dc1fcb53414b3328b93b2 / D31937 In preparation for the changes being introduced in D31827, this revision moves recently aborted transaction detection to the transaction coordinator with the following changes: 1. The deadlock detector now always sends an RPC to abort a deadlocked transaction, instead of making a local call. This makes the behavior to be introduced in a follow-up diff consistent between remote and local deadlock detectors aborting transactions. 2. If the coordinator receives an AbortTransaction request with a deadlock_reason populated, it updates the local TransactionState with that data and retains the TransactionState in managed_transactions for FLAGS_clear_deadlocked_txns_info_older_than_heartbeats txn heartbeat intervals. 3. UpdateTransaction and GetStatus requests now check the TransactionState for deadlock_reason instead of making a call to the deadlock detector. **Upgrade/Rollback safety:** This revision makes an additive change to AbortTransactionRequestPB and does not pose any Upgrade/Downgrade safety issues. 
Jira: DB-3646, DB-8053 Test Plan: Jenkins Reviewers: bkolagani, sergei Reviewed By: bkolagani Subscribers: ybase, bogdan Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D33390 --- src/yb/common/transaction.cc | 5 +- src/yb/common/transaction.h | 4 +- src/yb/docdb/deadlock_detector.cc | 113 +++++++---------- src/yb/docdb/deadlock_detector.h | 11 +- src/yb/tablet/transaction_coordinator.cc | 148 ++++++++++++++--------- src/yb/tablet/transaction_coordinator.h | 3 +- src/yb/tserver/tablet_service.cc | 2 + src/yb/tserver/tserver_service.proto | 3 + 8 files changed, 150 insertions(+), 139 deletions(-) diff --git a/src/yb/common/transaction.cc b/src/yb/common/transaction.cc index fcf00741b689..aff7381e8829 100644 --- a/src/yb/common/transaction.cc +++ b/src/yb/common/transaction.cc @@ -33,8 +33,9 @@ const char* kGlobalTransactionsTableName = "transactions"; const std::string kMetricsSnapshotsTableName = "metrics"; const std::string kTransactionTablePrefix = "transactions_"; -TransactionStatusResult::TransactionStatusResult(TransactionStatus status_, HybridTime status_time_) - : TransactionStatusResult(status_, status_time_, SubtxnSet()) {} +TransactionStatusResult::TransactionStatusResult( + TransactionStatus status_, HybridTime status_time_, Status expected_deadlock_status_) + : TransactionStatusResult(status_, status_time_, SubtxnSet(), expected_deadlock_status_) {} TransactionStatusResult::TransactionStatusResult( TransactionStatus status_, HybridTime status_time_, SubtxnSet aborted_subtxn_set_, diff --git a/src/yb/common/transaction.h b/src/yb/common/transaction.h index edd48cef7c61..a1e03ce74b22 100644 --- a/src/yb/common/transaction.h +++ b/src/yb/common/transaction.h @@ -124,7 +124,9 @@ struct TransactionStatusResult { TransactionStatusResult() {} - TransactionStatusResult(TransactionStatus status_, HybridTime status_time_); + TransactionStatusResult( + TransactionStatus status_, HybridTime status_time_, + Status 
expected_deadlock_status_ = Status::OK()); TransactionStatusResult( TransactionStatus status_, HybridTime status_time_, diff --git a/src/yb/docdb/deadlock_detector.cc b/src/yb/docdb/deadlock_detector.cc index c3fff2ce7078..59e674dedf65 100644 --- a/src/yb/docdb/deadlock_detector.cc +++ b/src/yb/docdb/deadlock_detector.cc @@ -56,14 +56,6 @@ DEFINE_UNKNOWN_int32( TAG_FLAG(clear_active_probes_older_than_seconds, hidden); TAG_FLAG(clear_active_probes_older_than_seconds, advanced); -DEFINE_RUNTIME_int32( - clear_deadlocked_txns_info_older_than_heartbeats, 10, - "Minimum number of transaction heartbeat periods for which a deadlocked transaction's info is " - "retained, after it has been reported to be aborted. This ensures the memory used to track " - "info of deadlocked transactions does not grow unbounded."); -TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, hidden); -TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, advanced); - METRIC_DEFINE_event_stats( tablet, deadlock_size, "Deadlock size", yb::MetricUnit::kTransactions, "The number of transactions involved in detected deadlocks"); @@ -367,6 +359,43 @@ class LocalProbeProcessor : public std::enable_shared_from_this { + public: + RemoteDeadlockResolver(rpc::Rpcs* rpcs, client::YBClient* client): + rpcs_(rpcs), client_(client), handle_(rpcs_->InvalidHandle()) {} + + void AbortRemoteTransaction( + const TransactionId& id, const TabletId& status_tablet, + const std::string& err_msg) { + tserver::AbortTransactionRequestPB req; + req.set_tablet_id(status_tablet); + req.set_propagated_hybrid_time(client_->Clock()->Now().ToUint64()); + req.set_transaction_id(id.data(), id.size()); + StatusToPB( + STATUS_EC_FORMAT( + Expired, TransactionError(TransactionErrorCode::kDeadlock), err_msg), + req.mutable_deadlock_reason()); + rpcs_->RegisterAndStart( + AbortTransaction( + TransactionRpcDeadline(), + nullptr, + client_, + &req, + [shared_this = shared_from(this), txn_id = id] + (const auto& status, const auto& 
resp) { + LOG_WITH_FUNC(INFO) << "Abort deadlocked transaction request for " << txn_id + << " completed: " << resp.ShortDebugString(); + shared_this->rpcs_->Unregister(shared_this->handle_); + }), + &handle_); + } + + private: + rpc::Rpcs* rpcs_; + client::YBClient* client_; + rpc::Rpcs::Handle handle_; +}; + using LocalProbeProcessorPtr = std::shared_ptr; } // namespace @@ -391,10 +420,10 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this& client_future, - TransactionAbortController* controller, const TabletId& status_tablet_id, + TransactionStatusController* controller, const TabletId& status_tablet_id, const MetricEntityPtr& metrics) : client_future_(client_future), controller_(controller), - detector_id_(DetectorId::GenerateRandom()), + detector_id_(DetectorId::GenerateRandom()), status_tablet_(status_tablet_id), log_prefix_(Format("T $0 D $1 ", status_tablet_id, detector_id_)), deadlock_size_(METRIC_deadlock_size.Instantiate(metrics)), probe_latency_(METRIC_deadlock_probe_latency.Instantiate(metrics)), @@ -591,20 +620,6 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this Now() - N seconds probes_to_send = GetProbesToSend(waiters_); - - // Clear the info of old deadlocked transactions. 
- auto interval = FLAGS_clear_deadlocked_txns_info_older_than_heartbeats * - FLAGS_transaction_heartbeat_usec * 1us; - auto expired_cutoff_time = CoarseMonoClock::Now() - interval; - for (auto it = recently_deadlocked_txns_info_.begin(); - it != recently_deadlocked_txns_info_.end();) { - const auto& deadlock_time = it->second.second; - if (deadlock_time < expired_cutoff_time) { - it = recently_deadlocked_txns_info_.erase(it); - } else { - it++; - } - } } for (auto& processor : probes_to_send) { @@ -624,18 +639,6 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this l(mutex_); - auto it = recently_deadlocked_txns_info_.find(txn_id); - if (it == recently_deadlocked_txns_info_.end()) { - return Status::OK(); - } - // Return Expired so that TabletInvoker does not retry. Also, query layer proactively sends - // clean up requests to transaction participant only for transactions with Expired status. - return STATUS_EC_FORMAT( - Expired, TransactionError(TransactionErrorCode::kDeadlock), it->second.first); - } - private: template std::vector GetProbesToSend(const T& waiters) { @@ -685,14 +688,10 @@ class DeadlockDetector::Impl : public std::enable_shared_from_thismutex_)> l(detector->mutex_); - auto deadlock_info = std::make_pair(ConstructDeadlockedMessage(waiter, resp), - CoarseMonoClock::Now()); - detector->recently_deadlocked_txns_info_.emplace(waiter, deadlock_info); - } - detector->controller_->Abort( - waiter, std::bind(&DeadlockDetector::Impl::TxnAbortCallback, detector, _1, waiter)); + auto deadlock_msg = ConstructDeadlockedMessage(waiter, resp); + auto resolver = std::make_shared( + &detector->rpcs_, &detector->client()); + resolver->AbortRemoteTransaction(waiter, detector->status_tablet_, deadlock_msg); } } }); @@ -828,20 +827,6 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this res, const TransactionId txn_id) { - if (res.ok()) { - if (res->status == TransactionStatus::ABORTED) { - LOG_WITH_FUNC(INFO) << "Aborting deadlocked 
transaction " << txn_id << " succeeded."; - return; - } - LOG_WITH_FUNC(INFO) << "Aborting deadlocked transaction " << txn_id - << " failed -- status: " << res->status << ", time: " << res->status_time; - } else { - LOG_WITH_FUNC(INFO) << "Aborting deadlocked transaction " << txn_id - << " failed -- " << res.status(); - } - } - const std::string& LogPrefix() const { return log_prefix_; } @@ -849,8 +834,9 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this& client_future_; - TransactionAbortController* const controller_; + TransactionStatusController* const controller_; const DetectorId detector_id_; + const TabletId status_tablet_; const std::string log_prefix_; scoped_refptr deadlock_size_; @@ -872,14 +858,11 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this seq_no_ = 0; - - std::unordered_map, TransactionIdHash> - recently_deadlocked_txns_info_ GUARDED_BY(mutex_); }; DeadlockDetector::DeadlockDetector( const std::shared_future& client_future, - TransactionAbortController* controller, + TransactionStatusController* controller, const TabletId& status_tablet_id, const MetricEntityPtr& metrics): impl_(new Impl(client_future, controller, status_tablet_id, metrics)) {} @@ -904,10 +887,6 @@ void DeadlockDetector::TriggerProbes() { return impl_->TriggerProbes(); } -Status DeadlockDetector::GetTransactionDeadlockStatus(const TransactionId& txn_id) { - return impl_->GetTransactionDeadlockStatus(txn_id); -} - void DeadlockDetector::Shutdown() { return impl_->Shutdown(); } diff --git a/src/yb/docdb/deadlock_detector.h b/src/yb/docdb/deadlock_detector.h index 3350effa59a2..a1dbbddf6e5c 100644 --- a/src/yb/docdb/deadlock_detector.h +++ b/src/yb/docdb/deadlock_detector.h @@ -121,13 +121,12 @@ typedef boost::multi_index_container; @@ -169,7 +168,7 @@ class DeadlockDetector { public: DeadlockDetector( const std::shared_future& client_future, - TransactionAbortController* controller, + TransactionStatusController* controller, const TabletId& 
status_tablet_id, const MetricEntityPtr& metrics); @@ -189,10 +188,6 @@ class DeadlockDetector { void Shutdown(); - // Returns the deadlock status if the given transaction could have been involved in a deadlock. - // Returns Status::OK() in all other cases. - Status GetTransactionDeadlockStatus(const TransactionId& txn_id); - private: class Impl; std::shared_ptr impl_; diff --git a/src/yb/tablet/transaction_coordinator.cc b/src/yb/tablet/transaction_coordinator.cc index 7546e9948b38..f3125f652340 100644 --- a/src/yb/tablet/transaction_coordinator.cc +++ b/src/yb/tablet/transaction_coordinator.cc @@ -36,6 +36,7 @@ #include "yb/common/transaction_error.h" #include "yb/common/wire_protocol.h" +#include "yb/common/wire_protocol.pb.h" #include "yb/consensus/consensus_round.h" #include "yb/consensus/consensus_util.h" @@ -87,6 +88,14 @@ DEFINE_UNKNOWN_uint64(transaction_deadlock_detection_interval_usec, 60000000, "Deadlock detection interval in usec."); TAG_FLAG(transaction_deadlock_detection_interval_usec, advanced); +DEFINE_RUNTIME_int32( + clear_deadlocked_txns_info_older_than_heartbeats, 10, + "Minimum number of transaction heartbeat periods for which a deadlocked transaction's info is " + "retained, after it has been reported to be aborted. 
This ensures the memory used to track " + "info of deadlocked transactions does not grow unbounded."); +TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, hidden); +TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, advanced); + DEFINE_UNKNOWN_int64(avoid_abort_after_sealing_ms, 20, "If transaction was only sealed, we will try to abort it not earlier than this " "period in milliseconds."); @@ -268,6 +277,23 @@ class TransactionState { status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS; } + bool ShouldBeRetained() const { + if (!Completed()) { + return true; + } + if (!deadlock_time_) { + return false; + } + auto retain_until = deadlock_time_.AddMicroseconds( + GetAtomicFlag(&FLAGS_clear_deadlocked_txns_info_older_than_heartbeats) * + GetAtomicFlag(&FLAGS_transaction_heartbeat_usec)); + return retain_until > context_.coordinator_context().clock().Now(); + } + + HybridTime GetDeadlockTime() const { + return deadlock_time_; + } + // Applies new state to transaction. 
Status ProcessReplicated(const TransactionCoordinator::ReplicatedData& data) { VLOG_WITH_PREFIX(4) @@ -407,7 +433,8 @@ class TransactionState { FillExpectedTabletBatches(expected_tablet_batches); return TransactionStatusResult{TransactionStatus::SEALED, commit_time_}; case TransactionStatus::ABORTED: - return TransactionStatusResult{TransactionStatus::ABORTED, HybridTime::kMax}; + return TransactionStatusResult{ + TransactionStatus::ABORTED, HybridTime::kMax, deadlock_reason_}; case TransactionStatus::PENDING: { HybridTime status_ht; if (replicating_) { @@ -448,7 +475,8 @@ class TransactionState { NotifyAbortWaiters(TransactionStatusResult::Aborted()); } - TransactionStatusResult Abort(TransactionAbortCallback* callback) { + TransactionStatusResult Abort( + TransactionAbortCallback* callback, const Status& deadlock_reason = Status::OK()) { if (status_ == TransactionStatus::COMMITTED || status_ == TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) { return TransactionStatusResult(TransactionStatus::COMMITTED, commit_time_); @@ -460,7 +488,7 @@ class TransactionState { VLOG_WITH_PREFIX(1) << "External abort request"; CHECK_EQ(TransactionStatus::PENDING, status_); abort_waiters_.emplace_back(std::move(*callback)); - Abort(); + Abort(deadlock_reason); return TransactionStatusResult(TransactionStatus::PENDING, HybridTime::kMax); } } @@ -481,7 +509,7 @@ class TransactionState { } // Aborts this transaction. - void Abort() { + void Abort(const Status& deadlock_reason = Status::OK()) { if (ShouldBeCommitted()) { LOG_WITH_PREFIX(DFATAL) << "Transaction abort in wrong state: " << status_; return; @@ -494,7 +522,7 @@ class TransactionState { << TransactionStatus_Name(status_); return; } - SubmitUpdateStatus(TransactionStatus::ABORTED); + SubmitUpdateStatus(TransactionStatus::ABORTED, deadlock_reason); } // Returns logs prefix for this transaction. 
@@ -660,13 +688,24 @@ class TransactionState { Status status; auto txn_status = state.status(); + if (!deadlock_reason_.ok()) { + // If this transaction was aborted due to a deadlock, we should disallow any updates to it's + // state which are not ABORTED or cleanup-related. + DCHECK(deadlock_time_); + if (txn_status != TransactionStatus::ABORTED && + txn_status != TransactionStatus::GRACEFUL_CLEANUP && + txn_status != TransactionStatus::IMMEDIATE_CLEANUP) { + context_.CompleteWithStatus(std::move(request), deadlock_reason_); + return; + } + } if (txn_status == TransactionStatus::COMMITTED) { status = HandleCommit(); } else if (txn_status == TransactionStatus::PENDING || txn_status == TransactionStatus::CREATED) { - // Handling txn_status of CREATED when the current status (status_) is PENDING is only - // allowed for backward compatibility with versions prior to D11210, which could send - // transaction creation retries with the same id. + // Handling txn_status of CREATED when the current status (status_) is PENDING is only + // allowed for backward compatibility with versions prior to D11210, which could send + // transaction creation retries with the same id. 
if (status_ != TransactionStatus::PENDING) { status = STATUS_FORMAT(IllegalState, "Transaction in wrong state during heartbeat: $0", @@ -702,7 +741,7 @@ class TransactionState { Status HandleCommit() { auto hybrid_time = context_.coordinator_context().clock().Now(); if (ExpiredAt(hybrid_time)) { - auto status = STATUS(Expired, "Commit of expired transaction"); + auto status = STATUS(Expired, "Attempted to commit expired transaction"); VLOG_WITH_PREFIX(4) << status; Abort(); return status; @@ -716,16 +755,13 @@ class TransactionState { return Status::OK(); } - void SubmitUpdateStatus(TransactionStatus status) { + void SubmitUpdateStatus(TransactionStatus status, const Status& deadlock_reason = Status::OK()) { VLOG_WITH_PREFIX(4) << "SubmitUpdateStatus(" << TransactionStatus_Name(status) << ")"; - // TODO(wait-queues): If the transaction is being aborted due to a deadlock, replicate the - // deadlock specific info on status tablet followers. That would help return a consistent - // error message on deadlock, across status tablet leadership changes. - // - // Refer https://github.com/yugabyte/yugabyte-db/issues/19257 for more details. 
auto state = rpc::MakeSharedMessage(); state->dup_transaction_id(id_.AsSlice()); state->set_status(status); + DCHECK(deadlock_reason.ok() || status == TransactionStatus::ABORTED); + StatusToPB(deadlock_reason, state->mutable_deadlock_reason()); auto request = context_.coordinator_context().CreateUpdateTransaction(std::move(state)); if (replicating_) { @@ -766,6 +802,13 @@ class TransactionState { status_ = TransactionStatus::ABORTED; first_entry_raft_index_ = data.op_id.index; + auto deadlock_reason = StatusFromPB(data.state.deadlock_reason()); + if (!deadlock_reason.ok() && !deadlock_time_) { + LOG_IF(DFATAL, !deadlock_reason_.ok()) + << "Deadlock reason already set: " << deadlock_reason_; + deadlock_time_ = data.hybrid_time; + deadlock_reason_ = deadlock_reason; + } NotifyAbortWaiters(TransactionStatusResult::Aborted()); return Status::OK(); } @@ -935,6 +978,8 @@ class TransactionState { const TransactionId id_; const std::string log_prefix_; TransactionStatus status_ = TransactionStatus::PENDING; + Status deadlock_reason_ = Status::OK(); + HybridTime deadlock_time_ = HybridTime::kInvalid; MicrosTime first_touch_; HybridTime last_touch_; // It should match last_touch_, but it is possible that because of some code errors it @@ -1028,7 +1073,7 @@ std::string TransactionCoordinator::AbortedData::ToString() const { // Real implementation of transaction coordinator, as in PImpl idiom. 
class TransactionCoordinator::Impl : public TransactionStateContext, - public TransactionAbortController { + public TransactionStatusController { public: Impl(const std::string& permanent_uuid, TransactionCoordinatorContext* context, @@ -1046,10 +1091,6 @@ class TransactionCoordinator::Impl : public TransactionStateContext, Shutdown(); } - void Abort(const TransactionId& transaction_id, TransactionStatusCallback callback) override { - Abort(transaction_id, context_.LeaderTerm(), callback); - } - void RemoveInactiveTransactions(Waiters* waiters) override { std::lock_guard lock(managed_mutex_); auto& sorted_txn_map = waiters->get(); @@ -1162,18 +1203,14 @@ class TransactionCoordinator::Impl : public TransactionStateContext, tserver::GetTransactionStatusResponsePB* response) { AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms); auto leader_term = context_.LeaderTerm(); - std::vector decoded_txn_ids; - decoded_txn_ids.reserve(transaction_ids.size()); - for (auto i = 0 ; i < transaction_ids.size() ; i++) { - decoded_txn_ids.emplace_back(VERIFY_RESULT(FullyDecodeTransactionId(transaction_ids[i]))); - } PostponedLeaderActions postponed_leader_actions; { std::unique_lock lock(managed_mutex_); HybridTime leader_safe_time; postponed_leader_actions_.leader_term = leader_term; - for (const auto& id : decoded_txn_ids) { + for (const auto& transaction_id : transaction_ids) { + auto id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_id)); auto it = managed_transactions_.find(id); std::vector expected_tablet_batches; bool known_txn = it != managed_transactions_.end(); @@ -1202,6 +1239,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, } response->add_status(txn_status_with_ht.status); response->add_status_hybrid_time(txn_status_with_ht.status_time.ToUint64()); + StatusToPB(txn_status_with_ht.expected_deadlock_status, response->add_deadlock_reason()); auto mutable_aborted_set_pb = response->add_aborted_subtxn_set(); if (it != 
managed_transactions_.end() && @@ -1213,24 +1251,6 @@ class TransactionCoordinator::Impl : public TransactionStateContext, postponed_leader_actions.Swap(&postponed_leader_actions_); } - RSTATUS_DCHECK_EQ( - response->status().size(), decoded_txn_ids.size(), IllegalState, - Format("Expected to see $0 (vs $1) statuses in GetTransactionStatusResponsePB", - decoded_txn_ids.size(), response->status().size())); - // GetTransactionDeadlockStatus should be called outside the scope of managed_mutex_. - // Else we risk a deadlock since the detector acquires the locks in the order: - // detector's mutex -> coordinator's managed_mutex_, during execution of TriggerProbes(). - for (auto i = 0 ; i < response->status().size() ; i++) { - // Deadlock detector stores info of transactions that might have been aborted due to a - // deadlock even before the coordinator cancels them. So it could happen that the detector - // reports a deadlock specific error for a transaction that the coordinator is/was unable - // to abort. Hence we query the detector for statuses of txns in ABORTED state alone. - Status s = response->status(i) == TransactionStatus::ABORTED - ? 
deadlock_detector_.GetTransactionDeadlockStatus(decoded_txn_ids[i]) - : Status::OK(); - StatusToPB(s, response->add_deadlock_reason()); - } - ExecutePostponedLeaderActions(&postponed_leader_actions); if (GetAtomicFlag(&FLAGS_TEST_inject_random_delay_on_txn_status_response_ms)) { if (response->status().size() > 0 && response->status(0) == TransactionStatus::PENDING) { @@ -1343,7 +1363,9 @@ class TransactionCoordinator::Impl : public TransactionStateContext, return TransactionStatusResult{TransactionStatus::PENDING, result.status_time.Decremented()}; } - void Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) { + void Abort( + const TransactionId& transaction_id, int64_t term, const Status& deadlock_reason, + TransactionAbortCallback callback) { AtomicFlagSleepMs(&FLAGS_TEST_inject_txn_get_status_delay_ms); std::unique_lock lock(managed_mutex_); @@ -1355,7 +1377,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, return; } - DoAbort(it, term, std::move(callback), std::move(lock)); + DoAbort(it, term, std::move(callback), std::move(lock), deadlock_reason); } bool CancelTransactionIfFound( @@ -1459,6 +1481,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, void Handle(std::unique_ptr request, int64_t term) { auto& state = *request->request(); + DCHECK_NE(state.status(), TransactionStatus::ABORTED); auto id = FullyDecodeTransactionId(state.transaction_id()); if (!id.ok()) { LOG(WARNING) << "Failed to decode id from " << state.ShortDebugString() << ": " << id; @@ -1479,14 +1502,6 @@ class TransactionCoordinator::Impl : public TransactionStateContext, } else { lock.unlock(); status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted)); - // If the transaction was involved in a deadlock, the deadlock error takes precedence - // over a generic status of type Expired. 
- if (status.IsExpired()) { - auto s = deadlock_detector_.GetTransactionDeadlockStatus(*id); - if (!s.ok()) { - status = std::move(s); - } - } request->CompleteWithStatus(status); return; } @@ -1803,6 +1818,14 @@ class TransactionCoordinator::Impl : public TransactionStateContext, for (auto it = index.begin(); it != index.end() && it->ExpiredAt(now);) { if (it->status() == TransactionStatus::ABORTED) { + if (it->ShouldBeRetained()) { + VLOG_WITH_PREFIX_AND_FUNC(4) << "Retaining recently deadlocked transaction -- " + << "now: " << now << ", first: " << it->id().ToString() + << ", expired: " << it->ExpiredAt(now) << ", deadlocked at: " + << it->GetDeadlockTime().ToDebugString(); + ++it; + continue; + } it = index.erase(it); } else { if (leader) { @@ -1835,13 +1858,16 @@ class TransactionCoordinator::Impl : public TransactionStateContext, managed_transactions_.modify(it, [&status](TransactionState& state) { state.ClearRequests(status); }); - managed_transactions_.erase(it); + if (!it->ShouldBeRetained()) { + managed_transactions_.erase(it); + } } } void DoAbort( - ManagedTransactions::iterator it, int64_t term, TransactionAbortCallback callback, - std::unique_lock lock) { + ManagedTransactions::iterator it, int64_t term, + TransactionAbortCallback callback, std::unique_lock lock, + const Status& deadlock_reason = Status::OK()) { CHECK(it != managed_transactions_.end()); VLOG_WITH_PREFIX_AND_FUNC(4) << "transaction_id: " << it->id() << " found, aborting now."; @@ -1855,8 +1881,9 @@ class TransactionCoordinator::Impl : public TransactionStateContext, PostponedLeaderActions actions; postponed_leader_actions_.leader_term = term; TransactionStatusResult status; - managed_transactions_.modify(it, [&status, &callback](TransactionState& state) { - status = state.Abort(&callback); + managed_transactions_.modify( + it, [&status, &callback, &deadlock_reason](TransactionState& state) { + status = state.Abort(&callback, deadlock_reason); }); if (callback) { lock.unlock(); @@ 
-1947,8 +1974,9 @@ Status TransactionCoordinator::GetOldTransactions( void TransactionCoordinator::Abort(const TransactionId& transaction_id, int64_t term, + const Status& deadlock_reason, TransactionAbortCallback callback) { - impl_->Abort(transaction_id, term, std::move(callback)); + impl_->Abort(transaction_id, term, deadlock_reason, std::move(callback)); } bool TransactionCoordinator::CancelTransactionIfFound( diff --git a/src/yb/tablet/transaction_coordinator.h b/src/yb/tablet/transaction_coordinator.h index fc8240a54664..cedb935524ba 100644 --- a/src/yb/tablet/transaction_coordinator.h +++ b/src/yb/tablet/transaction_coordinator.h @@ -138,7 +138,8 @@ class TransactionCoordinator { CoarseTimePoint deadline, tserver::GetTransactionStatusResponsePB* response); - void Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback); + void Abort(const TransactionId& transaction_id, int64_t term, + const Status& deadlock_reason, TransactionAbortCallback callback); // CancelTransactionIfFound returns true if the transaction is found in the list of managed txns // at the coordinator, and invokes the callback with the cancelation status. If the txn isn't diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 2131ea27509f..8c080de0635d 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -45,6 +45,7 @@ #include "yb/common/schema_pbutil.h" #include "yb/common/row_mark.h" #include "yb/common/schema.h" +#include "yb/common/wire_protocol.h" #include "yb/consensus/leader_lease.h" #include "yb/consensus/consensus.pb.h" #include "yb/consensus/consensus_util.h" @@ -1323,6 +1324,7 @@ void TabletServiceImpl::AbortTransaction(const AbortTransactionRequestPB* req, tablet.tablet->transaction_coordinator()->Abort( txn_id, tablet.leader_term, + req->has_deadlock_reason() ? 
StatusFromPB(req->deadlock_reason()) : Status::OK(), [resp, context_ptr, clock, peer = tablet.peer](Result result) { resp->set_propagated_hybrid_time(clock->Now().ToUint64()); Status status; diff --git a/src/yb/tserver/tserver_service.proto b/src/yb/tserver/tserver_service.proto index 224596f768fb..8d03a16505c5 100644 --- a/src/yb/tserver/tserver_service.proto +++ b/src/yb/tserver/tserver_service.proto @@ -398,6 +398,9 @@ message AbortTransactionRequestPB { optional bytes tablet_id = 1; optional bytes transaction_id = 2; optional fixed64 propagated_hybrid_time = 3; + + // Status containing deadlock info if the transaction is being aborted due to deadlock. + optional AppStatusPB deadlock_reason = 4; } message AbortTransactionResponsePB {