Skip to content

Commit

Permalink
[BACKPORT 2024.2.0][#24829] CDC: Fix GetChanges WAL segment reading a…
Browse files Browse the repository at this point in the history
…nd computation of safe hybrid time for GetChanges resposne

Summary:
**Backport description:**
No merge conflicts encountered. Note - The default value of flag `cdc_read_wal_segment_by_segment` has been again changed back to true as this diff fixes the data loss issue observed with the changes under this flag.

**Original description:**
Original commit: 103ca31 / D39925
During a CDC QA run, we encountered a data loss scenario caused by an interaction between transaction commit timing and WAL segment rollover.

WAL segment 'X' was active when transaction 'T' was still a running transaction. Due to concurrent segment rollover, the transaction's APPLY operation was appended in the subsequent segment 'X+1'. The following sequence of events led to data loss:
1. Initial GetChanges Request:
  - Requested opid index present in segment 'X'
  - No CDC related WAL OPs were found in segment 'X' between [requested op id index, last index in 'X']
  - `committed_op_id_index` (pointed to last index of segment 'X') lagged behind `majority_replicated_op_id_index`, hence we did not scan segment 'X+1'.
  - Finally, we returned empty response with incorrect safe hybrid time (=tablet leader safe time)

2. Subsequent GetChanges Request:
  - Successfully read segment 'X+1' ( as committed_op_id_index had advanced to segment 'X+1')
  - UPDATE_TXN_OP of transaction 'T' was read but CDC filtered it out as we do not ship WAL OPs having commit_time < previously returned safe hybrid time
  - Resulted in transaction 'T' never being shipped.

Root Cause:
We violated a critical invariant: the tablet leader safe time should only be returned in GetChanges responses after reading the entire WAL. But in the 1st Getchanges call, we returned the tablet leader safe time having only read till segment 'X', leading to improper transaction filtering and subsequent data loss.

This diff makes the following changes to fix the above mentioned data loss issue:
1. When reading the WAL segment by segment, we will now wait for the `committed_op_id_index` to be >= last index in the segment. If the deadline is reached while waiting, we will return 0 WAL ops and indicate to wait for WAL update.

2. Identification of active segment in GetConsistentWALRecords has been changed. Earlier, we were inferring active segment by the value of `consistent_stream_safe_time_footer`. If it was kInvalid, then we concluded its an active segment. But because of changes in 1), that conclusion no longer holds true. If we reach the deadline while waiting for `committed_op_id_index` to be >= last index in segment, then the value of `consistent_stream_safe_time_footer` will still be invalid. Hence, we are now using a dedicated variable (`read_entire_wal`) that will notify if we actually read the active segment in `ReadReplicatedMessagesInSegmentForCDC()`.

3. Value of `safe_hybrid_time` returned in GetChanges response. We have the following scenarios which will decide the safe_hybrid_time sent in the response:
    - **Case-1**: Read the active segment -> return the last record's commit_time if valid records are found. Else, return tablet leader safe time (existing logic).
    - **Case-2**: Deadline reached while waiting for `committed_op_id_index` to be >= last index in the WAL segment. There are 2 ways in which we can reach deadline. For both these ways, we want to wait for WAL to be updated.
         - Deadline reached after reading multiple segments -> return request safe hybrid time.
         - Deadline reached on the 1st segment read in GetChanges call -> return request safe hybrid time.
    - **Case-3:** requested index (from_op_id_index) >= `committed_op_id_index`
        - If all RAFT replicated Ops are not yet APPLIED (i.e. `committed_op_id_index` != `majority_replicated_op_id_index`), then we indicate to wait for WAL update and return request safe hybrid time.
        - If all RAFT replicated OPs are also APPLIED (i.e. `committed_op_id_index` == `majority_replicated_op_id_index`), then we have following scenarios:
	         - If the condition in case-3 was hit after reading multiple segments, then return footer value (`min_start_time_running_txns`) of last read segment.
	         - If the condition in case-3 was hit on the 1st segment read in GetChanges call, then return tablet leader safe time as this particular case implies we have read the entire WAL.

