[#5003, #3759, #4983] Fix WAL overwriting by new leader and replay of incorrect entries on tablet bootstrap

Summary:
Fixing various bugs in tablet bootstrap.

#5003: WAL tail is not being overwritten correctly by a new leader
==================================================================

We can get into a situation where a part of the WAL with indexes higher
than the last index flushed into RocksDB remains in pending_replicates
at the end of tablet bootstrap. Appending a future entry then fails, as
happens in #5003. The bug occurs because we do not process WAL tail
overwrites caused by entries whose index is less than or equal to the
last flushed index. The solution is to always process these overwrites,
even if the newly added entry's index has no exact match in
pending_replicates, and to remove entries with higher indexes.

Treating entries that were never committed as committed
=======================================================

A part of the tablet bootstrap logic that is supposed to run only on
committed entries could sometimes run on entries that were never
committed. This was possible because we still had an old way to recover
the committed OpId: a Raft index-only match of the flushed OpId against
the entry being processed. This is a leftover from the time when we were
setting RocksDB sequence numbers to Raft indexes. E.g. suppose entry
2.10 is flushed into RocksDB. When we encountered entry 1.10 during
bootstrap, we would consider it committed, even though it might have
been overwritten later by entry 2.5, because its index 10 matches that
of the flushed OpId. We probably would not apply such a potentially
overwritten entry to RocksDB anyway, as its index is too low and we have
more checks later in the process. However, this outdated logic is not
needed, because we have the "user frontier" / "consensus frontier"
mechanism to keep track of the last OpIds flushed into the RocksDBs.
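
A self-contained toy illustration of why the index-only match was wrong,
using the example above (flushed OpId 2.10, stale entry 1.10); this is
not the actual bootstrap code:

    #include <cassert>
    #include <cstdint>

    struct OpId { int64_t term = 0; int64_t index = 0; };

    // The outdated check removed by this diff: match by index alone.
    bool LooksCommittedByIndexOnly(const OpId& entry, const OpId& flushed) {
      return entry.index == flushed.index;
    }

    // The consensus frontier tracks the exact flushed OpId, so both term
    // and index must match before an entry can be treated as flushed.
    bool MatchesFlushedOpId(const OpId& entry, const OpId& flushed) {
      return entry.term == flushed.term && entry.index == flushed.index;
    }

    int main() {
      const OpId flushed{2, 10};
      const OpId stale{1, 10};  // Overwritten by 2.5, never committed.
      assert(LooksCommittedByIndexOnly(stale, flushed));  // The old bug.
      assert(!MatchesFlushedOpId(stale, flushed));        // Correctly rejected.
      return 0;
    }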

Not registering flushed operations with RetryableRequests
=========================================================

RetryableRequests is a subsystem introduced in
023c20a for ignoring duplicate write
requests to a tablet. It is important for consistency and ensures that
retries at a higher layer do not cause multiple writes. In tablet
bootstrap, we need to register with RetryableRequests the committed
operations that happened during the last
FLAGS_retryable_request_timeout_secs seconds (120 seconds by default),
whether or not those operations are flushed. (Uncommitted operations are
supposed to be passed to the consensus layer as "orphaned replicates"
and registered with RetryableRequests there.) Prior to this diff,
operations with an index lower than the flushed index were not
registered with RetryableRequests for transactional tables.
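
A hedged sketch of the registration rule; RetryableRequests is a real
subsystem, but the types and the Register call below are simplified
placeholders:

    #include <chrono>

    using Clock = std::chrono::steady_clock;

    struct RetryableRequestsModel {
      void Register(/* committed operation */) { /* record for dedup */ }
    };

    // Register every committed operation from the last
    // retryable_request_timeout_secs seconds (120 by default), with no
    // "already flushed into RocksDB" early-out: flushed or not, the
    // operation must remain visible to duplicate detection.
    void MaybeRegister(RetryableRequestsModel* retryable_requests,
                       Clock::time_point entry_time,
                       Clock::time_point bootstrap_start,
                       int retryable_request_timeout_secs) {
      const auto cutoff = bootstrap_start -
          std::chrono::seconds(retryable_request_timeout_secs);
      if (entry_time >= cutoff) {
        retryable_requests->Register();
      }
    }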

#3759: potentially skipping too many segments with skip_wal_rewrite
===================================================================

We were starting replay from the last segment where the committed OpId
of the first entry was less than or equal to the flushed OpId. Suppose
segment 1 has entries 1.1 to 1.10 and nothing is committed, and segment
2 has entries 1.11 to 1.20, and only entry 1.2 is committed. The flushed
OpId is 1.5. Then we might look at segment 2, find that 1.2 (committed
OpId) <= 1.5 (flushed OpId), and incorrectly skip segment 1, even though
it contains entries we need. Instead of using the committed OpId, we
need to select the first segment to replay as the last segment where the
actual OpId of the first entry is less than or equal to the flushed
OpId.
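
A sketch of the corrected selection rule with simplified types (the real
code works on log segments and their first-entry metadata):

    #include <cstdint>
    #include <vector>

    struct OpId { int64_t term = 0; int64_t index = 0; };

    inline bool operator<=(const OpId& a, const OpId& b) {
      return a.term < b.term || (a.term == b.term && a.index <= b.index);
    }

    struct Segment { OpId first_entry_op_id; };

    // Replay starts at the LAST segment whose FIRST entry's actual OpId is
    // <= the flushed OpId. With segment 1 starting at 1.1, segment 2
    // starting at 1.11, and flushed OpId 1.5, this picks segment 1.
    size_t FirstSegmentToReplay(const std::vector<Segment>& segments,
                                const OpId& flushed) {
      size_t result = 0;
      for (size_t i = 0; i < segments.size(); ++i) {
        if (segments[i].first_entry_op_id <= flushed) {
          result = i;
        }
      }
      return result;
    }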

#4983: only create a new log segment after successful bootstrap
===============================================================

In the skip_wal_rewrite mode we were always creating a new log segment
before starting log replay. Log replay would then pick up the new
segment, find a "corruption" in it, and have to skip it. If a tablet
server got into a restart loop due to tablet bootstrap failure, hundreds
of new empty log segments could be created quickly. It is much cleaner
to create a new log segment only after a successful tablet bootstrap in
the skip_wal_rewrite mode (which is now the default mode). As part of
this change, I also simplified the ReadFirstEntryMetadata function by
delegating to ReadEntries with an additional parameter that limits it to
reading just one entry.
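
A toy model of the ordering change (not yb code), showing why a restart
loop no longer litters the WAL directory with empty segments; only the
EnsureInitialNewSegmentAllocated name comes from this diff:

    #include <iostream>

    struct Log {
      int num_segments = 0;
      void EnsureInitialNewSegmentAllocated() { ++num_segments; }
    };

    bool Bootstrap(Log* log, bool replay_succeeds) {
      // Previously a new segment was created here, before replay, so a
      // tablet server stuck in a restart loop created one empty segment
      // per failed attempt.
      if (!replay_succeeds) {
        return false;
      }
      log->EnsureInitialNewSegmentAllocated();  // Only after success.
      return true;
    }

    int main() {
      Log log;
      for (int attempt = 0; attempt < 100; ++attempt) {
        Bootstrap(&log, /* replay_succeeds = */ false);
      }
      std::cout << log.num_segments << std::endl;  // 0, not 100.
      Bootstrap(&log, /* replay_succeeds = */ true);
      std::cout << log.num_segments << std::endl;  // 1
    }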

Refactoring
===========

Reorganizing the tablet bootstrap code to fully hide the
TabletBootstrap class in the implementation file.

Other improvements
==================

Adding new test macros ASSERT_SETS_EQ and ASSERT_VECTORS_EQ. When two
vectors do not match, ASSERT_VECTORS_EQ reports the differences between
their element sets in addition to the regular output, making it easier
to spot the mismatch in many cases.
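
An assumed usage example, inside a test body (the exact failure-output
format may differ):

    std::vector<int> expected{1, 2, 3};
    std::vector<int> actual{1, 3, 4};
    ASSERT_VECTORS_EQ(expected, actual);
    // On failure, in addition to the regular element-wise report, the
    // output includes the set difference: 2 appears only in "expected",
    // 4 appears only in "actual".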

Documenting intended usage for YB_STRUCT_TO_STRING and
YB_CLASS_TO_STRING macros.
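
The intended usage, roughly (the struct is illustrative and the output
format is approximate):

    struct ExampleOpId {
      int64_t term;
      int64_t index;
      std::string ToString() const { return YB_STRUCT_TO_STRING(term, index); }
    };
    // ExampleOpId{2, 10}.ToString() produces something like
    // "{ term: 2 index: 10 }".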

Test Plan:
Jenkins

A new randomized tablet bootstrap test that creates a log with multiple
terms and overwrites of the WAL's tail.

Reviewers: amitanand, sergei, rahuldesirazu

Reviewed By: sergei

Subscribers: sergei, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D8917
mbautin committed Jul 21, 2020
1 parent dc07d0b commit e4e2d39
Showing 27 changed files with 2,121 additions and 1,153 deletions.
115 changes: 80 additions & 35 deletions src/yb/consensus/log-test-base.h
@@ -76,6 +76,7 @@ using consensus::ReplicateMsg;
using consensus::WRITE_OP;
using consensus::NO_OP;
using consensus::MakeOpId;
using consensus::MakeOpIdPB;

using server::Clock;

@@ -91,8 +92,8 @@ using docdb::ValueType;

const char* kTestTable = "test-log-table";
const char* kTestTablet = "test-log-tablet";
const bool APPEND_SYNC = true;
const bool APPEND_ASYNC = false;

YB_STRONGLY_TYPED_BOOL(AppendSync);

// Append a single batch of 'count' NoOps to the log. If 'size' is not NULL, increments it by the
// expected increase in log size. Increments 'op_id''s index once for each operation logged.
@@ -171,6 +172,11 @@ class LogTestBase : public YBTest {
.Build(&append_pool_));
}

void CleanTablet() {
ASSERT_OK(fs_manager_->DeleteFileSystemLayout(ShouldDeleteLogs::kTrue));
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
}

void BuildLog() {
Schema schema_with_ids = SchemaBuilder(schema_).Build();
ASSERT_OK(Log::Open(options_,
@@ -183,13 +189,13 @@
append_pool_.get(),
std::numeric_limits<int64_t>::max(), // cdc_min_replicated_index
&log_));
LOG(INFO) << "Sucessfully opened the log at " << tablet_wal_path_;
}

void CheckRightNumberOfSegmentFiles(int expected) {
// Test that we actually have the expected number of files in the fs. We should have n segments
// plus '.' and '..'
vector<string> files;
ASSERT_OK(env_->GetChildren(tablet_wal_path_, &files));
// Test that we actually have the expected number of files in the fs. We should have n segments.
const vector<string> files =
ASSERT_RESULT(env_->GetChildren(tablet_wal_path_, ExcludeDots::kTrue));
int count = 0;
for (const string& s : files) {
if (HasPrefixString(s, FsManager::kWalFileNamePrefix)) {
@@ -203,33 +209,71 @@
ASSERT_OK(s);
}

struct AppendReplicateBatchData {
yb::OpId op_id;
yb::OpId committed_op_id;
std::vector<TupleForAppend> writes;
AppendSync sync = AppendSync::kTrue;
consensus::OperationType op_type = consensus::OperationType::WRITE_OP;
TransactionId txn_id = TransactionId::Nil();
TransactionStatus txn_status = TransactionStatus::CLEANUP;
};

void AppendReplicateBatch(AppendReplicateBatchData data) {
AppendReplicateBatch(
MakeOpIdPB(data.op_id),
MakeOpIdPB(data.committed_op_id),
std::move(data.writes),
data.sync,
data.op_type,
data.txn_id,
data.txn_status);
}

// Appends a batch with size 2, or the given set of writes.
void AppendReplicateBatch(const OpIdPB& opid,
const OpIdPB& committed_opid = MakeOpId(0, 0),
std::vector<TupleForAppend> writes = {},
bool sync = APPEND_SYNC,
TableType table_type = TableType::YQL_TABLE_TYPE) {
void AppendReplicateBatch(
const OpIdPB& opid,
const OpIdPB& committed_opid = MakeOpId(0, 0),
std::vector<TupleForAppend> writes = {},
AppendSync sync = AppendSync::kTrue,
consensus::OperationType op_type = consensus::OperationType::WRITE_OP,
TransactionId txn_id = TransactionId::Nil(),
TransactionStatus txn_status = TransactionStatus::APPLYING) {
auto replicate = std::make_shared<ReplicateMsg>();
replicate->set_op_type(WRITE_OP);
replicate->set_op_type(op_type);
replicate->mutable_id()->CopyFrom(opid);
replicate->mutable_committed_op_id()->CopyFrom(committed_opid);
replicate->set_hybrid_time(clock_->Now().ToUint64());
WriteRequestPB *batch_request = replicate->mutable_write_request();
if (writes.empty()) {
const int opid_index_as_int = static_cast<int>(opid.index());
// Since OpIds deal with int64 index and term, we are downcasting here. In order to be able
// to test with values > INT_MAX, we need to make sure we do not overflow, while still
// wanting to add 2 different values here.
//
// Picking x and x / 2 + 1 as the 2 values.
// For small numbers, special casing x <= 2.
const int other_int = opid_index_as_int <= 2 ? 3 : opid_index_as_int / 2 + 1;
writes.emplace_back(opid_index_as_int, 0, "this is a test insert");
writes.emplace_back(other_int, 0, "this is a test mutate");
}
auto write_batch = batch_request->mutable_write_batch();
for (const auto &w : writes) {
AddKVToPB(std::get<0>(w), std::get<1>(w), std::get<2>(w), write_batch);

if (op_type == consensus::OperationType::UPDATE_TRANSACTION_OP) {
ASSERT_TRUE(!txn_id.IsNil());
replicate->mutable_transaction_state()->set_status(txn_status);
} else if (op_type == consensus::OperationType::WRITE_OP) {
if (writes.empty()) {
const int opid_index_as_int = static_cast<int>(opid.index());
// Since OpIds deal with int64 index and term, we are downcasting here. In order to be able
// to test with values > INT_MAX, we need to make sure we do not overflow, while still
// wanting to add 2 different values here.
//
// Picking x and x / 2 + 1 as the 2 values.
// For small numbers, special casing x <= 2.
const int other_int = opid_index_as_int <= 2 ? 3 : opid_index_as_int / 2 + 1;
writes.emplace_back(
/* key */ opid_index_as_int, /* int_val */ 0, /* string_val */ "this is a test insert");
writes.emplace_back(
/* key */ other_int, /* int_val */ 0, /* string_val */ "this is a test mutate");
}

auto write_batch = batch_request->mutable_write_batch();
if (!txn_id.IsNil()) {
write_batch->mutable_transaction()->set_transaction_id(txn_id.data(), txn_id.size());
}
for (const auto &w : writes) {
AddKVToPB(std::get<0>(w), std::get<1>(w), std::get<2>(w), write_batch);
}
} else {
FAIL() << "Unexpected operation type: " << consensus::OperationType_Name(op_type);
}

batch_request->set_tablet_id(kTestTablet);
@@ -238,40 +282,41 @@ class LogTestBase : public YBTest {

// Appends the provided batch to the log.
void AppendReplicateBatch(const consensus::ReplicateMsgPtr& replicate,
bool sync = APPEND_SYNC) {
AppendSync sync = AppendSync::kTrue) {
const auto committed_op_id = yb::OpId::FromPB(replicate->committed_op_id());
const auto batch_mono_time = restart_safe_coarse_mono_clock_.Now();
if (sync) {
Synchronizer s;
ASSERT_OK(log_->AsyncAppendReplicates(
{ replicate }, yb::OpId() /* committed_op_id */, restart_safe_coarse_mono_clock_.Now(),
s.AsStatusCallback()));
{ replicate }, committed_op_id, batch_mono_time, s.AsStatusCallback()));
ASSERT_OK(s.Wait());
} else {
// AsyncAppendReplicates does not free the ReplicateMsg on completion, so we
// need to pass it through to our callback.
ASSERT_OK(log_->AsyncAppendReplicates(
{ replicate }, yb::OpId() /* committed_op_id */, restart_safe_coarse_mono_clock_.Now(),
{ replicate }, committed_op_id, batch_mono_time,
Bind(&LogTestBase::CheckReplicateResult, replicate)));
}
}

// Appends 'count' ReplicateMsgs to the log as committed entries.
void AppendReplicateBatchToLog(int count, bool sync = true) {
void AppendReplicateBatchToLog(int count, AppendSync sync = AppendSync::kTrue) {
for (int i = 0; i < count; i++) {
OpIdPB opid = consensus::MakeOpId(1, current_index_);
AppendReplicateBatch(opid, opid);
AppendReplicateBatch(opid, opid, /* writes */ {}, sync);
current_index_ += 1;
}
}

// Append a single NO_OP entry. Increments op_id by one. If non-NULL, and if the write is
// successful, 'size' is incremented by the size of the written operation.
CHECKED_STATUS AppendNoOp(OpIdPB* op_id, int* size = NULL) {
CHECKED_STATUS AppendNoOp(OpIdPB* op_id, int* size = nullptr) {
return AppendNoOpToLogSync(clock_, log_.get(), op_id, size);
}

// Append a number of no-op entries to the log. Increments op_id's index by the number of records
// written. If non-NULL, 'size' keeps track of the size of the operations successfully written.
CHECKED_STATUS AppendNoOps(OpIdPB* op_id, int num, int* size = NULL) {
CHECKED_STATUS AppendNoOps(OpIdPB* op_id, int num, int* size = nullptr) {
for (int i = 0; i < num; i++) {
RETURN_NOT_OK(AppendNoOp(op_id, size));
}
11 changes: 8 additions & 3 deletions src/yb/consensus/log-test.cc
@@ -407,6 +407,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
// Last entry is ignored, but we should still see the previous ones.
ASSERT_EQ(expected_entries, read_entries.entries.size());
}

// Tests that the log reader reads up until some truncated entry is found.
// It should still return OK, since on a crash, it's acceptable to have
// a partial entry at EOF.
@@ -684,7 +685,7 @@ TEST_F(LogTest, TestGCOfIndexChunks) {
TEST_F(LogTest, TestWaitUntilAllFlushed) {
BuildLog();
// Append 2 replicate pairs asynchronously
AppendReplicateBatchToLog(2, APPEND_ASYNC);
AppendReplicateBatchToLog(2, AppendSync::kFalse);

ASSERT_OK(log_->WaitUntilAllFlushed());

@@ -876,8 +877,12 @@ TEST_F(LogTest, TestLogReader) {
TEST_F(LogTest, TestLogReaderReturnsLatestSegmentIfIndexEmpty) {
BuildLog();

OpIdPB opid = MakeOpId(1, 1);
AppendReplicateBatch(opid, MakeOpId(0, 0), {}, APPEND_SYNC, kTableType);
AppendReplicateBatch({
.op_id = {1, 1},
.committed_op_id = {0, 0},
.writes = {},
.sync = AppendSync::kTrue,
});

SegmentSequence segments;
ASSERT_OK(log_->GetLogReader()->GetSegmentsSnapshot(&segments));
51 changes: 37 additions & 14 deletions src/yb/consensus/log.cc
@@ -343,7 +343,8 @@ Status Log::Open(const LogOptions &options,
const scoped_refptr<MetricEntity>& metric_entity,
ThreadPool* append_thread_pool,
int64_t cdc_min_replicated_index,
scoped_refptr<Log>* log) {
scoped_refptr<Log>* log,
CreateNewSegment create_new_segment) {

RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(options.env, DirName(wal_dir)),
Substitute("Failed to create table wal dir $0", DirName(wal_dir)));
@@ -358,16 +359,23 @@
schema,
schema_version,
metric_entity,
append_thread_pool));
append_thread_pool,
create_new_segment));
RETURN_NOT_OK(new_log->Init());
log->swap(new_log);
return Status::OK();
}

Log::Log(
LogOptions options, string wal_dir, string tablet_id, string peer_uuid, const Schema& schema,
uint32_t schema_version, const scoped_refptr<MetricEntity>& metric_entity,
ThreadPool* append_thread_pool)
LogOptions options,
string wal_dir,
string tablet_id,
string peer_uuid,
const Schema& schema,
uint32_t schema_version,
const scoped_refptr<MetricEntity>& metric_entity,
ThreadPool* append_thread_pool,
CreateNewSegment create_new_segment)
: options_(std::move(options)),
wal_dir_(std::move(wal_dir)),
tablet_id_(std::move(tablet_id)),
@@ -388,7 +396,8 @@ Log::Log(
allocation_state_(kAllocationNotStarted),
metric_entity_(metric_entity),
on_disk_size_(0),
log_prefix_(consensus::MakeTabletLogPrefix(tablet_id_, peer_uuid_)) {
log_prefix_(consensus::MakeTabletLogPrefix(tablet_id_, peer_uuid_)),
create_new_segment_at_start_(create_new_segment) {
set_wal_retention_secs(options.retention_secs);
CHECK_OK(ThreadPoolBuilder("log-alloc").set_max_threads(1).Build(&allocation_pool_));
if (metric_entity_) {
@@ -434,13 +443,9 @@ Status Log::Init() {
YB_LOG_FIRST_N(INFO, 1) << "durable_wal_write is turned off. Buffered IO will be used for WAL.";
}

// We always create a new segment when the log starts.
RETURN_NOT_OK(AsyncAllocateSegment());
RETURN_NOT_OK(allocation_status_.Get());
RETURN_NOT_OK(SwitchToAllocatedSegment());

RETURN_NOT_OK(appender_->Init());
log_state_ = kLogWriting;
if (create_new_segment_at_start_) {
RETURN_NOT_OK(EnsureInitialNewSegmentAllocated());
}
return Status::OK();
}

@@ -472,7 +477,7 @@ Status Log::RollOver() {

DCHECK_EQ(allocation_state(), kAllocationFinished);

LOG_WITH_PREFIX(INFO) << Format("Last appended opid in segment $0: $1", active_segment_->path(),
LOG_WITH_PREFIX(INFO) << Format("Last appended OpId in segment $0: $1", active_segment_->path(),
last_appended_entry_op_id_.ToString());

RETURN_NOT_OK(Sync());
@@ -680,6 +685,24 @@ Status Log::AllocateSegmentAndRollOver() {
return RollOver();
}

Status Log::EnsureInitialNewSegmentAllocated() {
if (log_state_ == LogState::kLogWriting) {
// New segment already created.
return Status::OK();
}
if (log_state_ != LogState::kLogInitialized) {
return STATUS_FORMAT(
IllegalState, "Unexpected log state in EnsureInitialNewSegmentAllocated: $0", log_state_);
}
RETURN_NOT_OK(AsyncAllocateSegment());
RETURN_NOT_OK(allocation_status_.Get());
RETURN_NOT_OK(SwitchToAllocatedSegment());

RETURN_NOT_OK(appender_->Init());
log_state_ = LogState::kLogWriting;
return Status::OK();
}

Status Log::Sync() {
TRACE_EVENT0("log", "Sync");
SCOPED_LATENCY_METRIC(metrics_, sync_latency);
25 changes: 20 additions & 5 deletions src/yb/consensus/log.h
@@ -76,6 +76,8 @@ class LogEntryBatch;
class LogIndex;
class LogReader;

YB_STRONGLY_TYPED_BOOL(CreateNewSegment);

// Log interface, inspired by Raft's (logcabin) Log. Provides durability to YugaByte as a normal
// Write Ahead Log and also plays the role of persistent storage for the consensus state machine.
//
@@ -103,7 +105,7 @@ class Log : public RefCountedThreadSafe<Log> {
static const Status kLogShutdownStatus;

// Opens or continues a log and sets 'log' to the newly built Log.
// After a successful Open() the Log is ready to receive entries.
// After a successful Open() the Log is ready to receive entries, if create_new_segment is true.
static CHECKED_STATUS Open(const LogOptions &options,
const std::string& tablet_id,
const std::string& wal_dir,
Expand All @@ -113,7 +115,8 @@ class Log : public RefCountedThreadSafe<Log> {
const scoped_refptr<MetricEntity>& metric_entity,
ThreadPool *append_thread_pool,
int64_t cdc_min_replicated_index,
scoped_refptr<Log> *log);
scoped_refptr<Log> *log,
CreateNewSegment create_new_segment = CreateNewSegment::kTrue);

~Log();

@@ -233,6 +236,10 @@
// entries appended up to this point are available in closed, readable segments.
CHECKED_STATUS AllocateSegmentAndRollOver();

// For a log created with CreateNewSegment::kFalse, this is used to finish log initialization by
// allocating a new segment.
CHECKED_STATUS EnsureInitialNewSegmentAllocated();

// Returns the total size of the current segments, in bytes.
// Returns 0 if the log is shut down.
uint64_t OnDiskSize();
@@ -320,9 +327,15 @@
kAllocationFinished // Next segment ready
};

Log(LogOptions options, std::string wal_dir, std::string tablet_id, std::string peer_uuid,
const Schema& schema, uint32_t schema_version,
const scoped_refptr<MetricEntity>& metric_entity, ThreadPool* append_thread_pool);
Log(LogOptions options,
std::string wal_dir,
std::string tablet_id,
std::string peer_uuid,
const Schema& schema,
uint32_t schema_version,
const scoped_refptr<MetricEntity>& metric_entity,
ThreadPool* append_thread_pool,
CreateNewSegment create_new_segment = CreateNewSegment::kTrue);

Env* get_env() {
return options_.env;
@@ -512,6 +525,8 @@
// The current replicated index that CDC has read. Used for CDC read cache optimization.
std::atomic<int64_t> cdc_min_replicated_index_{std::numeric_limits<int64_t>::max()};

CreateNewSegment create_new_segment_at_start_;

DISALLOW_COPY_AND_ASSIGN(Log);
};
