[BACKPORT 2024.2.0][#24829] Revert "[BACKPORT 2024.2.0][#24829] CDC: Fix GetChanges WAL segment reading and computation of safe hybrid time for GetChanges response"

Summary:
This reverts commit 60174a3 on 2024.2.0, as the fix is planned to land in 2024.2.0.1 instead.
Jira: DB-13935

Test Plan: Jenkins: compile only

Reviewers: skumar, sumukh.phalgaonkar

Reviewed By: skumar

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40449
siddharth2411 committed Dec 4, 2024
1 parent 87de5f7 commit 0585c93
Showing 7 changed files with 59 additions and 208 deletions.
82 changes: 19 additions & 63 deletions src/yb/cdc/cdcsdk_producer.cc
@@ -92,7 +92,7 @@ DEFINE_NON_RUNTIME_int64(
     "ConsistentStreamSafeTime for CDCSDK by resolving all committed intents");
 
 DEFINE_RUNTIME_bool(cdc_read_wal_segment_by_segment,
-                    true,
+                    false,
     "When this flag is set to true, GetChanges will read the WAL segment by "
     "segment. If valid records are found in the first segment, GetChanges will "
     "return these records in response. If no valid records are found then next "
@@ -2007,14 +2007,13 @@ Status GetConsistentWALRecords(
     bool* wait_for_wal_update, OpId* last_seen_op_id, int64_t& last_readable_opid_index,
     const int64_t& safe_hybrid_time_req, const CoarseTimePoint& deadline,
     std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* consistent_wal_records,
-    std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* all_checkpoints,
-    HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read) {
+    std::vector<std::shared_ptr<yb::consensus::LWReplicateMsg>>* all_checkpoints) {
   VLOG(2) << "Getting consistent WAL records. safe_hybrid_time_req: " << safe_hybrid_time_req
           << ", consistent_safe_time: " << *consistent_safe_time
           << ", last_seen_op_id: " << last_seen_op_id->ToString()
           << ", historical_max_op_id: " << historical_max_op_id;
   auto raft_consensus = VERIFY_RESULT(tablet_peer->GetRaftConsensus());
-  HybridTime last_read_segment_footer_safe_time = HybridTime::kInvalid;
+  bool read_entire_wal = false;

   do {
     consensus::ReadOpsResult read_ops;
@@ -2028,10 +2027,16 @@ Status GetConsistentWALRecords(
       // end of the WAL.
       read_ops = VERIFY_RESULT(raft_consensus->ReadReplicatedMessagesInSegmentForCDC(
           *last_seen_op_id, deadline, /* fetch_single_entry */ false, &last_readable_opid_index,
-          &consistent_stream_safe_time_footer, is_entire_wal_read));
+          &consistent_stream_safe_time_footer));

+      if (!consistent_stream_safe_time_footer) {
+        // HybridTime::kInvalid in consistent_stream_safe_time_footer indicates that we have read
+        // the currently active segment.
+        read_entire_wal = true;
+        consistent_stream_safe_time_footer = HybridTime(*consistent_safe_time);
+      }
     } else {
-      *is_entire_wal_read = true;
+      read_entire_wal = true;
       // Read all the committed WAL messages with hybrid time <= consistent_stream_safe_time. If
       // there exist messages in the WAL which are replicated but not yet committed,
       // ReadReplicatedMessagesForConsistentCDC waits for them to get committed and eventually
@@ -2048,7 +2053,6 @@ Status GetConsistentWALRecords(
     for (const auto& msg : read_ops.messages) {
       last_seen_op_id->term = msg->id().term();
       last_seen_op_id->index = msg->id().index();
-      *last_read_wal_op_record_time = HybridTime(msg->hybrid_time());

       if (IsIntent(msg) || (IsUpdateTransactionOp(msg) &&
           msg->transaction_state().status() != TransactionStatus::APPLYING)) {
@@ -2065,17 +2069,15 @@ Status GetConsistentWALRecords(
                 << ", commit_time: " << GetTransactionCommitTime(msg)
                 << ", consistent safe_time: " << *consistent_safe_time
                 << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer
-                << ", safe_hybrid_time_req: " << safe_hybrid_time_req
-                << ", is_entire_wal_read: " << *is_entire_wal_read;
+                << ", safe_hybrid_time_req: " << safe_hybrid_time_req;
       } else if (VLOG_IS_ON(3)) {
         VLOG(3) << "Read WAL msg on "
                 << "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type()
                 << ", OpId: " << msg->id().term() << "." << msg->id().index()
                 << ", commit_time: " << GetTransactionCommitTime(msg)
                 << ", consistent safe_time: " << *consistent_safe_time
                 << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer
-                << ", safe_hybrid_time_req: " << safe_hybrid_time_req
-                << ", is_entire_wal_read: " << *is_entire_wal_read;
+                << ", safe_hybrid_time_req: " << safe_hybrid_time_req;
       }

       all_checkpoints->push_back(msg);
@@ -2089,7 +2091,7 @@ Status GetConsistentWALRecords(

     // Handle the case where WAL doesn't have the apply record for all the committed transactions.
     if (historical_max_op_id.valid() && historical_max_op_id > *last_seen_op_id &&
-        *is_entire_wal_read) {
+        read_entire_wal) {
       *wait_for_wal_update = true;
       break;
     }
@@ -2103,36 +2105,24 @@ Status GetConsistentWALRecords(

     SortConsistentWALRecords(consistent_wal_records);

-    // For closed segments, consistent_stream_safe_time_footer corresponds to the value read from
-    // segment footer. For active segment, it will be Invalid.
-    if (FLAGS_cdc_read_wal_segment_by_segment && consistent_stream_safe_time_footer.is_valid()) {
-      last_read_segment_footer_safe_time = consistent_stream_safe_time_footer;
-    }
-
     if (!consistent_wal_records->empty()) {
       auto record = consistent_wal_records->front();
       if (FLAGS_cdc_read_wal_segment_by_segment &&
           GetTransactionCommitTime(record) <= consistent_stream_safe_time_footer.ToUint64()) {
         // Since there exists at least one message with commit_time <= consistent_stream_safe_time,
         // we don't need to read the next segment.
         *consistent_safe_time = consistent_stream_safe_time_footer.ToUint64();
         break;
       }
     }

     // No need for another iteration if we have read the entire WAL.
-    if (*is_entire_wal_read) {
+    if (read_entire_wal) {
       break;
     }

   } while (last_seen_op_id->index < last_readable_opid_index);

-  // Skip updating consistent safe time when entire WAL is read and we can ship all records
-  // till the consistent safe time computed in cdc producer.
-  if (FLAGS_cdc_read_wal_segment_by_segment && !(*is_entire_wal_read) &&
-      last_read_segment_footer_safe_time.is_valid()) {
-    *consistent_safe_time = last_read_segment_footer_safe_time.ToUint64();
-  }
-
   VLOG_WITH_FUNC(1) << "Got a total of " << consistent_wal_records->size() << " WAL records "
                     << "in the current segment";
   return Status::OK();
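The early-break kept in the hunk above hinges on a single comparison: if the earliest record in the sorted batch committed at or before the closed segment's footer safe time, the current segment already yields shippable records and the next segment need not be read. A small sketch of just that decision, with plain integers standing in for HybridTime (an illustration, not the commit's code):

#include <cstdint>
#include <vector>

// sorted_commit_times is ordered ascending; footer_safe_time is the value
// read from the closed segment's footer ('min_start_time_running_txns').
bool CanStopAtCurrentSegment(const std::vector<uint64_t>& sorted_commit_times,
                             uint64_t footer_safe_time) {
  // At least one message with commit_time <= footer safe time means this
  // GetChanges call does not need to read the next segment.
  return !sorted_commit_times.empty() &&
         sorted_commit_times.front() <= footer_safe_time;
}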
@@ -2495,22 +2485,6 @@ bool IsReplicationSlotStream(const StreamMetadata& stream_metadata) {
          !stream_metadata.GetReplicationSlotName()->empty();
 }

-// Response safe time follows the invariant:
-// Request safe time <= Response safe time <= value from GetConsistentStreamSafeTime().
-// If response safe time is set to GetConsistentStreamSafeTime()'s value, then it implies that we
-// have read the entire WAL. In any other case, the response safe time can either be the last read
-// WAL segment's footer safe time ('min_start_time_running_txns') or commit time of the last
-// transaction being shipped in the current response. Both these values (footer safe time or commit
-// time of last txn) will be <= last read WAL OP's record time.
-bool CheckResponseSafeTimeCorrectness(
-    HybridTime last_read_wal_op_record_time, HybridTime resp_safe_time, bool is_entire_wal_read) {
-  if (!last_read_wal_op_record_time.is_valid() || resp_safe_time <= last_read_wal_op_record_time) {
-    return true;
-  }
-
-  return is_entire_wal_read;
-}
-
 // CDC get changes is different from xCluster as it doesn't need
 // to read intents from WAL.

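To make the invariant of the helper removed above concrete, here is a standalone restatement with uint64_t standing in for HybridTime (0 playing the role of HybridTime::kInvalid) and made-up values in the checks; an illustration, not code from the tree:

#include <cassert>
#include <cstdint>

// The response safe time must not exceed the last read WAL op's record time,
// unless the entire WAL was read.
bool ResponseSafeTimeIsCorrect(uint64_t last_read_wal_op_record_time,
                               uint64_t resp_safe_time, bool is_entire_wal_read) {
  if (last_read_wal_op_record_time == 0 ||
      resp_safe_time <= last_read_wal_op_record_time) {
    return true;
  }
  return is_entire_wal_read;
}

int main() {
  assert(ResponseSafeTimeIsCorrect(100, 90, false));    // within the bound
  assert(!ResponseSafeTimeIsCorrect(100, 120, false));  // would trigger the warning
  assert(ResponseSafeTimeIsCorrect(100, 120, true));    // allowed: entire WAL read
  return 0;
}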
@@ -2571,8 +2545,6 @@ Status GetChangesForCDCSDK(

   auto safe_hybrid_time_resp = HybridTime::kInvalid;
   HaveMoreMessages have_more_messages(false);
-  HybridTime last_read_wal_op_record_time = HybridTime::kInvalid;
-  bool is_entire_wal_read = false;
   // It is snapshot call.
   if (from_op_id.write_id() == -1) {
     snapshot_operation = true;
@@ -2600,8 +2572,7 @@ Status GetChangesForCDCSDK(
     RETURN_NOT_OK(GetConsistentWALRecords(
         tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time,
         historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index,
-        safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints,
-        &last_read_wal_op_record_time, &is_entire_wal_read));
+        safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints));
   else
     // 'skip_intents' is true here because we want the first transaction to be the partially
     // streamed transaction.
@@ -2695,8 +2666,7 @@ Status GetChangesForCDCSDK(
     RETURN_NOT_OK(GetConsistentWALRecords(
         tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time,
         historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index,
-        safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints,
-        &last_read_wal_op_record_time, &is_entire_wal_read));
+        safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints));
   else
     // 'skip_intents' is false otherwise in case the complete wal segment is filled with
     // intents we will break the loop thinking that WAL has no more records.
@@ -3080,25 +3050,11 @@ Status GetChangesForCDCSDK(

   // If we need to wait for WAL to get up to date with all committed transactions, we will send the
   // request safe time in the response as well.
-  auto computed_safe_hybrid_time_req =
-      HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0);
   auto safe_time = wait_for_wal_update
-                       ? computed_safe_hybrid_time_req
+                       ? HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0)
                        : GetCDCSDKSafeTimeForTarget(
                              leader_safe_time.get(), safe_hybrid_time_resp, have_more_messages,
                              consistent_stream_safe_time, snapshot_operation);
-
-  if (!snapshot_operation && !CheckResponseSafeTimeCorrectness(
-                                 last_read_wal_op_record_time, safe_time, is_entire_wal_read)) {
-    LOG(WARNING) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id
-                 << ", response safe time: " << safe_time
-                 << " is greater than last read WAL OP's record time: "
-                 << last_read_wal_op_record_time
-                 << ", req_safe_time: " << computed_safe_hybrid_time_req
-                 << ", consistent stream safe time: " << HybridTime(consistent_stream_safe_time)
-                 << ", leader safe time: " << leader_safe_time.get()
-                 << ", is_entire_wal_read: " << is_entire_wal_read;
-  }
   resp->set_safe_hybrid_time(safe_time.ToUint64());

   // It is possible in case of a partially streamed transaction.
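The final hunk above restores the inline request-safe-time fallback in the response safe time selection. A simplified, self-contained sketch of that selection follows; integers stand in for HybridTime, and TargetSafeTime is a placeholder assumption, not the real GetCDCSDKSafeTimeForTarget:

#include <cstdint>

// Placeholder for the target safe time computation; assumption for
// illustration only.
static uint64_t TargetSafeTime(uint64_t consistent_stream_safe_time) {
  return consistent_stream_safe_time;
}

// While waiting for the WAL to catch up with committed transactions, echo the
// request's safe time back (clamped at 0, mirroring the restored ternary);
// otherwise use the safe time computed for the target.
uint64_t ChooseResponseSafeTime(bool wait_for_wal_update,
                                int64_t safe_hybrid_time_req,
                                uint64_t consistent_stream_safe_time) {
  return wait_for_wal_update
             ? static_cast<uint64_t>(safe_hybrid_time_req > 0 ? safe_hybrid_time_req : 0)
             : TargetSafeTime(consistent_stream_safe_time);
}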
91 changes: 36 additions & 55 deletions src/yb/consensus/consensus_queue.cc
@@ -877,85 +877,66 @@ Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesForConsistentCDC(

 Result<ReadOpsResult> PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC(
     const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry,
-    int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer,
-    bool* read_entire_wal) {
+    int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) {
   auto read_ops = ReadOpsResult();
-  int64_t start_op_id_index;
-  int64_t current_segment_num;
-  int64_t segment_last_index;
-  int64_t committed_op_id_index;
-  int64_t last_replicated_op_id_index;
-
-  do {
-    // We wait till committed_op_id_index becomes >= last index of the segment that contains the
-    // from_op_id. If we reach deadline, then return have_more_messages as true to wait for wal
-    // update.
-    if (deadline - CoarseMonoClock::Now() <= FLAGS_cdcsdk_wal_reads_deadline_buffer_secs * 1s) {
-      read_ops.have_more_messages = HaveMoreMessages(true);
-      return read_ops;
-    }
-
-    std::tie(committed_op_id_index, last_replicated_op_id_index) =
-        GetCommittedAndMajorityReplicatedIndex();
+  auto [committed_op_id_index, last_replicated_op_id_index] =
+      GetCommittedAndMajorityReplicatedIndex();

-    // Determine if there are pending operations in RAFT but not yet LogCache.
-    auto pending_messages = committed_op_id_index != last_replicated_op_id_index;
+  // Determine if there are pending operations in RAFT but not yet LogCache.
+  auto pending_messages = committed_op_id_index != last_replicated_op_id_index;

-    if (last_committed_index) {
-      *last_committed_index = committed_op_id_index;
-    }
+  if (last_committed_index) {
+    *last_committed_index = committed_op_id_index;
+  }

-    if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) {
-      // Nothing to read.
-      return ReadOpsResult{
-          .messages = ReplicateMsgs(),
-          .preceding_op = OpId(),
-          .have_more_messages = HaveMoreMessages(pending_messages)};
-    }
+  if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) {
+    // Nothing to read.
+    return ReadOpsResult {
+      .messages = ReplicateMsgs(),
+      .preceding_op = OpId(),
+      .have_more_messages = HaveMoreMessages(pending_messages)
+    };
+  }

-    start_op_id_index = GetStartOpIdIndex(from_op_id.index);
+  auto start_op_id_index = GetStartOpIdIndex(from_op_id.index);

-    VLOG(1) << "Will read Ops from a WAL segment for tablet: " << tablet_id_
+  VLOG(1) << "Will read Ops from a WAL segment. "
          << " start_op_id_index = " << start_op_id_index
          << " committed_op_id_index = " << committed_op_id_index
          << " last_replicated_op_id_index = " << last_replicated_op_id_index;

-    auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index);
-    if (!current_segment_num_result.ok() ||
-        (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) {
-      // Read entire WAL.
-      *read_entire_wal = true;
-      return ReadReplicatedMessagesForConsistentCDC(
-          from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
-    }
+  auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index);
+  if (!current_segment_num_result.ok() ||
+      (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) {
+    // Read entire WAL.
+    return ReadReplicatedMessagesForConsistentCDC(
+        from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
+  }

-    current_segment_num = *current_segment_num_result;
-    segment_last_index =
-        VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));
+  auto current_segment_num = *current_segment_num_result;
+  auto segment_last_index =
+      VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));

-    // Nothing to read in this segment, read the next segment.
-    if (start_op_id_index == segment_last_index) {
-      current_segment_num =
-          VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1));
-      if (current_segment_num == log_cache_.GetActiveSegmentNumber()) {
-        // Read entire WAL.
-        *read_entire_wal = true;
-        return ReadReplicatedMessagesForConsistentCDC(
-            from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
-      }
-      segment_last_index =
-          VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));
-    }
-  } while (segment_last_index > committed_op_id_index);
+  // Nothing to read in this segment, read the next segment.
+  if (start_op_id_index == segment_last_index) {
+    current_segment_num = VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1));
+    if (current_segment_num == log_cache_.GetActiveSegmentNumber()) {
+      // Read entire WAL.
+      return ReadReplicatedMessagesForConsistentCDC(
+          from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry);
+    }
+    segment_last_index =
+        VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num));
+  }

   auto consistent_stream_safe_time =
       VERIFY_RESULT(log_cache_.GetMinStartTimeRunningTxnsFromSegmentFooter(current_segment_num));

   if (consistent_stream_safe_time_footer) {
     *consistent_stream_safe_time_footer = consistent_stream_safe_time;
   }

-  VLOG(1) << "Reading a new WAL segment for tablet: " << tablet_id_ << " Segment info:"
+  VLOG(1) << "Reading a new WAL segment. Segment info:"
          << " current_segment_num = " << current_segment_num
          << " active_segment_num = " << log_cache_.GetActiveSegmentNumber()
          << " segment_last_index = " << segment_last_index
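A side note on the shape of the reverted code above: the removed version pre-declared its int64_t locals before the do-while, where the restored version declares them with auto at first use. One plausible reason is C++ scoping: the while-condition of a do-while cannot see names declared inside the loop body, so any variable it tests must be declared outside. A minimal illustration with a toy helper (not the real code):

#include <cstdint>

static int64_t counter = 3;
static int64_t NextSegmentLastIndex() { return counter--; }  // toy helper

void Scan(int64_t committed_op_id_index) {
  int64_t segment_last_index;  // must be declared outside the body...
  do {
    segment_last_index = NextSegmentLastIndex();
    // ...because a declaration here (auto segment_last_index = ...;) would be
    // out of scope in the while-condition below.
  } while (segment_last_index > committed_op_id_index);
}

int main() { Scan(0); return 0; }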
3 changes: 1 addition & 2 deletions src/yb/consensus/consensus_queue.h
@@ -408,8 +408,7 @@ class PeerMessageQueue {
   Result<ReadOpsResult> ReadReplicatedMessagesInSegmentForCDC(
       const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry = false,
       int64_t* last_committed_index = nullptr,
-      HybridTime* consistent_stream_safe_time_footer = nullptr,
-      bool* read_entire_wal = nullptr);
+      HybridTime* consistent_stream_safe_time_footer = nullptr);

   void UpdateCDCConsumerOpId(const yb::OpId& op_id);
5 changes: 2 additions & 3 deletions src/yb/consensus/raft_consensus.cc
@@ -3653,11 +3653,10 @@ Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesForConsistentCDC(

 Result<ReadOpsResult> RaftConsensus::ReadReplicatedMessagesInSegmentForCDC(
     const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry,
-    int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer,
-    bool* read_entire_wal) {
+    int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) {
   return queue_->ReadReplicatedMessagesInSegmentForCDC(
       from_op_id, deadline, fetch_single_entry, last_committed_index,
-      consistent_stream_safe_time_footer, read_entire_wal);
+      consistent_stream_safe_time_footer);
 }

 void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) {
3 changes: 1 addition & 2 deletions src/yb/consensus/raft_consensus.h
@@ -293,8 +293,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
       CoarseTimePoint deadline,
       bool fetch_single_entry = false,
       int64_t* last_committed_index = nullptr,
-      HybridTime* consistent_stream_safe_time_footer = nullptr,
-      bool* read_entire_wal = nullptr);
+      HybridTime* consistent_stream_safe_time_footer = nullptr);

   void UpdateCDCConsumerOpId(const yb::OpId& op_id) override;