Additionally, we have added a WARNING log when the invariant for response safe time is broken.
Jira: DB-13935

Test Plan: Jenkins

Reviewers: skumar, sumukh.phalgaonkar

Reviewed By: sumukh.phalgaonkar

Subscribers: ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40325
  • Loading branch information
siddharth2411 committed Dec 2, 2024
1 parent dad4e21 commit 60174a3
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 59 deletions.
82 changes: 63 additions & 19 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ DEFINE_NON_RUNTIME_int64(
"ConsistentStreamSafeTime for CDCSDK by resolving all committed intetns");

DEFINE_RUNTIME_bool(cdc_read_wal_segment_by_segment,
false,
true,
"When this flag is set to true, GetChanges will read the WAL segment by "
"segment. If valid records are found in the first segment, GetChanges will "
"return these records in response. If no valid records are found then next "
Expand Down Expand Up @@ -2007,13 +2007,14 @@ Status GetConsistentWALRecords(
bool* wait_for_wal_update, OpId* last_seen_op_id, int64_t& last_readable_opid_index,
const int64_t& safe_hybrid_time_req, const CoarseTimePoint& deadline,
std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* consistent_wal_records,
std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* all_checkpoints) {
std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* all_checkpoints,
HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read) {
VLOG(2) << "Getting consistent WAL records. safe_hybrid_time_req: " << safe_hybrid_time_req
<< ", consistent_safe_time: " << *consistent_safe_time
<< ", last_seen_op_id: " << last_seen_op_id->ToString()
<< ", historical_max_op_id: " << historical_max_op_id;
auto raft_consensus = VERIFY_RESULT(tablet_peer->GetRaftConsensus());
bool read_entire_wal = false;
HybridTime last_read_segment_footer_safe_time = HybridTime::kInvalid;

do {
consensus::ReadOpsResult read_ops;
Expand All @@ -2027,16 +2028,10 @@ Status GetConsistentWALRecords(
// end of the WAL.
read_ops = VERIFY_RESULT(raft_consensus->ReadReplicatedMessagesInSegmentForCDC(
*last_seen_op_id, deadline, /* fetch_single_entry */ false, &last_readable_opid_index,
&consistent_stream_safe_time_footer));
&consistent_stream_safe_time_footer, is_entire_wal_read));

if (!consistent_stream_safe_time_footer) {
// HybridTime::kInvalid in consistent_stream_safe_time_footer indicates that we have read
// the currently active segment.
read_entire_wal = true;
consistent_stream_safe_time_footer = HybridTime(*consistent_safe_time);
}
} else {
read_entire_wal = true;
*is_entire_wal_read = true;
// Read all the committed WAL messages with hybrid time <= consistent_stream_safe_time. If
// there exist messages in the WAL which are replicated but not yet committed,
// ReadReplicatedMessagesForConsistentCDC waits for them to get committed and eventually
Expand All @@ -2053,6 +2048,7 @@ Status GetConsistentWALRecords(
for (const auto& msg : read_ops.messages) {
last_seen_op_id->term = msg->id().term();
last_seen_op_id->index = msg->id().index();
*last_read_wal_op_record_time = HybridTime(msg->hybrid_time());

if (IsIntent(msg) || (IsUpdateTransactionOp(msg) &&
msg->transaction_state().status() != TransactionStatus::APPLYING)) {
Expand All @@ -2069,15 +2065,17 @@ Status GetConsistentWALRecords(
<< ", commit_time: " << GetTransactionCommitTime(msg)
<< ", consistent safe_time: " << *consistent_safe_time
<< ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req
<< ", is_entire_wal_read: " << *is_entire_wal_read;
} else if (VLOG_IS_ON(3)) {
VLOG(3) << "Read WAL msg on "
<< "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type()
<< ", OpId: " << msg->id().term() << "." << msg->id().index()
<< ", commit_time: " << GetTransactionCommitTime(msg)
<< ", consistent safe_time: " << *consistent_safe_time
<< ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req;
<< ", safe_hybrid_time_req: " << safe_hybrid_time_req
<< ", is_entire_wal_read: " << *is_entire_wal_read;
}

all_checkpoints->push_back(msg);
Expand All @@ -2091,7 +2089,7 @@ Status GetConsistentWALRecords(

// Handle the case where WAL doesn't have the apply record for all the committed transactions.
if (historical_max_op_id.valid() && historical_max_op_id > *last_seen_op_id &&
read_entire_wal) {
*is_entire_wal_read) {
*wait_for_wal_update = true;
break;
}
Expand All @@ -2105,24 +2103,36 @@ Status GetConsistentWALRecords(

SortConsistentWALRecords(consistent_wal_records);

// For closed segments, consistent_stream_safe_time_footer corresponds to the value read from
// segment footer. For active segment, it will be Invalid.
if (FLAGS_cdc_read_wal_segment_by_segment && consistent_stream_safe_time_footer.is_valid()) {
last_read_segment_footer_safe_time = consistent_stream_safe_time_footer;
}

if (!consistent_wal_records->empty()) {
auto record = consistent_wal_records->front();
if (FLAGS_cdc_read_wal_segment_by_segment &&
GetTransactionCommitTime(record) <= consistent_stream_safe_time_footer.ToUint64()) {
// Since there exists atleast one message with commit_time <= consistent_stream_safe_time,
// we don't need to read the next segment.
*consistent_safe_time = consistent_stream_safe_time_footer.ToUint64();
break;
}
}

// No need for another iteration if we have read the entire WAL.
if (read_entire_wal) {
if (*is_entire_wal_read) {
break;
}

} while (last_seen_op_id->index < last_readable_opid_index);

// Skip updating consistent safe time when entire WAL is read and we can ship all records
// till the consistent safe time computed in cdc producer.
if (FLAGS_cdc_read_wal_segment_by_segment && !(*is_entire_wal_read) &&
last_read_segment_footer_safe_time.is_valid()) {
*consistent_safe_time = last_read_segment_footer_safe_time.ToUint64();
}

VLOG_WITH_FUNC(1) << "Got a total of " << consistent_wal_records->size() << " WAL records "
<< "in the current segment";
return Status::OK();
Expand Down Expand Up @@ -2485,6 +2495,22 @@ bool IsReplicationSlotStream(const StreamMetadata& stream_metadata) {
!stream_metadata.GetReplicationSlotName()->empty();
}

// Response safe time follows the invaraint:
// Request safe time <= Response safe time <= value from GetConsistentStreamSafeTime().
// If response safe time is set to GetConsistentStreamSafeTime()'s value, then it implies that we
// have read the entire WAL. In any other case, the response safe time can either be the last read
// WAL segment's footer safe time ('min_start_time_running_txns') or commit time of the last
// transaction being shipped in the current response. Both these values (footer safe time or commit
// time of last txn) will be <= last read WAL OP's record time.
bool CheckResponseSafeTimeCorrectness(
HybridTime last_read_wal_op_record_time, HybridTime resp_safe_time, bool is_entire_wal_read) {
if (!last_read_wal_op_record_time.is_valid() || resp_safe_time <= last_read_wal_op_record_time) {
return true;
}

return is_entire_wal_read;
}

// CDC get changes is different from xCluster as it doesn't need
// to read intents from WAL.

Expand Down Expand Up @@ -2545,6 +2571,8 @@ Status GetChangesForCDCSDK(

auto safe_hybrid_time_resp = HybridTime::kInvalid;
HaveMoreMessages have_more_messages(false);
HybridTime last_read_wal_op_record_time = HybridTime::kInvalid;
bool is_entire_wal_read = false;
// It is snapshot call.
if (from_op_id.write_id() == -1) {
snapshot_operation = true;
Expand Down Expand Up @@ -2572,7 +2600,8 @@ Status GetChangesForCDCSDK(
RETURN_NOT_OK(GetConsistentWALRecords(
tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time,
historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index,
safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints));
safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints,
&last_read_wal_op_record_time, &is_entire_wal_read));
else
// 'skip_intents' is true here because we want the first transaction to be the partially
// streamed transaction.
Expand Down Expand Up @@ -2666,7 +2695,8 @@ Status GetChangesForCDCSDK(
RETURN_NOT_OK(GetConsistentWALRecords(
tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time,
historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index,
safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints));
safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints,
&last_read_wal_op_record_time, &is_entire_wal_read));
else
// 'skip_intents' is false otherwise in case the complete wal segment is filled with
// intents we will break the loop thinking that WAL has no more records.
Expand Down Expand Up @@ -3050,11 +3080,25 @@ Status GetChangesForCDCSDK(

// If we need to wait for WAL to get up to date with all committed transactions, we will send the
// request safe in the response as well.
auto computed_safe_hybrid_time_req =
HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0);
auto safe_time = wait_for_wal_update
? HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0)
? computed_safe_hybrid_time_req
: GetCDCSDKSafeTimeForTarget(
leader_safe_time.get(), safe_hybrid_time_resp, have_more_messages,
consistent_stream_safe_time, snapshot_operation);

if (!snapshot_operation && !CheckResponseSafeTimeCorrectness(
last_read_wal_op_record_time, safe_time, is_entire_wal_read)) {
LOG(WARNING) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id
<< ", response safe time: " << safe_time
<< " is greater than last read WAL OP's record time: "
<< last_read_wal_op_record_time
<< ", req_safe_time: " << computed_safe_hybrid_time_req
<< ", consistent stream safe time: " << HybridTime(consistent_stream_safe_time)
<< ", leader safe time: " << leader_safe_time.get()
<< ", is_entire_wal_read: " << is_entire_wal_read;
}
resp->set_safe_hybrid_time(safe_time.ToUint64());

// It is possible in case of a partially streamed transaction.
Expand Down
91 changes: 55 additions & 36 deletions src/yb/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,66 +877,85 @@ Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForConsistentCDC(

Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC(
const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry,
int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) {
int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer,
bool* read_entire_wal) {
auto read_ops = ReadOpsResult();
int64_t start_op_id_index;
int64_t current_segment_num;
int64_t segment_last_index;
int64_t committed_op_id_index;
int64_t last_replicated_op_id_index;

auto [committed_op_id_index, last_replicated_op_id_index] =
GetCommittedAndMajorityReplicatedIndex();
do {
// We wait till committed_op_id_index becomes >= last index of the segment that contains the
// from_op_id. If we reach deadline, then return have_more_messages as true to wait for wal
// update.
if (deadline - CoarseMonoClock::Now() <= FLAGS_cdcsdk_wal_reads_deadline_buffer_secs * 1s) {
read_ops.have_more_messages = HaveMoreMessages(true);
return read_ops;
}

// Determine if there are pending operations in RAFT but not yet LogCache.
auto pending_messages = committed_op_id_index != last_replicated_op_id_index;
std::tie(committed_op_id_index, last_replicated_op_id_index) =
GetCommittedAndMajorityReplicatedIndex();

if (last_committed_index) {
*last_committed_index = committed_op_id_index;
}
// Determine if there are pending operations in RAFT but not yet LogCache.
auto pending_messages = committed_op_id_index != last_replicated_op_id_index;

if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) {
// Nothing to read.
return ReadOpsResult {
.messages = ReplicateMsgs(),
.preceding_op = OpId(),
.have_more_messages = HaveMoreMessages(pending_messages)
};
}
if (last_committed_index) {
*last_committed_index = committed_op_id_index;
}

auto start_op_id_index = GetStartOpIdIndex(from_op_id.index);
if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) {
// Nothing to read.
return ReadOpsResult{
.messages = ReplicateMsgs(),
.preceding_op = OpId(),
.have_more_messages = HaveMoreMessages(pending_messages)};
}

VLOG(1) << "Will read Ops from a WAL segment. "
start_op_id_index = GetStartOpIdIndex(from_op_id.index);

VLOG(1) << "Will read Ops from a WAL segment for tablet: " << tablet_id_
<< " start_op_id_index = " << start_op_id_index
<< " committed_op_id_index = " << committed_op_id_index
<< " last_replicated_op_id_index = " << last_replicated_op_id_index;

auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index);
if (!current_segment_num_result.ok() ||
(*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) {
// Read entire WAL.
return ReadReplicatedMessagesForConsistentCDC(
from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
}

auto current_segment_num = *current_segment_num_result;
auto segment_last_index =
VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));

// Nothing to read in this segment, read the next segment.
if (start_op_id_index == segment_last_index) {
current_segment_num = VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1));
if (current_segment_num == log_cache_.GetActiveSegmentNumber()) {
auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index);
if (!current_segment_num_result.ok() ||
(*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) {
// Read entire WAL.
*read_entire_wal = true;
return ReadReplicatedMessagesForConsistentCDC(
from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
}

current_segment_num = *current_segment_num_result;
segment_last_index =
VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));
}

// Nothing to read in this segment, read the next segment.
if (start_op_id_index == segment_last_index) {
current_segment_num =
VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1));
if (current_segment_num == log_cache_.GetActiveSegmentNumber()) {
// Read entire WAL.
*read_entire_wal = true;
return ReadReplicatedMessagesForConsistentCDC(
from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
}
segment_last_index =
VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));
}
} while (segment_last_index > committed_op_id_index);

