Skip to content

Commit

Permalink
[#3220] Implement transaction sealing w/o abort
Browse files Browse the repository at this point in the history
Summary:
This diff implements transaction sealing, but without the ability to abort sealed transactions and without applying their intents.
(Both will be addressed by upcoming diffs.)

Added the ability to request transaction status at the participant, e.g. the number of replicated batches.

Test Plan: ybd debug --cxx-test seal-txn-test

Reviewers: mikhail

Reviewed By: mikhail

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7848
  • Loading branch information
spolitov committed Feb 19, 2020
1 parent 6dda063 commit d0c2f48
Show file tree
Hide file tree
Showing 32 changed files with 912 additions and 357 deletions.
7 changes: 7 additions & 0 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,19 @@ CoarseTimePoint Batcher::ComputeDeadlineUnlocked() const {
}

void Batcher::FlushAsync(StatusFunctor callback) {
size_t operations_count;
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
CHECK_EQ(state_, BatcherState::kGatheringOps);
state_ = BatcherState::kResolvingTablets;
flush_callback_ = std::move(callback);
deadline_ = ComputeDeadlineUnlocked();
operations_count = ops_.size();
}

auto transaction = this->transaction();
if (transaction) {
transaction->ExpectOperations(operations_count);
}

// In the case that we have nothing buffered, just call the callback
Expand Down
1 change: 1 addition & 0 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ DECLARE_int32(log_min_seconds_to_retain);
DECLARE_uint64(max_clock_skew_usec);
DECLARE_bool(transaction_allow_rerequest_status_in_tests);
DECLARE_uint64(transaction_delay_status_reply_usec_in_tests);
DECLARE_bool(enable_load_balancing);
DECLARE_bool(flush_rocksdb_on_shutdown);
DECLARE_bool(transaction_disable_proactive_cleanup_in_tests);
DECLARE_uint64(aborted_intent_cleanup_ms);
Expand Down
39 changes: 39 additions & 0 deletions src/yb/client/seal-txn-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

#include "yb/tablet/tablet_peer.h"

#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"

using namespace std::literals;

DECLARE_int32(TEST_write_rejection_percentage);
DECLARE_bool(TEST_fail_on_replicated_batch_idx_set_in_txn_record);
DECLARE_bool(enable_load_balancing);
DECLARE_bool(enable_transaction_sealing);

namespace yb {
Expand Down Expand Up @@ -101,5 +107,38 @@ TEST_F(SealTxnTest, NumBatchesDisable) {
ASSERT_OK(txn->CommitFuture().get());
}

// Seals a transaction while its writes have not been flushed yet.
// Rows are written with Flush::kFalse, then the session flush and a seal-only commit
// (SealOnly::kTrue) are both started before either future is awaited.
// After both complete successfully, the test verifies the data, restarts the cluster
// and calls CheckNoRunningTransactions().
TEST_F(SealTxnTest, Simple) {
auto txn = CreateTransaction();
auto session = CreateSession(txn);
// Buffer the inserts without flushing, so the flush can be issued explicitly below.
WriteRows(session, /* transaction = */ 0, WriteOpType::INSERT, Flush::kFalse);
// Order matters: the commit is requested right after the flush is started,
// before the flush result is awaited.
auto flush_future = session->FlushFuture();
auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue);
ASSERT_OK(flush_future.get());
LOG(INFO) << "Flushed: " << txn->id();
ASSERT_OK(commit_future.get());
LOG(INFO) << "Committed: " << txn->id();
ASSERT_NO_FATALS(VerifyData());
// Restart the cluster before checking for leftover running transactions.
ASSERT_OK(cluster_->RestartSync());
CheckNoRunningTransactions();
}

// Same flow as a seal-only commit of unflushed writes, but the unflushed batch updates rows
// that were written earlier in the same transaction.
// The first WriteRows call (logged as the insert step) uses the helper's default arguments;
// the second writes UPDATE operations with Flush::kFalse. The flush and the seal-only commit
// are then started before either future is awaited, and the data is verified with
// VerifyData(1, WriteOpType::UPDATE).
TEST_F(SealTxnTest, Update) {
auto txn = CreateTransaction();
auto session = CreateSession(txn);
LOG(INFO) << "Inserting rows";
WriteRows(session, /* transaction = */ 0);
LOG(INFO) << "Updating rows";
// Buffer the updates without flushing, so the flush can be issued explicitly below.
WriteRows(session, /* transaction = */ 0, WriteOpType::UPDATE, Flush::kFalse);
// Order matters: the commit is requested right after the flush is started,
// before the flush result is awaited.
auto flush_future = session->FlushFuture();
auto commit_future = txn->CommitFuture(CoarseTimePoint(), SealOnly::kTrue);
ASSERT_OK(flush_future.get());
LOG(INFO) << "Flushed: " << txn->id();
ASSERT_OK(commit_future.get());
LOG(INFO) << "Committed: " << txn->id();
ASSERT_NO_FATALS(VerifyData(1, WriteOpType::UPDATE));
// Restart the cluster before checking for leftover running transactions.
ASSERT_OK(cluster_->RestartSync());
CheckNoRunningTransactions();
}

} // namespace client
} // namespace yb
123 changes: 88 additions & 35 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace client {
namespace {

YB_STRONGLY_TYPED_BOOL(Child);
YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased));
YB_DEFINE_ENUM(TransactionState, (kRunning)(kAborted)(kCommitted)(kReleased)(kSealed));

} // namespace

