Skip to content

Commit

Permalink
[BACKPORT 2.20][#14165][#19257] DocDB: Refactor recently deadlocked t…
Browse files Browse the repository at this point in the history
…ransaction tracking to transaction coordinator

Summary:
Original commit: 916da9e / D31937
In preparation for the changes being introduced in D31827, this revision moves the tracking of
recently deadlocked transactions to the transaction coordinator with the following changes:
1. Deadlock detector now always sends an RPC to abort a deadlocked transaction, instead of making
   a local call. This makes the behavior to be introduced in a follow-up diff consistent between
   remote and local deadlock detectors aborting transactions.
2. If the coordinator receives an AbortTransaction request with a deadlock_reason populated, it
   updates the local TransactionState with that data and retains the TransactionState in
   managed_transactions for FLAGS_clear_deadlocked_txns_info_older_than_heartbeats txn heartbeat
   intervals.
3. UpdateTransaction and GetStatus requests now check the TransactionState for deadlock_reason
   instead of making a call to the deadlock detector.

**Upgrade/Rollback safety:**
This revision makes an additive change to AbortTransactionRequestPB and does not pose any upgrade/rollback safety issues.
Jira: DB-3646, DB-8053

Test Plan: Jenkins

Reviewers: bkolagani, sergei

Reviewed By: bkolagani

Subscribers: ybase, bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D33390
  • Loading branch information
robertsami committed Mar 23, 2024
1 parent 122aeac commit bbd5946
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 139 deletions.
5 changes: 3 additions & 2 deletions src/yb/common/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ const char* kGlobalTransactionsTableName = "transactions";
const std::string kMetricsSnapshotsTableName = "metrics";
const std::string kTransactionTablePrefix = "transactions_";

// Convenience constructor: delegates to the overload that additionally takes a SubtxnSet,
// passing a default-constructed SubtxnSet.
TransactionStatusResult::TransactionStatusResult(TransactionStatus status_, HybridTime status_time_)
: TransactionStatusResult(status_, status_time_, SubtxnSet()) {}
// Convenience constructor: delegates to the overload that additionally takes a SubtxnSet,
// passing a default-constructed SubtxnSet and handing expected_deadlock_status_ through unchanged.
TransactionStatusResult::TransactionStatusResult(
TransactionStatus status_, HybridTime status_time_, Status expected_deadlock_status_)
: TransactionStatusResult(status_, status_time_, SubtxnSet(), expected_deadlock_status_) {}

TransactionStatusResult::TransactionStatusResult(
TransactionStatus status_, HybridTime status_time_, SubtxnSet aborted_subtxn_set_,
Expand Down
4 changes: 3 additions & 1 deletion src/yb/common/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ struct TransactionStatusResult {

TransactionStatusResult() {}

TransactionStatusResult(TransactionStatus status_, HybridTime status_time_);
TransactionStatusResult(
TransactionStatus status_, HybridTime status_time_,
Status expected_deadlock_status_ = Status::OK());

TransactionStatusResult(
TransactionStatus status_, HybridTime status_time_,
Expand Down
113 changes: 46 additions & 67 deletions src/yb/docdb/deadlock_detector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ DEFINE_UNKNOWN_int32(
TAG_FLAG(clear_active_probes_older_than_seconds, hidden);
TAG_FLAG(clear_active_probes_older_than_seconds, advanced);

DEFINE_RUNTIME_int32(
clear_deadlocked_txns_info_older_than_heartbeats, 10,
"Minimum number of transaction heartbeat periods for which a deadlocked transaction's info is "
"retained, after it has been reported to be aborted. This ensures the memory used to track "
"info of deadlocked transactions does not grow unbounded.");
TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, hidden);
TAG_FLAG(clear_deadlocked_txns_info_older_than_heartbeats, advanced);

METRIC_DEFINE_event_stats(
tablet, deadlock_size, "Deadlock size", yb::MetricUnit::kTransactions,
"The number of transactions involved in detected deadlocks");
Expand Down Expand Up @@ -367,6 +359,43 @@ class LocalProbeProcessor : public std::enable_shared_from_this<LocalProbeProces
tserver::ProbeTransactionDeadlockResponsePB resp_ GUARDED_BY(mutex_);
};

// Issues an AbortTransaction RPC for a transaction that was found to be part of a deadlock.
//
// The RPC completion callback captures a shared_ptr to the resolver, so the instance stays alive
// until that callback has been destroyed. Each instance owns a single RPC handle, which every
// call to AbortRemoteTransaction passes to Rpcs::RegisterAndStart.
class RemoteDeadlockResolver : public std::enable_shared_from_this<RemoteDeadlockResolver> {
 public:
  RemoteDeadlockResolver(rpc::Rpcs* rpcs, client::YBClient* client)
      : rpcs_(rpcs), client_(client), handle_(rpcs_->InvalidHandle()) {}

  // Sends an abort request for transaction `id` to the tablet `status_tablet`.
  //
  // `err_msg` is wrapped in an Expired status carrying the kDeadlock transaction error code and
  // attached to the request as its deadlock_reason.
  void AbortRemoteTransaction(
      const TransactionId& id, const TabletId& status_tablet,
      const std::string& err_msg) {
    const auto deadlock_status = STATUS_EC_FORMAT(
        Expired, TransactionError(TransactionErrorCode::kDeadlock), err_msg);

    tserver::AbortTransactionRequestPB abort_req;
    abort_req.set_tablet_id(status_tablet);
    abort_req.set_propagated_hybrid_time(client_->Clock()->Now().ToUint64());
    abort_req.set_transaction_id(id.data(), id.size());
    StatusToPB(deadlock_status, abort_req.mutable_deadlock_reason());

    rpcs_->RegisterAndStart(
        AbortTransaction(
            TransactionRpcDeadline(),
            nullptr,
            client_,
            &abort_req,
            // On completion: log the response, then release the RPC handle.
            [self = shared_from(this), aborted_txn = id]
                (const auto& rpc_status, const auto& response) {
              LOG_WITH_FUNC(INFO) << "Abort deadlocked transaction request for " << aborted_txn
                                  << " completed: " << response.ShortDebugString();
              self->rpcs_->Unregister(self->handle_);
            }),
        &handle_);
  }

 private:
  rpc::Rpcs* rpcs_;
  client::YBClient* client_;
  rpc::Rpcs::Handle handle_;
};

using LocalProbeProcessorPtr = std::shared_ptr<LocalProbeProcessor>;

} // namespace
Expand All @@ -391,10 +420,10 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
public:
explicit Impl(
const std::shared_future<client::YBClient*>& client_future,
TransactionAbortController* controller, const TabletId& status_tablet_id,
TransactionStatusController* controller, const TabletId& status_tablet_id,
const MetricEntityPtr& metrics)
: client_future_(client_future), controller_(controller),
detector_id_(DetectorId::GenerateRandom()),
detector_id_(DetectorId::GenerateRandom()), status_tablet_(status_tablet_id),
log_prefix_(Format("T $0 D $1 ", status_tablet_id, detector_id_)),
deadlock_size_(METRIC_deadlock_size.Instantiate(metrics)),
probe_latency_(METRIC_deadlock_probe_latency.Instantiate(metrics)),
Expand Down Expand Up @@ -591,20 +620,6 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
// TODO(wait-queues): Trigger probes only for waiters which have
// wait_start_time > Now() - N seconds
probes_to_send = GetProbesToSend(waiters_);

// Clear the info of old deadlocked transactions.
auto interval = FLAGS_clear_deadlocked_txns_info_older_than_heartbeats *
FLAGS_transaction_heartbeat_usec * 1us;
auto expired_cutoff_time = CoarseMonoClock::Now() - interval;
for (auto it = recently_deadlocked_txns_info_.begin();
it != recently_deadlocked_txns_info_.end();) {
const auto& deadlock_time = it->second.second;
if (deadlock_time < expired_cutoff_time) {
it = recently_deadlocked_txns_info_.erase(it);
} else {
it++;
}
}
}

for (auto& processor : probes_to_send) {
Expand All @@ -624,18 +639,6 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
}
}