auto consistent_stream_safe_time =
VERIFY_RESULT(log_cache_.GetMinStartTimeRunningTxnsFromSegmentFooter(current_segment_num));

if (consistent_stream_safe_time_footer) {
*consistent_stream_safe_time_footer = consistent_stream_safe_time;
}

VLOG(1) << "Reading a new WAL segment. Segment info:"
VLOG(1) << "Reading a new WAL segment for tablet: " << tablet_id_ << " Segment info:"
<< " current_segment_num = " << current_segment_num
<< " active_segment_num = " << log_cache_.GetActiveSegmentNumber()
<< " segment_last_index = " << segment_last_index
Expand Down
3 changes: 2 additions & 1 deletion src/yb/consensus/consensus_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ class PeerMessageQueue {
Result<ReadOpsResult> ReadReplicatedMessagesInSegmentForCDC(
const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry = false,
int64_t* last_committed_index = nullptr,
HybridTime* consistent_stream_safe_time_footer = nullptr);
HybridTime* consistent_stream_safe_time_footer = nullptr,
bool* read_entire_wal = nullptr);

void UpdateCDCConsumerOpId(const yb::OpId& op_id);

Expand Down
5 changes: 3 additions & 2 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3653,10 +3653,11 @@ Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForConsistentCDC(

Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesInSegmentForCDC(
const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry,
int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) {
int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer,
bool* read_entire_wal) {
return queue_->ReadReplicatedMessagesInSegmentForCDC(
from_op_id, deadline, fetch_single_entry, last_committed_index,
consistent_stream_safe_time_footer);
consistent_stream_safe_time_footer, read_entire_wal);
}

void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
Expand Down
3 changes: 2 additions & 1 deletion src/yb/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
CoarseTimePoint deadline,
bool fetch_single_entry = false,
int64_t* last_committed_index = nullptr,
HybridTime* consistent_stream_safe_time_footer = nullptr);
HybridTime* consistent_stream_safe_time_footer = nullptr,
bool* read_entire_wal = nullptr);

void UpdateCDCConsumerOpId(const yb::OpId& op_id) override;

Expand Down
Loading

0 comments on commit 60174a3

Please sign in to comment.