diff --git a/src/yb/client/ql-stress-test.cc b/src/yb/client/ql-stress-test.cc index 4d317b4deaad..45cbbc8555e4 100644 --- a/src/yb/client/ql-stress-test.cc +++ b/src/yb/client/ql-stress-test.cc @@ -523,7 +523,7 @@ void QLStressTest::VerifyFlushedFrontiers() { rocksdb::Options options; - InitRocksDBOptions(&options, "test_tablet", nullptr, TabletOptions()); + InitRocksDBOptions(&options, "", nullptr, TabletOptions()); std::unique_ptr checkpoint_db; rocksdb::DB* checkpoint_db_raw_ptr = nullptr; diff --git a/src/yb/client/ql-transaction-test.cc b/src/yb/client/ql-transaction-test.cc index a10f1f34376d..f4179c499f3a 100644 --- a/src/yb/client/ql-transaction-test.cc +++ b/src/yb/client/ql-transaction-test.cc @@ -75,7 +75,7 @@ class QLTransactionTest : public TransactionTestBase { CHECKED_STATUS WaitTransactionsCleaned() { return WaitFor( - [this] { return CountTransactions() == 0; }, kTransactionApplyTime, "Transactions cleaned"); + [this] { return !HasTransactions(); }, kTransactionApplyTime, "Transactions cleaned"); } }; @@ -421,7 +421,7 @@ TEST_F(QLTransactionTest, Expire) { latch.Wait(); std::this_thread::sleep_for(std::chrono::microseconds(FLAGS_transaction_heartbeat_usec * 2)); ASSERT_OK(cluster_->CleanTabletLogs()); - ASSERT_EQ(0, CountTransactions()); + ASSERT_FALSE(HasTransactions()); } TEST_F(QLTransactionTest, PreserveLogs) { @@ -455,7 +455,7 @@ TEST_F(QLTransactionTest, ResendApplying) { DisableApplyingIntents(); WriteData(); std::this_thread::sleep_for(5s); // Transaction should not be applied here. - ASSERT_NE(0, CountTransactions()); + ASSERT_TRUE(HasTransactions()); SetIgnoreApplyingProbability(0.0); @@ -782,9 +782,12 @@ TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDi for (int row = 0; row != kNumRows; ++row) { ASSERT_OK(WriteRow(session, i * kNumRows + row, row)); } - txn->Abort(); - ASSERT_OK(cluster_->FlushTablets(tablet::FlushMode::kAsync)); + + // Need some time for flush to be initiated. + std::this_thread::sleep_for(100ms); + + txn->Abort(); } ASSERT_OK(WaitTransactionsCleaned()); @@ -810,7 +813,7 @@ TEST_F_EX(QLTransactionTest, IntentsCleanupAfterRestart, QLTransactionTestWithDi } LOG(INFO) << "Compact read bytes: " << bytes; - return bytes >= 10_KB; + return bytes >= 5_KB; }, 10s, "Enough compactions happen")); } diff --git a/src/yb/client/txn-test-base.cc b/src/yb/client/txn-test-base.cc index bc6f95d8e89b..e3e9f2a3c12a 100644 --- a/src/yb/client/txn-test-base.cc +++ b/src/yb/client/txn-test-base.cc @@ -95,7 +95,7 @@ void TransactionTestBase::SetUp() { server::SkewedClock::Register(); FLAGS_time_source = server::SkewedClock::kName; FLAGS_load_balancer_max_concurrent_adds = 100; - KeyValueTableTest::SetUp(); + ASSERT_NO_FATALS(KeyValueTableTest::SetUp()); CreateTable(Transactional::kTrue); @@ -218,20 +218,23 @@ void TransactionTestBase::VerifyData( } } -size_t TransactionTestBase::CountTransactions() { - size_t result = 0; +bool TransactionTestBase::HasTransactions() { for (int i = 0; i != cluster_->num_tablet_servers(); ++i) { auto* tablet_manager = cluster_->mini_tablet_server(i)->server()->tablet_manager(); auto peers = tablet_manager->GetTabletPeers(); for (const auto& peer : peers) { + if (!peer->consensus()) { + return true; // Report true, since we could have transactions on this non ready peer. + } if (peer->consensus()->GetLeaderStatus() != consensus::LeaderStatus::NOT_LEADER && - peer->tablet()->transaction_coordinator()) { - result += peer->tablet()->transaction_coordinator()->test_count_transactions(); + peer->tablet()->transaction_coordinator() && + peer->tablet()->transaction_coordinator()->test_count_transactions()) { + return true; } } } - return result; + return false; } size_t TransactionTestBase::CountIntents() { diff --git a/src/yb/client/txn-test-base.h b/src/yb/client/txn-test-base.h index a23c55e4f219..1824e1f01f94 100644 --- a/src/yb/client/txn-test-base.h +++ b/src/yb/client/txn-test-base.h @@ -81,7 +81,7 @@ class TransactionTestBase : public KeyValueTableTest { void VerifyData(size_t num_transactions = 1, const WriteOpType op_type = WriteOpType::INSERT, const std::string& column = kValueColumn); - size_t CountTransactions(); + bool HasTransactions(); size_t CountIntents(); diff --git a/src/yb/consensus/consensus_peers-test.cc b/src/yb/consensus/consensus_peers-test.cc index 0f7a755718dd..b34bfa0afa38 100644 --- a/src/yb/consensus/consensus_peers-test.cc +++ b/src/yb/consensus/consensus_peers-test.cc @@ -90,6 +90,7 @@ class ConsensusPeersTest : public YBTest { fs_manager_.reset(new FsManager(env_.get(), GetTestPath("fs_root"), "tserver_test")); ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout()); + ASSERT_OK(fs_manager_->Open()); ASSERT_OK(Log::Open(options_, fs_manager_.get(), kTabletId, diff --git a/src/yb/consensus/log.cc b/src/yb/consensus/log.cc index fcd80fe0e137..7cf73e5d15fc 100644 --- a/src/yb/consensus/log.cc +++ b/src/yb/consensus/log.cc @@ -175,6 +175,10 @@ class Log::Appender { // 'OnFailure()' method. void Shutdown(); + const std::string& LogPrefix() const { + return log_->LogPrefix(); + } + private: // Process the given log entry batch or does a sync if a null is passed. void ProcessBatch(LogEntryBatch* entry_batch); @@ -201,7 +205,7 @@ Log::Appender::Appender(Log *log, ThreadPool* append_thread_pool) } Status Log::Appender::Init() { - VLOG(1) << "Starting log task stream for tablet " << log_->tablet_id(); + VLOG_WITH_PREFIX(1) << "Starting log task stream"; return Status::OK(); } @@ -225,8 +229,7 @@ void Log::Appender::ProcessBatch(LogEntryBatch* entry_batch) { Status s = log_->DoAppend(entry_batch); if (PREDICT_FALSE(!s.ok())) { - LOG(ERROR) << "Error appending to the log: " << s.ToString(); - DLOG(FATAL) << "Aborting: " << s.ToString(); + LOG_WITH_PREFIX(DFATAL) << "Error appending to the log: " << s; entry_batch->set_failed_to_append(); // TODO If a single operation fails to append, should we abort all subsequent operations // in this batch or allow them to be appended? What about operations in future batches? @@ -267,8 +270,7 @@ void Log::Appender::GroupWork() { Status s = log_->Sync(); if (PREDICT_FALSE(!s.ok())) { - LOG(ERROR) << "Error syncing log" << s.ToString(); - DLOG(FATAL) << "Aborting: " << s.ToString(); + LOG_WITH_PREFIX(DFATAL) << "Error syncing log: " << s; for (std::unique_ptr& entry_batch : sync_batch_) { if (!entry_batch->callback().is_null()) { entry_batch->callback().Run(s); @@ -276,7 +278,7 @@ void Log::Appender::GroupWork() { } } else { TRACE_EVENT0("log", "Callbacks"); - VLOG(2) << "Synchronized " << sync_batch_.size() << " entry batches"; + VLOG_WITH_PREFIX(2) << "Synchronized " << sync_batch_.size() << " entry batches"; SCOPED_WATCH_STACK(FLAGS_consensus_log_scoped_watch_delay_callback_threshold_ms); for (std::unique_ptr& entry_batch : sync_batch_) { if (PREDICT_TRUE(!entry_batch->failed_to_append() && !entry_batch->callback().is_null())) { @@ -288,15 +290,15 @@ void Log::Appender::GroupWork() { } sync_batch_.clear(); } - VLOG(1) << "Exiting AppendTask for tablet " << log_->tablet_id(); + VLOG_WITH_PREFIX(1) << "Exiting AppendTask for tablet " << log_->tablet_id(); } void Log::Appender::Shutdown() { std::lock_guard lock_guard(lock_); if (task_stream_) { - VLOG(1) << "Shutting down log task stream for tablet " << log_->tablet_id(); + VLOG_WITH_PREFIX(1) << "Shutting down log task stream"; task_stream_->Stop(); - VLOG(1) << "Log append task stream for tablet " << log_->tablet_id() << " is shut down"; + VLOG_WITH_PREFIX(1) << "Log append task stream is shut down"; task_stream_.reset(); } } @@ -361,7 +363,8 @@ Log::Log(LogOptions options, FsManager* fs_manager, string log_path, sync_disabled_(false), allocation_state_(kAllocationNotStarted), metric_entity_(metric_entity), - on_disk_size_(0) { + on_disk_size_(0), + log_prefix_(Format("T $0 P $1: ", tablet_id_, fs_manager->uuid())) { CHECK_OK(ThreadPoolBuilder("log-alloc").set_max_threads(1).Build(&allocation_pool_)); if (metric_entity_) { metrics_.reset(new LogMetrics(metric_entity_)); @@ -384,8 +387,8 @@ Status Log::Init() { // The case where we are continuing an existing log. We must pick up where the previous WAL left // off in terms of sequence numbers. if (reader_->num_segments() != 0) { - VLOG(1) << "Using existing " << reader_->num_segments() - << " segments from path: " << tablet_wal_path_; + VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments() + << " segments from path: " << tablet_wal_path_; vector > segments; RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments)); @@ -424,11 +427,11 @@ Status Log::AsyncAllocateSegment() { Status Log::CloseCurrentSegment() { if (!footer_builder_.has_min_replicate_index()) { - VLOG(1) << "Writing a segment without any REPLICATE message. " - "Segment: " << active_segment_->path(); + VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: " + << active_segment_->path(); } - VLOG(2) << "Segment footer for " << active_segment_->path() - << ": " << footer_builder_.ShortDebugString(); + VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path() + << ": " << footer_builder_.ShortDebugString(); footer_builder_.set_close_timestamp_micros(GetCurrentTimeMicros()); return active_segment_->WriteFooterAndClose(footer_builder_); @@ -447,7 +450,7 @@ Status Log::RollOver() { RETURN_NOT_OK(SwitchToAllocatedSegment()); - LOG(INFO) << "Rolled over to a new segment: " << active_segment_->path(); + LOG_WITH_PREFIX(INFO) << "Rolled over to a new segment: " << active_segment_->path(); return Status::OK(); } @@ -536,8 +539,8 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) { // if the size of this entry overflows the current segment, get a new one if (allocation_state() == kAllocationNotStarted) { if ((active_segment_->Size() + entry_batch_bytes + 4) > cur_max_segment_size_) { - LOG(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. " - << "Starting new segment allocation. "; + LOG_WITH_PREFIX(INFO) << "Max segment size " << cur_max_segment_size_ << " reached. " + << "Starting new segment allocation. "; RETURN_NOT_OK(AsyncAllocateSegment()); if (!options_.async_preallocate_segments) { LOG_SLOW_EXECUTION(WARNING, 50, "Log roll took a long time") { @@ -550,7 +553,7 @@ Status Log::DoAppend(LogEntryBatch* entry_batch, bool caller_owns_operation) { RETURN_NOT_OK(RollOver()); } } else { - VLOG(1) << "Segment allocation already in progress..."; + VLOG_WITH_PREFIX(1) << "Segment allocation already in progress..."; } int64_t start_offset = active_segment_->written_offset(); @@ -652,8 +655,7 @@ Status Log::Sync() { int sleep_ms = r.Normal(GetAtomicFlag(&FLAGS_log_inject_latency_ms_mean), GetAtomicFlag(&FLAGS_log_inject_latency_ms_stddev)); if (sleep_ms > 0) { - LOG(INFO) << "T " << tablet_id_ << ": Injecting " - << sleep_ms << "ms of latency in Log::Sync()"; + LOG_WITH_PREFIX(INFO) << "Injecting " << sleep_ms << "ms of latency in Log::Sync()"; SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); } } @@ -701,15 +703,15 @@ Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segment int max_to_delete = std::max(reader_->num_segments() - FLAGS_log_min_segments_to_retain, 0); if (segments_to_gc->size() > max_to_delete) { - VLOG(2) << "GCing " << segments_to_gc->size() << " in " << log_dir_ + VLOG_WITH_PREFIX(2) + << "GCing " << segments_to_gc->size() << " in " << log_dir_ << " would not leave enough remaining segments to satisfy minimum " << "retention requirement. Only considering " << max_to_delete << "/" << reader_->num_segments(); segments_to_gc->resize(max_to_delete); } else if (segments_to_gc->size() < max_to_delete) { int extra_segments = max_to_delete - segments_to_gc->size(); - VLOG(2) << tablet_id_ << " has too many log segments, need to GC " - << extra_segments << " more. "; + VLOG_WITH_PREFIX(2) << "Too many log segments, need to GC " << extra_segments << " more."; } // Don't GC segments that are newer than the configured time-based retention. @@ -724,8 +726,9 @@ Status Log::GetSegmentsToGCUnlocked(int64_t min_op_idx, SegmentSequence* segment int64_t age_seconds = (now - segment->footer().close_timestamp_micros()) / 1000000; if (age_seconds < FLAGS_log_min_seconds_to_retain) { - VLOG(2) << "Segment " << segment->path() << " is only " << age_seconds << "s old: " - << "cannot GC it yet due to configured time-based retention policy."; + VLOG_WITH_PREFIX(2) + << "Segment " << segment->path() << " is only " << age_seconds << "s old: " + << "cannot GC it yet due to configured time-based retention policy."; // Truncate the list of segments to GC here -- if this one is too new, then all later ones are // also too new. segments_to_gc->resize(i); @@ -788,8 +791,8 @@ yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed) { })) { break; } - LOG(DFATAL) << "Long wait for safe op id: " << min_allowed - << ", passed: " << (CoarseMonoClock::Now() - start); + LOG_WITH_PREFIX(DFATAL) << "Long wait for safe op id: " << min_allowed + << ", passed: " << (CoarseMonoClock::Now() - start); } } @@ -801,8 +804,8 @@ yb::OpId Log::WaitForSafeOpIdToApply(const yb::OpId& min_allowed) { Status Log::GC(int64_t min_op_idx, int32_t* num_gced) { CHECK_GE(min_op_idx, 0); - LOG(INFO) << "Running Log GC on " << log_dir_ << ": retaining ops >= " << min_op_idx - << ", log segment size = " << options_.segment_size_bytes; + LOG_WITH_PREFIX(INFO) << "Running Log GC on " << log_dir_ << ": retaining ops >= " << min_op_idx + << ", log segment size = " << options_.segment_size_bytes; VLOG_TIMING(1, "Log GC") { SegmentSequence segments_to_delete; @@ -813,7 +816,7 @@ Status Log::GC(int64_t min_op_idx, int32_t* num_gced) { RETURN_NOT_OK(GetSegmentsToGCUnlocked(min_op_idx, &segments_to_delete)); if (segments_to_delete.size() == 0) { - VLOG(1) << "No segments to delete."; + VLOG_WITH_PREFIX(1) << "No segments to delete."; *num_gced = 0; return Status::OK(); } @@ -826,8 +829,8 @@ Status Log::GC(int64_t min_op_idx, int32_t* num_gced) { // Now that they are no longer referenced by the Log, delete the files. *num_gced = 0; for (const scoped_refptr& segment : segments_to_delete) { - LOG(INFO) << "Deleting log segment in path: " << segment->path() - << " (GCed ops < " << min_op_idx << ")"; + LOG_WITH_PREFIX(INFO) << "Deleting log segment in path: " << segment->path() + << " (GCed ops < " << min_op_idx << ")"; RETURN_NOT_OK(fs_manager_->env()->DeleteFile(segment->path())); (*num_gced)++; } @@ -927,7 +930,7 @@ Status Log::Close() { RETURN_NOT_OK(CloseCurrentSegment()); RETURN_NOT_OK(ReplaceSegmentInReaderUnlocked()); log_state_ = kLogClosed; - VLOG(1) << "Log closed"; + VLOG_WITH_PREFIX(1) << "Log closed"; // Release FDs held by these objects. log_index_.reset(); @@ -936,7 +939,7 @@ Status Log::Close() { return Status::OK(); case kLogClosed: - VLOG(1) << "Log already closed"; + VLOG_WITH_PREFIX(1) << "Log already closed"; return Status::OK(); default: @@ -951,7 +954,8 @@ Status Log::DeleteOnDiskData(FsManager* fs_manager, if (!env->FileExists(tablet_wal_path)) { return Status::OK(); } - LOG(INFO) << Substitute("Deleting WAL dir $0 for tablet $1", tablet_wal_path, tablet_id); + LOG(INFO) << "T " << tablet_id << "P " << fs_manager->uuid() + << ": Deleting WAL dir " << tablet_wal_path; RETURN_NOT_OK_PREPEND(env->DeleteRecursively(tablet_wal_path), "Unable to recursively delete WAL dir for tablet " + tablet_id); return Status::OK(); @@ -1072,13 +1076,13 @@ Status Log::CreatePlaceholderSegment(const WritableFileOptions& opts, string* result_path, shared_ptr* out) { string path_tmpl = JoinPathSegments(log_dir_, kSegmentPlaceholderFileTemplate); - VLOG(2) << "Creating temp. file for place holder segment, template: " << path_tmpl; + VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl; gscoped_ptr segment_file; RETURN_NOT_OK(fs_manager_->env()->NewTempWritableFile(opts, path_tmpl, result_path, &segment_file)); - VLOG(1) << "Created next WAL segment, placeholder path: " << *result_path; + VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << *result_path; out->reset(segment_file.release()); return Status::OK(); } diff --git a/src/yb/consensus/log.h b/src/yb/consensus/log.h index 8cdbf6f009c9..41e8d55ee801 100644 --- a/src/yb/consensus/log.h +++ b/src/yb/consensus/log.h @@ -247,6 +247,10 @@ class Log : public RefCountedThreadSafe { CHECKED_STATUS TEST_SubmitFuncToAppendToken(const std::function& func); + const std::string& LogPrefix() const { + return log_prefix_; + } + private: friend class LogTest; friend class LogTestBase; @@ -446,6 +450,8 @@ class Log : public RefCountedThreadSafe { // Used in tests to declare all operations as safe. bool all_op_ids_safe_ = false; + const std::string log_prefix_; + DISALLOW_COPY_AND_ASSIGN(Log); }; diff --git a/src/yb/consensus/log_reader.cc b/src/yb/consensus/log_reader.cc index 8368408eda09..ee2d4d469440 100644 --- a/src/yb/consensus/log_reader.cc +++ b/src/yb/consensus/log_reader.cc @@ -41,9 +41,11 @@ #include "yb/gutil/stl_util.h" #include "yb/gutil/strings/util.h" #include "yb/gutil/strings/substitute.h" + #include "yb/util/coding.h" #include "yb/util/env_util.h" #include "yb/util/hexdump.h" +#include "yb/util/logging.h" #include "yb/util/metrics.h" #include "yb/util/path_util.h" #include "yb/util/pb_util.h" @@ -118,6 +120,7 @@ LogReader::LogReader(FsManager* fs_manager, : fs_manager_(fs_manager), log_index_(index), tablet_id_(std::move(tablet_id)), + log_prefix_(Format("T $0 P $1: ", tablet_id_, fs_manager->uuid())), state_(kLogReaderInitialized) { if (metric_entity) { bytes_read_ = METRIC_log_reader_bytes_read.Instantiate(metric_entity); @@ -134,7 +137,7 @@ Status LogReader::Init(const string& tablet_wal_path) { std::lock_guard lock(lock_); CHECK_EQ(state_, kLogReaderInitialized) << "bad state for Init(): " << state_; } - VLOG(1) << "Reading wal from path:" << tablet_wal_path; + VLOG_WITH_PREFIX(1) << "Reading wal from path:" << tablet_wal_path; Env* env = fs_manager_->env(); @@ -142,7 +145,7 @@ Status LogReader::Init(const string& tablet_wal_path) { return STATUS(IllegalState, "Cannot find wal location at", tablet_wal_path); } - VLOG(1) << "Parsing segments from path: " << tablet_wal_path; + VLOG_WITH_PREFIX(1) << "Parsing segments from path: " << tablet_wal_path; // list existing segment files vector log_files; @@ -162,8 +165,9 @@ Status LogReader::Init(const string& tablet_wal_path) { CHECK(segment->IsInitialized()) << "Uninitialized segment at: " << segment->path(); if (!segment->HasFooter()) { - LOG(WARNING) << "Log segment " << fqp << " was likely left in-progress " - "after a previous crash. Will try to rebuild footer by scanning data."; + LOG_WITH_PREFIX(WARNING) + << "Log segment " << fqp << " was likely left in-progress " + "after a previous crash. Will try to rebuild footer by scanning data."; RETURN_NOT_OK(segment->RebuildFooterByScanning()); } @@ -180,7 +184,7 @@ Status LogReader::Init(const string& tablet_wal_path) { string previous_seg_path; int64_t previous_seg_seqno = -1; for (const SegmentSequence::value_type& entry : read_segments) { - VLOG(1) << " Log Reader Indexed: " << entry->footer().ShortDebugString(); + VLOG_WITH_PREFIX(1) << " Log Reader Indexed: " << entry->footer().ShortDebugString(); // Check that the log segments are in sequence. if (previous_seg_seqno != -1 && entry->header().sequence_number() != previous_seg_seqno + 1) { return STATUS(Corruption, Substitute("Segment sequence numbers are not consecutive. " @@ -264,7 +268,8 @@ void LogReader::GetMaxIndexesToSegmentSizeMap(int64_t min_op_idx, int32_t segmen if (max_close_time_us < segment->footer().close_timestamp_micros()) { int64_t age_seconds = segment->footer().close_timestamp_micros() / 1000000; - VLOG(2) << "Segment " << segment->path() << " is only " << age_seconds << "s old: " + VLOG_WITH_PREFIX(2) + << "Segment " << segment->path() << " is only " << age_seconds << "s old: " << "won't be counted towards log retention"; break; } @@ -427,8 +432,7 @@ Status LogReader::TrimSegmentsUpToAndIncluding(int64_t segment_sequence_number) } break; } - LOG(INFO) << "T " << tablet_id_ << ": removed " << num_deleted_segments - << " log segments from log reader"; + LOG_WITH_PREFIX(INFO) << "Removed " << num_deleted_segments << " log segments from log reader"; return Status::OK(); } diff --git a/src/yb/consensus/log_reader.h b/src/yb/consensus/log_reader.h index 5e72105f8110..127cf4dd7f98 100644 --- a/src/yb/consensus/log_reader.h +++ b/src/yb/consensus/log_reader.h @@ -136,6 +136,10 @@ class LogReader { std::string ToString() const; + const std::string& LogPrefix() const { + return log_prefix_; + } + private: FRIEND_TEST(LogTest, TestLogReader); FRIEND_TEST(LogTest, TestReadLogWithReplacedReplicates); @@ -201,6 +205,7 @@ class LogReader { FsManager *fs_manager_; const scoped_refptr log_index_; const std::string tablet_id_; + const std::string log_prefix_; // Metrics scoped_refptr bytes_read_; diff --git a/src/yb/docdb/docdb_rocksdb_util.cc b/src/yb/docdb/docdb_rocksdb_util.cc index eb4023bf58d5..d8c2e306ecd8 100644 --- a/src/yb/docdb/docdb_rocksdb_util.cc +++ b/src/yb/docdb/docdb_rocksdb_util.cc @@ -422,14 +422,14 @@ void AutoInitRocksDBFlags(rocksdb::Options* options) { } // namespace void InitRocksDBOptions( - rocksdb::Options* options, const string& tablet_id, + rocksdb::Options* options, const string& log_prefix, const shared_ptr& statistics, const tablet::TabletOptions& tablet_options) { AutoInitRocksDBFlags(options); options->create_if_missing = true; options->disableDataSync = true; options->statistics = statistics; - options->log_prefix = Substitute("T $0: ", tablet_id); + options->log_prefix = log_prefix; options->info_log = std::make_shared(options->log_prefix); options->info_log_level = YBRocksDBLogger::ConvertToRocksDBLogLevel(FLAGS_minloglevel); options->initial_seqno = FLAGS_initial_seqno; diff --git a/src/yb/docdb/docdb_rocksdb_util.h b/src/yb/docdb/docdb_rocksdb_util.h index 9cbba3faf695..006ebd88122d 100644 --- a/src/yb/docdb/docdb_rocksdb_util.h +++ b/src/yb/docdb/docdb_rocksdb_util.h @@ -129,11 +129,11 @@ std::unique_ptr CreateIntentAwareIterator( std::shared_ptr file_filter = nullptr, const Slice* iterate_upper_bound = nullptr); -// Initialize the RocksDB 'options' object for tablet identified by 'tablet_id'. The 'statistics' -// object provided by the caller will be used by RocksDB to maintain the stats for the tablet -// specified by 'tablet_id'. +// Initialize the RocksDB 'options'. +// The 'statistics' object provided by the caller will be used by RocksDB to maintain the stats for +// the tablet. void InitRocksDBOptions( - rocksdb::Options* options, const std::string& tablet_id, + rocksdb::Options* options, const std::string& log_prefix, const std::shared_ptr& statistics, const tablet::TabletOptions& tablet_options); diff --git a/src/yb/docdb/docdb_util.cc b/src/yb/docdb/docdb_util.cc index 371d57ac9f35..bb819796a912 100644 --- a/src/yb/docdb/docdb_util.cc +++ b/src/yb/docdb/docdb_util.cc @@ -250,7 +250,7 @@ Status DocDBRocksDBUtil::InitCommonRocksDBOptions() { tablet::TabletOptions tablet_options; tablet_options.block_cache = block_cache_; - docdb::InitRocksDBOptions(&rocksdb_options_, tablet_id(), rocksdb::CreateDBStatistics(), + docdb::InitRocksDBOptions(&rocksdb_options_, "" /* log_prefix */, rocksdb::CreateDBStatistics(), tablet_options); InitRocksDBWriteOptions(&write_options_); rocksdb_options_.compaction_filter_factory = @@ -370,7 +370,7 @@ Status DocDBRocksDBUtil::FlushRocksDbAndWait() { Status DocDBRocksDBUtil::ReinitDBOptions() { tablet::TabletOptions tablet_options; - docdb::InitRocksDBOptions(&rocksdb_options_, tablet_id(), rocksdb_options_.statistics, + docdb::InitRocksDBOptions(&rocksdb_options_, "" /* log_prefix */, rocksdb_options_.statistics, tablet_options); return ReopenRocksDB(); } diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index e63ee9270fea..ff33d5b2840c 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -484,7 +484,7 @@ std::string Tablet::LogPrefix() const { Status Tablet::OpenKeyValueTablet() { rocksdb::Options rocksdb_options; - docdb::InitRocksDBOptions(&rocksdb_options, tablet_id(), rocksdb_statistics_, tablet_options_); + docdb::InitRocksDBOptions(&rocksdb_options, LogPrefix(), rocksdb_statistics_, tablet_options_); rocksdb_options.mem_tracker = MemTracker::FindOrCreateTracker("RegularDB", mem_tracker_); // Install the history cleanup handler. Note that TabletRetentionPolicy is going to hold a raw ptr @@ -1507,7 +1507,7 @@ Status Tablet::ModifyFlushedFrontier( } BOOST_SCOPE_EXIT_END; rocksdb::Options rocksdb_options; docdb::InitRocksDBOptions( - &rocksdb_options, tablet_id(), /* statistics */ nullptr, tablet_options_); + &rocksdb_options, LogPrefix(), /* statistics */ nullptr, tablet_options_); rocksdb_options.create_if_missing = false; LOG_WITH_PREFIX(INFO) << "Opening the test RocksDB at " << checkpoint_dir_for_test << ", expecting to see flushed frontier of " << frontier.ToString(); @@ -1554,7 +1554,7 @@ Status Tablet::Truncate(TruncateOperationState *state) { const string db_dir = regular_db_->GetName(); rocksdb::Options rocksdb_options; - docdb::InitRocksDBOptions(&rocksdb_options, tablet_id(), rocksdb_statistics_, tablet_options_); + docdb::InitRocksDBOptions(&rocksdb_options, LogPrefix(), rocksdb_statistics_, tablet_options_); Status intents_status; if (intents_db_) { diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 1e4f0d7065f1..f064a8aca63f 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -518,6 +518,10 @@ class Tablet : public AbstractTablet, public TransactionIntentApplier { return regular_db_.get(); } + rocksdb::DB* TEST_intents_db() { + return intents_db_.get(); + } + CHECKED_STATUS TEST_SwitchMemtable(); // Initialize RocksDB's max persistent op id and hybrid time to that of the operation state. diff --git a/src/yb/tablet/tablet_metadata.cc b/src/yb/tablet/tablet_metadata.cc index 4b753132e9b4..238d272500ae 100644 --- a/src/yb/tablet/tablet_metadata.cc +++ b/src/yb/tablet/tablet_metadata.cc @@ -364,8 +364,9 @@ Status TabletMetadata::DeleteTabletData(TabletDataState delete_type, rocksdb::Options rocksdb_options; TabletOptions tablet_options; + std::string log_prefix = Format("T $0 P $1: ", tablet_id_, fs_manager_->uuid()); docdb::InitRocksDBOptions( - &rocksdb_options, tablet_id_, nullptr /* statistics */, tablet_options); + &rocksdb_options, log_prefix, nullptr /* statistics */, tablet_options); LOG(INFO) << "Destroying regular db at: " << rocksdb_dir_; rocksdb::Status status = rocksdb::DestroyDB(rocksdb_dir_, rocksdb_options); diff --git a/src/yb/tools/ysck.cc b/src/yb/tools/ysck.cc index c68a2745f80f..405ae6b576bc 100644 --- a/src/yb/tools/ysck.cc +++ b/src/yb/tools/ysck.cc @@ -484,7 +484,7 @@ bool Ysck::VerifyTablet(const shared_ptr& tablet, int table_num_repl } } if (leaders_count == 0) { - LOG(WARNING) << Substitute("Tablet $0 doesn't have a leader", tablet->id()); + LOG(WARNING) << Format("Tablet $0 doesn't have a leader, replicas: $1", tablet->id(), replicas); good_tablet = false; } VLOG(1) << Substitute("Tablet $0 has $1 leader and $2 followers", diff --git a/src/yb/tools/ysck.h b/src/yb/tools/ysck.h index f05862bea03d..094955364065 100644 --- a/src/yb/tools/ysck.h +++ b/src/yb/tools/ysck.h @@ -86,6 +86,11 @@ class YsckTabletReplica { return ts_uuid_; } + std::string ToString() const { + return Format("{ is_leader: $0 is_follower: $1 ts_uuid: $2 }", + is_leader_, is_follower_, ts_uuid_); + } + private: const bool is_leader_; const bool is_follower_; diff --git a/src/yb/tserver/mini_tablet_server.cc b/src/yb/tserver/mini_tablet_server.cc index 0527e3ce5494..ac3db4ce371d 100644 --- a/src/yb/tserver/mini_tablet_server.cc +++ b/src/yb/tserver/mini_tablet_server.cc @@ -96,6 +96,7 @@ MiniTabletServer::MiniTabletServer(const string& fs_root, opts_.broadcast_addresses = { HostPort(server::TEST_RpcAddress(index_, server::Private::kFalse), rpc_port) }; opts_.webserver_opts.port = 0; + opts_.webserver_opts.bind_interface = opts_.broadcast_addresses.front().host(); if (!opts_.has_placement_cloud()) { opts_.SetPlacement(Format("cloud$0", (index_ + 1) / 2), Format("rack$0", index_), "zone"); }