// Looks up txn_id among the recently deadlocked transactions, under a shared lock on mutex_.
// Returns Status::OK() when there is no entry; otherwise returns an Expired status tagged with
// the kDeadlock transaction error code and built from the stored deadlock message.
Status GetTransactionDeadlockStatus(const TransactionId& txn_id) {
  SharedLock<decltype(mutex_)> read_lock(mutex_);
  const auto entry = recently_deadlocked_txns_info_.find(txn_id);
  if (entry != recently_deadlocked_txns_info_.end()) {
    // Expired is used so that TabletInvoker does not retry. The query layer also proactively
    // sends clean up requests to the transaction participant only for transactions with
    // Expired status.
    return STATUS_EC_FORMAT(
        Expired, TransactionError(TransactionErrorCode::kDeadlock), entry->second.first);
  }
  return Status::OK();
}

private:
template <class T>
std::vector<LocalProbeProcessorPtr> GetProbesToSend(const T& waiters) {
Expand Down Expand Up @@ -685,14 +688,10 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
LOG(ERROR) << "Failed to decode transaction id in detected deadlock!";
} else {
const auto& waiter = *waiter_or_status;
{
UniqueLock<decltype(detector->mutex_)> l(detector->mutex_);
auto deadlock_info = std::make_pair(ConstructDeadlockedMessage(waiter, resp),
CoarseMonoClock::Now());
detector->recently_deadlocked_txns_info_.emplace(waiter, deadlock_info);
}
detector->controller_->Abort(
waiter, std::bind(&DeadlockDetector::Impl::TxnAbortCallback, detector, _1, waiter));
auto deadlock_msg = ConstructDeadlockedMessage(waiter, resp);
auto resolver = std::make_shared<RemoteDeadlockResolver>(
&detector->rpcs_, &detector->client());
resolver->AbortRemoteTransaction(waiter, detector->status_tablet_, deadlock_msg);
}
}
});
Expand Down Expand Up @@ -828,29 +827,16 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
return local_processor;
}

