Skip to content

Commit

Permalink
[#2571] Fix clock update during commit with committing transaction
Browse files Browse the repository at this point in the history
Summary:
During conflict resolution, a transaction that is still committing reports its commit time as HybridTime::kMax.
We then use this time to update the clock after resolving operation conflicts.

This diff fixes the issue by ignoring the max hybrid time when updating the clock.

Test Plan: ybd --cxx-test ql-transaction-test --gtest_filter QLTransactionTest.MixedWriteConflicts

Reviewers: amitanand, mikhail, timur

Reviewed By: timur

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7779
  • Loading branch information
spolitov committed Jan 11, 2020
1 parent 4f3c8c0 commit c053ba3
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 23 deletions.
78 changes: 57 additions & 21 deletions src/yb/client/ql-transaction-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ DECLARE_bool(delete_intents_sst_files);
namespace yb {
namespace client {

// Tunable knobs for QLTransactionTest::TestWriteConflicts.
struct WriteConflictsOptions {
  // When set, the test starts a separate restart thread that runs alongside the workload.
  bool do_restarts{false};
  // Upper bound on the number of writes kept in flight at any moment.
  size_t active_transactions{50};
  // Keys are picked uniformly from [1, total_keys]; fewer keys means more conflicts.
  size_t total_keys{5};
  // When set, each write is randomly issued either inside a transaction or without one.
  bool non_txn_writes{false};
};

class QLTransactionTest : public TransactionTestBase {
protected:
void SetUp() override {
Expand All @@ -77,7 +84,7 @@ class QLTransactionTest : public TransactionTestBase {
// Otherwise second transaction would see pending intents from first one and should not restart.
void TestReadRestart(bool commit = true);

void TestWriteConflicts(bool do_restarts);
void TestWriteConflicts(const WriteConflictsOptions& options);

void TestReadOnlyTablets(IsolationLevel isolation_level,
bool perform_write,
Expand Down Expand Up @@ -596,24 +603,26 @@ TEST_F(QLTransactionTest, ReadOnlyTablets) {
true /* written_intents_expected */);
}

void QLTransactionTest::TestWriteConflicts(bool do_restarts) {
void QLTransactionTest::TestWriteConflicts(const WriteConflictsOptions& options) {
struct ActiveTransaction {
YBTransactionPtr transaction;
YBSessionPtr session;
std::future<Status> flush_future;
std::future<Status> commit_future;

std::string ToString() const {
return transaction ? transaction->ToString() : "no-txn";
}
};

constexpr size_t kActiveTransactions = 50;
constexpr auto kTestTime = 60s;
constexpr int kTotalKeys = 5;
std::vector<ActiveTransaction> active_transactions;

auto stop = std::chrono::steady_clock::now() + kTestTime;

std::thread restart_thread;

if (do_restarts) {
if (options.do_restarts) {
restart_thread = std::thread([this, stop] {
CDSAttacher attacher;
int it = 0;
Expand All @@ -626,7 +635,7 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {

int value = 0;
size_t tries = 0;
size_t committed = 0;
size_t written = 0;
size_t flushed = 0;
for (;;) {
auto expired = std::chrono::steady_clock::now() >= stop;
Expand All @@ -636,19 +645,23 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {
}
LOG(INFO) << "Time expired, remaining transactions: " << active_transactions.size();
for (const auto& txn : active_transactions) {
LOG(INFO) << "TXN: " << txn.transaction->ToString() << ", "
LOG(INFO) << "TXN: " << txn.ToString() << ", "
<< (!txn.commit_future.valid() ? "Flushing" : "Committing");
}
}
while (!expired && active_transactions.size() < kActiveTransactions) {
auto key = RandomUniformInt(1, kTotalKeys);
while (!expired && active_transactions.size() < options.active_transactions) {
auto key = RandomUniformInt<size_t>(1, options.total_keys);
ActiveTransaction active_txn;
active_txn.transaction = CreateTransaction();
if (!options.non_txn_writes || RandomUniformBool()) {
active_txn.transaction = CreateTransaction();
}
active_txn.session = CreateSession(active_txn.transaction);
const auto op = table_.NewInsertOp();
auto* const req = op->mutable_request();
QLAddInt32HashValue(req, key);
table_.AddInt32ColumnValue(req, kValueColumn, ++value);
const auto val = ++value;
table_.AddInt32ColumnValue(req, kValueColumn, val);
LOG(INFO) << "TXN: " << active_txn.ToString() << " write " << key << " = " << val;
ASSERT_OK(active_txn.session->Apply(op));
active_txn.flush_future = active_txn.session->FlushFuture();

Expand All @@ -658,23 +671,30 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {

auto w = active_transactions.begin();
for (auto i = active_transactions.begin(); i != active_transactions.end(); ++i) {
const auto txn_id = i->ToString();
if (!i->commit_future.valid()) {
if (i->flush_future.wait_for(0s) == std::future_status::ready) {
auto flush_status = i->flush_future.get();
if (!flush_status.ok()) {
LOG(INFO) << "Flush failed: " << flush_status;
LOG(INFO) << "TXN: " << txn_id << ", flush failed: " << flush_status;
continue;
}
++flushed;
LOG(INFO) << "TXN: " << txn_id << ", flushed";
if (!i->transaction) {
++written;
continue;
}
i->commit_future = i->transaction->CommitFuture();
}
} else if (i->commit_future.wait_for(0s) == std::future_status::ready) {
auto commit_status = i->commit_future.get();
if (!commit_status.ok()) {
LOG(INFO) << "Commit failed: " << commit_status;
LOG(INFO) << "TXN: " << txn_id << ", commit failed: " << commit_status;
continue;
}
++committed;
LOG(INFO) << "TXN: " << txn_id << ", committed";
++written;
continue;
}

Expand All @@ -688,24 +708,40 @@ void QLTransactionTest::TestWriteConflicts(bool do_restarts) {
std::this_thread::sleep_for(expired ? 1s : 100ms);
}

if (do_restarts) {
if (options.do_restarts) {
restart_thread.join();
}

LOG(INFO) << "Committed: " << committed << ", flushed: " << flushed << ", tries: " << tries;
LOG(INFO) << "Written: " << written << ", flushed: " << flushed << ", tries: " << tries;

ASSERT_GE(committed, kTotalKeys);
ASSERT_GT(flushed, committed);
ASSERT_GT(flushed, kActiveTransactions);
ASSERT_GE(written, options.total_keys);
ASSERT_GT(flushed, written);
ASSERT_GT(flushed, options.active_transactions);
ASSERT_GT(tries, flushed);
}

// Runs the write-conflict workload with the default number of transactions and keys,
// and without the background restart thread.
TEST_F_EX(QLTransactionTest, WriteConflicts, QLTransactionBigLogSegmentSizeTest) {
  // The workload is invoked exactly once, through the options-based entry point; the
  // former bool-argument call is dropped because TestWriteConflicts now takes
  // WriteConflictsOptions.
  WriteConflictsOptions options = {
    .do_restarts = false,
  };
  TestWriteConflicts(options);
}

// Runs the write-conflict workload with default sizes while the background restart
// thread is active.
TEST_F_EX(QLTransactionTest, WriteConflictsWithRestarts, QLTransactionBigLogSegmentSizeTest) {
  // The workload is invoked exactly once, through the options-based entry point; the
  // former bool-argument call is dropped because TestWriteConflicts now takes
  // WriteConflictsOptions.
  WriteConflictsOptions options = {
    .do_restarts = true,
  };
  TestWriteConflicts(options);
}

// Mixes transactional and non-transactional writes on a single key with only a few
// writes in flight, so that plain writes conflict with transactions that are still
// in the process of committing.
TEST_F_EX(QLTransactionTest, MixedWriteConflicts, QLTransactionBigLogSegmentSizeTest) {
  WriteConflictsOptions mixed_options;
  mixed_options.do_restarts = false;
  mixed_options.active_transactions = 3;
  mixed_options.total_keys = 1;
  mixed_options.non_txn_writes = true;
  TestWriteConflicts(mixed_options);
}

TEST_F(QLTransactionTest, ResolveIntentsWriteReadUpdateRead) {
Expand Down
4 changes: 3 additions & 1 deletion src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,9 @@ class OperationConflictResolverContext : public ConflictResolverContext {

// Called for a conflicting transaction that is reported as committed. Raises
// resolution_ht_ to at least the transaction's commit time and always returns OK.
//
// A transaction that is still committing is reported with commit_time ==
// HybridTime::kMax. That value must not be folded into resolution_ht_, because
// resolution_ht_ is later used to update the clock; it is therefore skipped.
CHECKED_STATUS CheckConflictWithCommitted(
    const TransactionId& id, HybridTime commit_time) override {
  if (commit_time != HybridTime::kMax) {
    resolution_ht_.MakeAtLeast(commit_time);
  }
  return Status::OK();
}

Expand Down
8 changes: 7 additions & 1 deletion src/yb/server/hybrid_clock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ void HybridClock::NowWithError(HybridTime *hybrid_time, uint64_t *max_error_usec
HybridClockComponents current_components = components_.load(boost::memory_order_acquire);
HybridClockComponents new_components = { now->time_point, 1 };

VLOG(4) << __func__ << ", new: " << new_components.ToString() << ", current: "
<< current_components.ToString();

// Loop over the check in case of concurrent updates making the CAS fail.
while (now->time_point > current_components.last_usec) {
if (components_.compare_exchange_weak(current_components, new_components)) {
Expand Down Expand Up @@ -203,6 +206,9 @@ void HybridClock::Update(const HybridTime& to_update) {
GetPhysicalValueMicros(to_update), GetLogicalValue(to_update) + 1
};

VLOG(4) << __func__ << ", new: " << new_components.ToString() << ", current: "
<< current_components.ToString();

new_components.HandleLogicalComponentOverflow();

// Keep trying to CAS until it works or until HT has advanced past this update.
Expand All @@ -226,7 +232,7 @@ uint64_t HybridClock::ErrorForMetrics() {

void HybridClock::HybridClockComponents::HandleLogicalComponentOverflow() {
if (logical > HybridTime::kLogicalBitMask) {
static constexpr uint64_t kMaxOverflowValue = 1 << HybridTime::kBitsForLogicalComponent;
static constexpr uint64_t kMaxOverflowValue = 1ULL << HybridTime::kBitsForLogicalComponent;
if (logical > kMaxOverflowValue) {
LOG(FATAL) << "Logical component is too high: last_usec=" << last_usec
<< "logical=" << logical << ", max allowed is " << kMaxOverflowValue;
Expand Down
4 changes: 4 additions & 0 deletions src/yb/server/hybrid_clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class HybridClock : public Clock {
}

void HandleLogicalComponentOverflow();

// Returns a human-readable rendering of both components, used in log output.
// Presumably $0/$1 are positional placeholders substituted by Format — confirm against its docs.
std::string ToString() const {
return Format("{ last_usec: $0 logical: $1 }", last_usec, logical);
}
};

enum State {
Expand Down
4 changes: 4 additions & 0 deletions src/yb/util/physical_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ using MicrosTime = uint64_t;
// A physical clock reading paired with its error bound.
struct PhysicalTime {
// The clock reading itself (MicrosTime; presumably microseconds — confirm the epoch with the clock implementation).
MicrosTime time_point;
// Error bound associated with time_point, in the same MicrosTime representation.
MicrosTime max_error;

// Returns a human-readable rendering of both fields, suitable for log output.
std::string ToString() const {
return Format("{ time_point: $0 max_error: $1 }", time_point, max_error);
}
};

class PhysicalClock {
Expand Down

0 comments on commit c053ba3

Please sign in to comment.