[#1102]: Improve stability of QLTransactionTest.IntentsCleanupAfterRestart

Summary:
The following changes were made to improve the stability of QLTransactionTest.IntentsCleanupAfterRestart:
1) Use a public IP for the web server in the mini cluster, so it does not try to resolve the FQDN, which can take a long time when resolution fails.
2) Decrease the required compacted bytes to 5 KB, since sometimes only 7-8 KB are compacted even when the test succeeds.
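
For reference, a rough sketch of the check this threshold feeds (the real code is in the ql-transaction-test.cc hunk below; TotalCompactReadBytes() here is a hypothetical stand-in for the inline loop that sums compaction read bytes across the tablet peers):

```cpp
// Sketch only -- TotalCompactReadBytes() is hypothetical; the actual test accumulates
// RocksDB compaction read bytes across all tablet peers inline.
ASSERT_OK(WaitFor([this] {
  uint64_t bytes = TotalCompactReadBytes();
  LOG(INFO) << "Compact read bytes: " << bytes;
  // 10_KB was flaky: successful runs sometimes compact only 7-8 KB, hence 5_KB.
  return bytes >= 5_KB;
}, 10s, "Enough compactions happen"));
```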

Also add/improve the log prefix for several classes, so that it contains both the tablet and peer UUIDs.
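
The prefix format comes from the new log_prefix_ member in log.cc, built as Format("T $0 P $1: ", tablet_id_, fs_manager->uuid()). Below is a minimal, self-contained sketch of the pattern; LogLike and LOG_WITH_PREFIX_SKETCH are illustrative stand-ins, not the real classes or macros, and the actual *_WITH_PREFIX macros are assumed to expand to roughly LOG(severity) << LogPrefix():

```cpp
#include <iostream>
#include <string>

// Stand-in for the real LOG_WITH_PREFIX macro; in the actual tree it is assumed to
// expand to LOG(severity) << LogPrefix(), picking up LogPrefix() from the enclosing class.
#define LOG_WITH_PREFIX_SKETCH(msg) (std::cout << LogPrefix() << (msg) << std::endl)

class LogLike {
 public:
  LogLike(const std::string& tablet_id, const std::string& peer_uuid)
      // Same shape as the new initializer in log.cc:
      //   log_prefix_(Format("T $0 P $1: ", tablet_id_, fs_manager->uuid()))
      : log_prefix_("T " + tablet_id + " P " + peer_uuid + ": ") {}

  const std::string& LogPrefix() const { return log_prefix_; }

  void RollOver() { LOG_WITH_PREFIX_SKETCH("Rolled over to a new segment"); }

 private:
  const std::string log_prefix_;
};

int main() {
  // Prints: T 0123abcd P tserver-uuid: Rolled over to a new segment
  LogLike("0123abcd", "tserver-uuid").RollOver();
  return 0;
}
```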

Test Plan: ybd debug --gtest_filter QLTransactionTest.IntentsCleanupAfterRestart -n 20

Reviewers: mikhail, alex, timur

Reviewed By: timur

Subscribers: bogdan, bharat, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D6419
spolitov committed Apr 3, 2019
1 parent f810b78 commit 90680ca
Showing 18 changed files with 111 additions and 74 deletions.
2 changes: 1 addition & 1 deletion src/yb/client/ql-stress-test.cc
@@ -523,7 +523,7 @@ void QLStressTest::VerifyFlushedFrontiers() {

rocksdb::Options options;

InitRocksDBOptions(&options, "test_tablet", nullptr, TabletOptions());
InitRocksDBOptions(&options, "", nullptr, TabletOptions());
std::unique_ptr<rocksdb::DB> checkpoint_db;
rocksdb::DB* checkpoint_db_raw_ptr = nullptr;

15 changes: 9 additions & 6 deletions src/yb/client/ql-transaction-test.cc
@@ -75,7 +75,7 @@ class QLTransactionTest : public TransactionTestBase {

CHECKED_STATUS WaitTransactionsCleaned() {
return WaitFor(
[this] { return CountTransactions() == 0; }, kTransactionApplyTime, "Transactions cleaned");
[this] { return !HasTransactions(); }, kTransactionApplyTime, "Transactions cleaned");
}
};

@@ -421,7 +421,7 @@ TEST_F(QLTransactionTest, Expire) {
latch.Wait();
std::this_thread::sleep_for(std::chrono::microseconds(FLAGS_transaction_heartbeat_usec * 2));
ASSERT_OK(cluster_->CleanTabletLogs());
ASSERT_EQ(0, CountTransactions());
ASSERT_FALSE(HasTransactions());
}

TEST_F(QLTransactionTest, PreserveLogs) {
@@ -455,7 +455,7 @@ TEST_F(QLTransactionTest, ResendApplying) {
DisableApplyingIntents();
WriteData();
std::this_thread::sleep_for(5s); // Transaction should not be applied here.
ASSERT_NE(0, CountTransactions());
ASSERT_TRUE(HasTransactions());

SetIgnoreApplyingProbability(0.0);

@@ -782,9 +782,12 @@ TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDi
for (int row = 0; row != kNumRows; ++row) {
ASSERT_OK(WriteRow(session, i * kNumRows + row, row));
}
txn->Abort();

ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kAsync));

// Need some time for flush to be initiated.
std::this_thread::sleep_for(100ms);

txn->Abort();
}

ASSERT_OK(WaitTransactionsCleaned());
@@ -810,7 +813,7 @@ TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDi
}
LOG(INFO) << "Compact read bytes: " << bytes;

return bytes >= 10_KB;
return bytes >= 5_KB;
}, 10s, "Enough compactions happen"));
}

15 changes: 9 additions & 6 deletions src/yb/client/txn-test-base.cc
@@ -95,7 +95,7 @@ void TransactionTestBase::SetUp() {
server::SkewedClock::Register();
FLAGS_time_source = server::SkewedClock::kName;
FLAGS_load_balancer_max_concurrent_adds = 100;
KeyValueTableTest::SetUp();
ASSERT_NO_FATALS(KeyValueTableTest::SetUp());

CreateTable(Transactional::kTrue);

@@ -218,20 +218,23 @@ void TransactionTestBase::VerifyData(
}
}

size_t TransactionTestBase::CountTransactions() {
size_t result = 0;
bool TransactionTestBase::HasTransactions() {
for (int i = 0; i != cluster_->num_tablet_servers(); ++i) {
auto* tablet_manager = cluster_->mini_tablet_server(i)->server()->tablet_manager();
auto peers = tablet_manager->GetTabletPeers();
for (const auto& peer : peers) {
if (!peer->consensus()) {
return true; // Report true, since we could have transactions on this non ready peer.
}
if (peer->consensus()->GetLeaderStatus() !=
consensus::LeaderStatus::NOT_LEADER &&
peer->tablet()->transaction_coordinator()) {
result += peer->tablet()->transaction_coordinator()->test_count_transactions();
peer->tablet()->transaction_coordinator() &&
peer->tablet()->transaction_coordinator()->test_count_transactions()) {
return true;
}
}
}
return result;
return false;
}

size_t TransactionTestBase::CountIntents() {
2 changes: 1 addition & 1 deletion src/yb/client/txn-test-base.h
@@ -81,7 +81,7 @@ class TransactionTestBase : public KeyValueTableTest {
void VerifyData(size_t num_transactions = 1, const WriteOpType op_type = WriteOpType::INSERT,
const std::string& column = kValueColumn);

size_t CountTransactions();
bool HasTransactions();

size_t CountIntents();

1 change: 1 addition & 0 deletions src/yb/consensus/consensus_peers-test.cc
@@ -90,6 +90,7 @@ class ConsensusPeersTest : public YBTest {
fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test"));

ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
ASSERT_OK(Log::Open(options_,
fs_manager_.get(),
kTabletId,
82 changes: 43 additions & 39 deletions src/yb/consensus/log.cc
@@ -175,6 +175,10 @@ class Log::Appender {
// 'OnFailure()' method.
void Shutdown();

const std::string& LogPrefix() const {
return log_->LogPrefix();
}

private:
// Process the given log entry batch or does a sync if a null is passed.
void ProcessBatch(LogEntryBatch* entry_batch);
@@ -201,7 +205,7 @@ Log::Appender::Appender(Log *log, ThreadPool* append_thread_pool)
}

Status Log::Appender::Init() {
VLOG(1) << "Starting log task stream for tablet " << log_->tablet_id();
VLOG_WITH_PREFIX(1) << "Starting log task stream";
return Status::OK();
}

@@ -225,8 +229,7 @@ void Log::Appender::ProcessBatch(LogEntryBatch* entry_batch) {
Status s = log_->DoAppend(entry_batch);

if (PREDICT_FALSE(!s.ok())) {
LOG(ERROR) << "Error appending to the log: " << s.ToString();
DLOG(FATAL) << "Aborting: " << s.ToString();
LOG_WITH_PREFIX(DFATAL) << "Error appending to the log: " << s;
entry_batch->set_failed_to_append();
// TODO If a single operation fails to append, should we abort all subsequent operations
// in this batch or allow them to be appended? What about operations in future batches?
@@ -267,16 +270,15 @@ void Log::Appender::GroupWork() {

Status s = log_->Sync();
if (PREDICT_FALSE(!s.ok())) {
LOG(ERROR) << "Error syncing log" << s.ToString();
DLOG(FATAL) << "Aborting: " << s.ToString();
LOG_WITH_PREFIX(DFATAL) << "Error syncing log: " << s;
for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
if (!entry_batch->callback().is_null()) {
entry_batch->callback().Run(s);
}
}
} else {
TRACE_EVENT0("log", "Callbacks");
VLOG(2) << "Synchronized " << sync_batch_.size() << " entry batches";
VLOG_WITH_PREFIX(2) << "Synchronized " << sync_batch_.size() << " entry batches";
SCOPED_WATCH_STACK(FLAGS_consensus_log_scoped_watch_delay_callback_threshold_ms);
for (std::unique_ptr<LogEntryBatch>& entry_batch : sync_batch_) {
if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) {
@@ -288,15 +290,15 @@ void Log::Appender::GroupWork() {
}
sync_batch_.clear();
}
VLOG(1) << "Exiting AppendTask for tablet " << log_->tablet_id();
VLOG_WITH_PREFIX(1) << "Exiting AppendTask for tablet " << log_->tablet_id();
}

void Log::Appender::Shutdown() {
std::lock_guard<std::mutex> lock_guard(lock_);
if (task_stream_) {
VLOG(1) << "Shutting down log task stream for tablet " << log_->tablet_id();
VLOG_WITH_PREFIX(1) << "Shutting down log task stream";
task_stream_->Stop();
VLOG(1) << "Log append task stream for tablet " << log_->tablet_id() << " is shut down";
VLOG_WITH_PREFIX(1) << "Log append task stream is shut down";
task_stream_.reset();
}
}
@@ -361,7 +363,8 @@ Log::Log(LogOptions options, FsManager* fs_manager, string log_path,
sync_disabled_(false),
allocation_state_(kAllocationNotStarted),
metric_entity_(metric_entity),
on_disk_size_(0) {
on_disk_size_(0),
log_prefix_(Format("T $0 P $1: ", tablet_id_, fs_manager->uuid())) {
CHECK_OK(ThreadPoolBuilder("log-alloc").set_max_threads(1).Build(&allocation_pool_));
if (metric_entity_) {
metrics_.reset(new LogMetrics(metric_entity_));
@@ -384,8 +387,8 @@ Status Log::Init() {
// The case where we are continuing an existing log. We must pick up where the previous WAL left
// off in terms of sequence numbers.
if (reader_->num_segments() != 0) {
VLOG(1) << "Using existing " << reader_->num_segments()
<< " segments from path: " << tablet_wal_path_;
VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments()
<< " segments from path: " << tablet_wal_path_;

vector<scoped_refptr<ReadableLogSegment> > segments;
RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments));
@@ -424,11 +427,11 @@ Status Log::AsyncAllocateSegment() {

Status Log::CloseCurrentSegment() {
if (!footer_builder_.has_min_replicate_index()) {
VLOG(1) << "Writing a segment without any REPLICATE message. "
"Segment: " << active_segment_->path();
VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
<< active_segment_->path();
}
VLOG(2) << "Segment footer for " << active_segment_->path()
<< ": " << footer_builder_.ShortDebugString();
VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
<< ": " << footer_builder_.ShortDebugString();

footer_builder_.set_close_timestamp_micros(GetCurrentTimeMicros());
return active_segment_->WriteFooterAndClose(footer_builder_);
@@ -447,7 +450,7 @@ Status Log::RollOver() {

RETURN_NOT_OK(SwitchToAllocatedSegment());

LOG(INFO) << "Rolled over to a new segment: " << active_segment_->path();
LOG_WITH_PREFIX(INFO) << "Rolled over to a new segment: " << active_segment_->path();
return Status::OK();
}

@@ -536,8 +539,8 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) {
// if the size of this entry overflows the current segment, get a new one
if (allocation_state() == kAllocationNotStarted) {
if ((active_segment_->Size() + entry_batch_bytes + 4) > cur_max_segment_size_) {
LOG(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. "
<< "Starting new segment allocation. ";
LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. "
<< "Starting new segment allocation. ";
RETURN_NOT_OK(AsyncAllocateSegment());
if (!options_.async_preallocate_segments) {
LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") {
@@ -550,7 +553,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) {
RETURN_NOT_OK(RollOver());
}
} else {
VLOG(1) << "Segment allocation already in progress...";
VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
}

int64_t start_offset = active_segment_->written_offset();
@@ -652,8 +655,7 @@ Status Log::Sync() {
int sleep_ms = r.Normal(GetAtomicFlag(&FLAGS_log_inject_latency_ms_mean),
GetAtomicFlag(&FLAGS_log_inject_latency_ms_stddev));
if (sleep_ms > 0) {
LOG(INFO) << "T " << tablet_id_ << ": Injecting "
<< sleep_ms << "ms of latency in Log::Sync()";
LOG_WITH_PREFIX(INFO) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()";
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
}
}
@@ -701,15 +703,15 @@ Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segment

int max_to_delete = std::max(reader_->num_segments() - FLAGS_log_min_segments_to_retain, 0);
if (segments_to_gc->size() > max_to_delete) {
VLOG(2) << "GCing " << segments_to_gc->size() << " in " << log_dir_
VLOG_WITH_PREFIX(2)
<< "GCing " << segments_to_gc->size() << " in " << log_dir_
<< " would not leave enough remaining segments to satisfy minimum "
<< "retention requirement. Only considering "
<< max_to_delete << "/" << reader_->num_segments();
segments_to_gc->resize(max_to_delete);
} else if (segments_to_gc->size() < max_to_delete) {
int extra_segments = max_to_delete - segments_to_gc->size();
VLOG(2) << tablet_id_ << " has too many log segments, need to GC "
<< extra_segments << " more. ";
VLOG_WITH_PREFIX(2) << "Too many log segments, need to GC " << extra_segments << " more.";
}

// Don't GC segments that are newer than the configured time-based retention.
@@ -724,8 +726,9 @@

int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000;
if (age_seconds < FLAGS_log_min_seconds_to_retain) {
VLOG(2) << "Segment " << segment->path() << " is only " << age_seconds << "s old: "
<< "cannot GC it yet due to configured time-based retention policy.";
VLOG_WITH_PREFIX(2)
<< "Segment " << segment->path() << " is only " << age_seconds << "s old: "
<< "cannot GC it yet due to configured time-based retention policy.";
// Truncate the list of segments to GC here -- if this one is too new, then all later ones are
// also too new.
segments_to_gc->resize(i);
@@ -788,8 +791,8 @@ yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed) {
})) {
break;
}
LOG(DFATAL) << "Long wait for safe op id: " << min_allowed
<< ", passed: " << (CoarseMonoClock::Now() - start);
LOG_WITH_PREFIX(DFATAL) << "Long wait for safe op id: " << min_allowed
<< ", passed: " << (CoarseMonoClock::Now() - start);
}
}

@@ -801,8 +804,8 @@ yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed) {
Status Log::GC(int64_t min_op_idx, int32_t* num_gced) {
CHECK_GE(min_op_idx, 0);

LOG(INFO) << "Running Log GC on " << log_dir_ << ": retaining ops >= " << min_op_idx
<< ", log segment size = " << options_.segment_size_bytes;
LOG_WITH_PREFIX(INFO) << "Running Log GC on " << log_dir_ << ": retaining ops >= " << min_op_idx
<< ", log segment size = " << options_.segment_size_bytes;
VLOG_TIMING(1, "Log GC") {
SegmentSequence segments_to_delete;

@@ -813,7 +816,7 @@ Status Log::GC(int64_t min_op_idx, int32_t* num_gced) {
RETURN_NOT_OK(GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete));

if (segments_to_delete.size() == 0) {
VLOG(1) << "No segments to delete.";
VLOG_WITH_PREFIX(1) << "No segments to delete.";
*num_gced = 0;
return Status::OK();
}
@@ -826,8 +829,8 @@ Status Log::GC(int64_t min_op_idx, int32_t* num_gced) {
// Now that they are no longer referenced by the Log, delete the files.
*num_gced = 0;
for (const scoped_refptr<ReadableLogSegment>& segment : segments_to_delete) {
LOG(INFO) << "Deleting log segment in path: " << segment->path()
<< " (GCed ops < " << min_op_idx << ")";
LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path()
<< " (GCed ops < " << min_op_idx << ")";
RETURN_NOT_OK(fs_manager_->env()->DeleteFile(segment->path()));
(*num_gced)++;
}
@@ -927,7 +930,7 @@ Status Log::Close() {
RETURN_NOT_OK(CloseCurrentSegment());
RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked());
log_state_ = kLogClosed;
VLOG(1) << "Log closed";
VLOG_WITH_PREFIX(1) << "Log closed";

// Release FDs held by these objects.
log_index_.reset();
Expand All @@ -936,7 +939,7 @@ Status Log::Close() {
return Status::OK();

case kLogClosed:
VLOG(1) << "Log already closed";
VLOG_WITH_PREFIX(1) << "Log already closed";
return Status::OK();

default:
@@ -951,7 +954,8 @@ Status Log::DeleteOnDiskData(FsManager* fs_manager,
if (!env->FileExists(tablet_wal_path)) {
return Status::OK();
}
LOG(INFO) << Substitute("Deleting WAL dir $0 for tablet $1", tablet_wal_path, tablet_id);
LOG(INFO) << "T " << tablet_id << "P " << fs_manager->uuid()
<< ": Deleting WAL dir " << tablet_wal_path;
RETURN_NOT_OK_PREPEND(env->DeleteRecursively(tablet_wal_path),
"Unable to recursively delete WAL dir for tablet " + tablet_id);
return Status::OK();
@@ -1072,13 +1076,13 @@ Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts,
string* result_path,
shared_ptr<WritableFile>* out) {
string path_tmpl = JoinPathSegments(log_dir_, kSegmentPlaceholderFileTemplate);
VLOG(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
gscoped_ptr<WritableFile> segment_file;
RETURN_NOT_OK(fs_manager_->env()->NewTempWritableFile(opts,
path_tmpl,
result_path,
&segment_file));
VLOG(1) << "Created next WAL segment, placeholder path: " << *result_path;
VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path;
out->reset(segment_file.release());
return Status::OK();
}
6 changes: 6 additions & 0 deletions src/yb/consensus/log.h
@@ -247,6 +247,10 @@ class Log : public RefCountedThreadSafe<Log> {

CHECKED_STATUS TEST_SubmitFuncToAppendToken(const std::function<void()>& func);

const std::string& LogPrefix() const {
return log_prefix_;
}

private:
friend class LogTest;
friend class LogTestBase;
@@ -446,6 +450,8 @@ class Log : public RefCountedThreadSafe<Log> {
// Used in tests to declare all operations as safe.
bool all_op_ids_safe_ = false;

const std::string log_prefix_;

DISALLOW_COPY_AND_ASSIGN(Log);
};