// Logs the outcome of an attempt to abort the deadlocked transaction txn_id.
// Three cases are distinguished: the abort call itself returned an error, the call returned a
// status of ABORTED (success), or the call returned some other transaction status.
void TxnAbortCallback(Result<TransactionStatusResult> res, const TransactionId txn_id) {
  if (!res.ok()) {
    LOG_WITH_FUNC(INFO) << "Aborting deadlocked transaction " << txn_id
                        << " failed -- " << res.status();
    return;
  }
  if (res->status == TransactionStatus::ABORTED) {
    LOG_WITH_FUNC(INFO) << "Aborting deadlocked transaction " << txn_id << " succeeded.";
    return;
  }
  LOG_WITH_FUNC(INFO) << "Aborting deadlocked transaction " << txn_id
                      << " failed -- status: " << res->status << ", time: " << res->status_time;
}

// Returns the log prefix stored in log_prefix_.
const std::string& LogPrefix() const {
return log_prefix_;
}

// Returns the YBClient obtained from client_future_. shared_future::get() waits until the
// future is ready, so this call may block.
client::YBClient& client() { return *client_future_.get(); }

const std::shared_future<client::YBClient*>& client_future_;
TransactionAbortController* const controller_;
TransactionStatusController* const controller_;
const DetectorId detector_id_;
const TabletId status_tablet_;
const std::string log_prefix_;

scoped_refptr<EventStats> deadlock_size_;
Expand All @@ -872,14 +858,11 @@ class DeadlockDetector::Impl : public std::enable_shared_from_this<DeadlockDetec
Waiters waiters_ GUARDED_BY(mutex_);

std::atomic<uint32_t> seq_no_ = 0;

std::unordered_map<TransactionId, std::pair<std::string, CoarseTimePoint>, TransactionIdHash>
recently_deadlocked_txns_info_ GUARDED_BY(mutex_);
};

DeadlockDetector::DeadlockDetector(
const std::shared_future<client::YBClient*>& client_future,
TransactionAbortController* controller,
TransactionStatusController* controller,
const TabletId& status_tablet_id,
const MetricEntityPtr& metrics):
impl_(new Impl(client_future, controller, status_tablet_id, metrics)) {}
Expand All @@ -904,10 +887,6 @@ void DeadlockDetector::TriggerProbes() {
return impl_->TriggerProbes();
}

// Forwards to the implementation object's GetTransactionDeadlockStatus for the given
// transaction id and returns its result.
Status DeadlockDetector::GetTransactionDeadlockStatus(const TransactionId& txn_id) {
return impl_->GetTransactionDeadlockStatus(txn_id);
}

// Forwards the shutdown request to the implementation object.
void DeadlockDetector::Shutdown() {
return impl_->Shutdown();
}
Expand Down
11 changes: 3 additions & 8 deletions src/yb/docdb/deadlock_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,12 @@ typedef boost::multi_index_container<WaiterInfoEntry,

// Specification used by the deadlock detector to interact with the transaction coordinator to abort
// transactions or determine which transactions are no longer running.
class TransactionAbortController {
class TransactionStatusController {
public:
virtual void Abort(const TransactionId& transaction_id, TransactionStatusCallback callback) = 0;
virtual void RemoveInactiveTransactions(Waiters* waiters) = 0;
virtual bool IsAnySubtxnActive(const TransactionId& transaction_id,
const SubtxnSet& subtxn_set) = 0;
virtual ~TransactionAbortController() = default;
virtual ~TransactionStatusController() = default;
};

using DeadlockDetectorRpcCallback = std::function<void(const Status&)>;
Expand Down Expand Up @@ -169,7 +168,7 @@ class DeadlockDetector {
public:
DeadlockDetector(
const std::shared_future<client::YBClient*>& client_future,
TransactionAbortController* controller,
TransactionStatusController* controller,
const TabletId& status_tablet_id,
const MetricEntityPtr& metrics);

Expand All @@ -189,10 +188,6 @@ class DeadlockDetector {

void Shutdown();

// Returns the deadlock status if the given transaction could have been involved in a deadlock.
// Returns Status::OK() in all other cases.
Status GetTransactionDeadlockStatus(const TransactionId& txn_id);

private:
class Impl;
std::shared_ptr<Impl> impl_;
Expand Down
Loading

0 comments on commit bbd5946

Please sign in to comment.