Expand Down Expand Up @@ -228,10 +228,6 @@ class YBTransaction::Impl final {
}
}

if (initial) {
running_requests_ += ops.size();
}

if (defer) {
if (waiter) {
waiters_.push_back(std::move(waiter));
Expand Down Expand Up @@ -262,6 +258,11 @@ class YBTransaction::Impl final {
return true;
}

// Adds `count` to running_requests_, the number of requests this transaction is still
// waiting on. The counter is modified while holding mutex_.
void ExpectOperations(size_t count) {
  std::lock_guard<std::mutex> guard(mutex_);
  running_requests_ += count;
}

void Flushed(
const internal::InFlightOps& ops, const ReadHybridTime& used_read_time,
const Status& status) {
Expand All @@ -272,6 +273,7 @@ class YBTransaction::Impl final {
std::this_thread::sleep_for(FLAGS_TEST_transaction_inject_flushed_delay_ms * 1ms);
}

boost::optional<Status> notify_commit_status;
bool abort = false;
{
std::lock_guard<std::mutex> lock(mutex_);
Expand Down Expand Up @@ -316,42 +318,54 @@ class YBTransaction::Impl final {
}
SetError(status, &lock);
}

if (running_requests_ == 0 && commit_replicated_) {
notify_commit_status = status_;
}
}

if (notify_commit_status) {
VLOG_WITH_PREFIX(4) << "Sealing done: " << *notify_commit_status;
commit_callback_(*notify_commit_status);
}

if (abort && !child_) {
DoAbort(TransactionRpcDeadline(), transaction_->shared_from_this());
}
}

void Commit(CoarseTimePoint deadline, CommitCallback callback) {
void Commit(CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) {
auto transaction = transaction_->shared_from_this();
{
std::unique_lock<std::mutex> lock(mutex_);
auto status = CheckCouldCommit(&lock);
auto status = CheckCouldCommit(seal_only, &lock);
if (!status.ok()) {
callback(status);
return;
}
state_.store(TransactionState::kCommitted, std::memory_order_release);
state_.store(seal_only ? TransactionState::kSealed : TransactionState::kCommitted,
std::memory_order_release);
commit_callback_ = std::move(callback);
if (!ready_) {
// If we have not written any intents and do not even have a transaction status tablet,
// just report the transaction as committed.
//
// See https://github.com/yugabyte/yugabyte-db/issues/3105 for details -- we might be able
// to remove this special case if it turns out there is a bug elsewhere.
if (tablets_.empty()) {
if (tablets_.empty() && running_requests_ == 0) {
VLOG_WITH_PREFIX(4) << "Committed empty transaction";
commit_callback_(Status::OK());
return;
}

waiters_.emplace_back(std::bind(&Impl::DoCommit, this, deadline, _1, transaction));
waiters_.emplace_back(std::bind(
&Impl::DoCommit, this, deadline, seal_only, _1, transaction));
lock.unlock();
RequestStatusTablet(deadline);
return;
}
}
DoCommit(deadline, Status::OK(), transaction);
DoCommit(deadline, seal_only, Status::OK(), transaction);
}

void Abort(CoarseTimePoint deadline) {
Expand Down Expand Up @@ -455,6 +469,7 @@ class YBTransaction::Impl final {
for (const auto& tablet : tablets_) {
auto& out = *tablets.Add();
out.set_tablet_id(tablet.first);
out.set_num_batches(tablet.second.num_batches);
out.set_metadata_state(
tablet.second.has_metadata ? InvolvedTabletMetadataState::EXIST
: InvolvedTabletMetadataState::MISSING);
Expand Down Expand Up @@ -556,7 +571,7 @@ class YBTransaction::Impl final {

CHECKED_STATUS CheckRunning(std::unique_lock<std::mutex>* lock) {
if (state_.load(std::memory_order_acquire) != TransactionState::kRunning) {
auto status = error_;
auto status = status_;
lock->unlock();
if (status.ok()) {
status = STATUS(IllegalState, "Transaction already completed");
Expand All @@ -567,18 +582,22 @@ class YBTransaction::Impl final {
}

void DoCommit(
CoarseTimePoint deadline, const Status& status, const YBTransactionPtr& transaction) {
CoarseTimePoint deadline, SealOnly seal_only, const Status& status,
const YBTransactionPtr& transaction) {
VLOG_WITH_PREFIX(1)
<< Format("Commit, tablets: $0, status: $1", tablets_, status);
<< Format("Commit, seal_only: $0, tablets: $1, status: $2",
seal_only, tablets_, status);

if (!status.ok()) {
VLOG_WITH_PREFIX(4) << "Commit failed: " << status;
commit_callback_(status);
return;
}

// If we don't have any tablets that have intents written to them, just abort it.
// But notify caller that commit was successful, so it is transparent for him.
if (tablets_.empty()) {
VLOG_WITH_PREFIX(4) << "Committed empty";
DoAbort(deadline, transaction);
commit_callback_(Status::OK());
return;
Expand All @@ -589,10 +608,13 @@ class YBTransaction::Impl final {
req.set_propagated_hybrid_time(manager_->Now().ToUint64());
auto& state = *req.mutable_state();
state.set_transaction_id(metadata_.transaction_id.begin(), metadata_.transaction_id.size());
state.set_status(TransactionStatus::COMMITTED);
state.set_status(seal_only ? TransactionStatus::SEALED : TransactionStatus::COMMITTED);
state.mutable_tablets()->Reserve(tablets_.size());
for (const auto& tablet : tablets_) {
state.add_tablets(tablet.first);
if (seal_only) {
state.add_tablet_batches(tablet.second.num_batches);
}
}

manager_->rpcs().RegisterAndStart(
Expand Down Expand Up @@ -649,13 +671,24 @@ class YBTransaction::Impl final {
}

void CommitDone(const Status& status,
HybridTime propagated_hybrid_time,
const tserver::UpdateTransactionResponsePB& response,
const YBTransactionPtr& transaction) {
VLOG_WITH_PREFIX(1) << "Committed: " << status;

manager_->UpdateClock(propagated_hybrid_time);
UpdateClock(response, manager_);
manager_->rpcs().Unregister(&commit_handle_);
commit_callback_(status.IsAlreadyPresent() ? Status::OK() : status);

Status actual_status = status.IsAlreadyPresent() ? Status::OK() : status;
if (state_.load(std::memory_order_acquire) != TransactionState::kCommitted &&
actual_status.ok()) {
std::lock_guard<std::mutex> lock(mutex_);
commit_replicated_ = true;
if (running_requests_ != 0) {
return;
}
}
VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status;
commit_callback_(actual_status);
}

void AbortDone(const Status& status,
Expand Down Expand Up @@ -768,17 +801,15 @@ class YBTransaction::Impl final {
}

auto current_state = state_.load(std::memory_order_acquire);
bool allow_heartbeat =
current_state == TransactionState::kRunning ||
(current_state == TransactionState::kReleased && status == TransactionStatus::CREATED);
if (!allow_heartbeat) {

if (!AllowHeartbeat(current_state, status)) {
VLOG_WITH_PREFIX(1) << " Send heartbeat cancelled: " << yb::ToString(transaction);
return;
}

if (status != TransactionStatus::CREATED &&
GetAtomicFlag(&FLAGS_transaction_disable_heartbeat_in_tests)) {
HeartbeatDone(Status::OK(), HybridTime::kInvalid, status, transaction);
HeartbeatDone(Status::OK(), tserver::UpdateTransactionResponsePB(), status, transaction);
return;
}

Expand All @@ -798,11 +829,25 @@ class YBTransaction::Impl final {
&heartbeat_handle_);
}

// Tells whether a heartbeat carrying `status` may be sent while the client-side
// transaction is in `current_state`:
//   - kRunning: always allowed;
//   - kReleased, kSealed: allowed only when `status` is TransactionStatus::CREATED;
//   - kAborted, kCommitted: never allowed.
// A value that matches no enumerator is reported through FATAL_INVALID_ENUM_VALUE.
static bool AllowHeartbeat(TransactionState current_state, TransactionStatus status) {
  switch (current_state) {
    // States in which no heartbeat is sent.
    case TransactionState::kAborted: FALLTHROUGH_INTENDED;
    case TransactionState::kCommitted:
      return false;
    // States in which only a CREATED heartbeat is still permitted.
    case TransactionState::kReleased: FALLTHROUGH_INTENDED;
    case TransactionState::kSealed:
      return status == TransactionStatus::CREATED;
    case TransactionState::kRunning:
      return true;
  }
  FATAL_INVALID_ENUM_VALUE(TransactionState, current_state);
}

void HeartbeatDone(const Status& status,
HybridTime propagated_hybrid_time,
const tserver::UpdateTransactionResponsePB& response,
TransactionStatus transaction_status,
const YBTransactionPtr& transaction) {
manager_->UpdateClock(propagated_hybrid_time);
UpdateClock(response, manager_);
manager_->rpcs().Unregister(&heartbeat_handle_);

if (status.ok()) {
Expand Down Expand Up @@ -848,8 +893,8 @@ class YBTransaction::Impl final {
SetError(status, &new_lock);
return;
}
if (error_.ok()) {
error_ = status;
if (status_.ok()) {
status_ = status;
state_.store(TransactionState::kAborted, std::memory_order_release);
}
}
Expand All @@ -873,7 +918,7 @@ class YBTransaction::Impl final {
callback(data);
}

CHECKED_STATUS CheckCouldCommit(std::unique_lock<std::mutex>* lock) {
CHECKED_STATUS CheckCouldCommit(SealOnly seal_only, std::unique_lock<std::mutex>* lock) {
RETURN_NOT_OK(CheckRunning(lock));
if (child_) {
return STATUS(IllegalState, "Commit of child transaction is not allowed");
Expand All @@ -882,7 +927,7 @@ class YBTransaction::Impl final {
return STATUS(
IllegalState, "Commit of transaction that requires restart is not allowed");
}
if (running_requests_ > 0) {
if (!seal_only && running_requests_ > 0) {
return STATUS(IllegalState, "Commit of transaction with running requests");
}

Expand All @@ -907,7 +952,7 @@ class YBTransaction::Impl final {
bool child_had_read_time_ = false;
bool ready_ = false;
CommitCallback commit_callback_;
Status error_;
Status status_;
rpc::Rpcs::Handle heartbeat_handle_;
rpc::Rpcs::Handle commit_handle_;
rpc::Rpcs::Handle abort_handle_;
Expand All @@ -929,6 +974,8 @@ class YBTransaction::Impl final {
std::promise<TransactionMetadata> metadata_promise_;
std::shared_future<TransactionMetadata> metadata_future_;
size_t running_requests_ = 0;
// Set to true after commit record is replicated. Used only during transaction sealing.
bool commit_replicated_ = false;
};

CoarseTimePoint AdjustDeadline(CoarseTimePoint deadline) {
Expand Down Expand Up @@ -978,13 +1025,18 @@ bool YBTransaction::Prepare(const internal::InFlightOps& ops,
ops, force_consistent_read, deadline, initial, std::move(waiter), metadata);
}

// Public entry point that forwards `count` to the implementation object's ExpectOperations.
void YBTransaction::ExpectOperations(size_t count) {
impl_->ExpectOperations(count);
}

// Public entry point that forwards the flushed operations, the read time that was used and
// the flush status to the implementation object's Flushed.
void YBTransaction::Flushed(
const internal::InFlightOps& ops, const ReadHybridTime& used_read_time, const Status& status) {
impl_->Flushed(ops, used_read_time, status);
}

void YBTransaction::Commit(CoarseTimePoint deadline, CommitCallback callback) {
impl_->Commit(AdjustDeadline(deadline), std::move(callback));
void YBTransaction::Commit(
CoarseTimePoint deadline, SealOnly seal_only, CommitCallback callback) {
impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
}

const TransactionId& YBTransaction::id() const {
Expand All @@ -1003,9 +1055,10 @@ ConsistentReadPoint& YBTransaction::read_point() {
return impl_->read_point();
}

std::future<Status> YBTransaction::CommitFuture(CoarseTimePoint deadline) {
return MakeFuture<Status>([this, deadline](auto callback) {
impl_->Commit(AdjustDeadline(deadline), std::move(callback));
std::future<Status> YBTransaction::CommitFuture(
CoarseTimePoint deadline, SealOnly seal_only) {
return MakeFuture<Status>([this, deadline, seal_only](auto callback) {
impl_->Commit(AdjustDeadline(deadline), seal_only, std::move(callback));
});
}

Expand Down
Loading

0 comments on commit d0c2f48

Please sign in to comment.