diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index cca1899820f1..02edcfa9e0ba 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -92,7 +92,7 @@ DEFINE_NON_RUNTIME_int64( "ConsistentStreamSafeTime for CDCSDK by resolving all committed intetns"); DEFINE_RUNTIME_bool(cdc_read_wal_segment_by_segment, - false, + true, "When this flag is set to true, GetChanges will read the WAL segment by " "segment. If valid records are found in the first segment, GetChanges will " "return these records in response. If no valid records are found then next " @@ -2007,13 +2007,14 @@ Status GetConsistentWALRecords( bool* wait_for_wal_update, OpId* last_seen_op_id, int64_t& last_readable_opid_index, const int64_t& safe_hybrid_time_req, const CoarseTimePoint& deadline, std::vector>* consistent_wal_records, - std::vector>* all_checkpoints) { + std::vector>* all_checkpoints, + HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read) { VLOG(2) << "Getting consistent WAL records. safe_hybrid_time_req: " << safe_hybrid_time_req << ", consistent_safe_time: " << *consistent_safe_time << ", last_seen_op_id: " << last_seen_op_id->ToString() << ", historical_max_op_id: " << historical_max_op_id; auto raft_consensus = VERIFY_RESULT(tablet_peer->GetRaftConsensus()); - bool read_entire_wal = false; + HybridTime last_read_segment_footer_safe_time = HybridTime::kInvalid; do { consensus::ReadOpsResult read_ops; @@ -2027,16 +2028,10 @@ Status GetConsistentWALRecords( // end of the WAL. read_ops = VERIFY_RESULT(raft_consensus->ReadReplicatedMessagesInSegmentForCDC( *last_seen_op_id, deadline, /* fetch_single_entry */ false, &last_readable_opid_index, - &consistent_stream_safe_time_footer)); + &consistent_stream_safe_time_footer, is_entire_wal_read)); - if (!consistent_stream_safe_time_footer) { - // HybridTime::kInvalid in consistent_stream_safe_time_footer indicates that we have read - // the currently active segment. - read_entire_wal = true; - consistent_stream_safe_time_footer = HybridTime(*consistent_safe_time); - } } else { - read_entire_wal = true; + *is_entire_wal_read = true; // Read all the committed WAL messages with hybrid time <= consistent_stream_safe_time. If // there exist messages in the WAL which are replicated but not yet committed, // ReadReplicatedMessagesForConsistentCDC waits for them to get committed and eventually @@ -2053,6 +2048,7 @@ Status GetConsistentWALRecords( for (const auto& msg : read_ops.messages) { last_seen_op_id->term = msg->id().term(); last_seen_op_id->index = msg->id().index(); + *last_read_wal_op_record_time = HybridTime(msg->hybrid_time()); if (IsIntent(msg) || (IsUpdateTransactionOp(msg) && msg->transaction_state().status() != TransactionStatus::APPLYING)) { @@ -2069,7 +2065,8 @@ Status GetConsistentWALRecords( << ", commit_time: " << GetTransactionCommitTime(msg) << ", consistent safe_time: " << *consistent_safe_time << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer - << ", safe_hybrid_time_req: " << safe_hybrid_time_req; + << ", safe_hybrid_time_req: " << safe_hybrid_time_req + << ", is_entire_wal_read: " << *is_entire_wal_read; } else if (VLOG_IS_ON(3)) { VLOG(3) << "Read WAL msg on " << "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type() @@ -2077,7 +2074,8 @@ Status GetConsistentWALRecords( << ", commit_time: " << GetTransactionCommitTime(msg) << ", consistent safe_time: " << *consistent_safe_time << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer - << ", safe_hybrid_time_req: " << safe_hybrid_time_req; + << ", safe_hybrid_time_req: " << safe_hybrid_time_req + << ", is_entire_wal_read: " << *is_entire_wal_read; } all_checkpoints->push_back(msg); @@ -2091,7 +2089,7 @@ Status GetConsistentWALRecords( // Handle the case where WAL doesn't have the apply record for all the committed transactions. if (historical_max_op_id.valid() && historical_max_op_id > *last_seen_op_id && - read_entire_wal) { + *is_entire_wal_read) { *wait_for_wal_update = true; break; } @@ -2105,24 +2103,36 @@ Status GetConsistentWALRecords( SortConsistentWALRecords(consistent_wal_records); + // For closed segments, consistent_stream_safe_time_footer corresponds to the value read from + // segment footer. For active segment, it will be Invalid. + if (FLAGS_cdc_read_wal_segment_by_segment && consistent_stream_safe_time_footer.is_valid()) { + last_read_segment_footer_safe_time = consistent_stream_safe_time_footer; + } + if (!consistent_wal_records->empty()) { auto record = consistent_wal_records->front(); if (FLAGS_cdc_read_wal_segment_by_segment && GetTransactionCommitTime(record) <= consistent_stream_safe_time_footer.ToUint64()) { // Since there exists atleast one message with commit_time <= consistent_stream_safe_time, // we don't need to read the next segment. - *consistent_safe_time = consistent_stream_safe_time_footer.ToUint64(); break; } } // No need for another iteration if we have read the entire WAL. - if (read_entire_wal) { + if (*is_entire_wal_read) { break; } } while (last_seen_op_id->index < last_readable_opid_index); + // Skip updating consistent safe time when entire WAL is read and we can ship all records + // till the consistent safe time computed in cdc producer. + if (FLAGS_cdc_read_wal_segment_by_segment && !(*is_entire_wal_read) && + last_read_segment_footer_safe_time.is_valid()) { + *consistent_safe_time = last_read_segment_footer_safe_time.ToUint64(); + } + VLOG_WITH_FUNC(1) << "Got a total of " << consistent_wal_records->size() << " WAL records " << "in the current segment"; return Status::OK(); @@ -2485,6 +2495,22 @@ bool IsReplicationSlotStream(const StreamMetadata& stream_metadata) { !stream_metadata.GetReplicationSlotName()->empty(); } +// Response safe time follows the invaraint: +// Request safe time <= Response safe time <= value from GetConsistentStreamSafeTime(). +// If response safe time is set to GetConsistentStreamSafeTime()'s value, then it implies that we +// have read the entire WAL. In any other case, the response safe time can either be the last read +// WAL segment's footer safe time ('min_start_time_running_txns') or commit time of the last +// transaction being shipped in the current response. Both these values (footer safe time or commit +// time of last txn) will be <= last read WAL OP's record time. +bool CheckResponseSafeTimeCorrectness( + HybridTime last_read_wal_op_record_time, HybridTime resp_safe_time, bool is_entire_wal_read) { + if (!last_read_wal_op_record_time.is_valid() || resp_safe_time <= last_read_wal_op_record_time) { + return true; + } + + return is_entire_wal_read; +} + // CDC get changes is different from xCluster as it doesn't need // to read intents from WAL. @@ -2545,6 +2571,8 @@ Status GetChangesForCDCSDK( auto safe_hybrid_time_resp = HybridTime::kInvalid; HaveMoreMessages have_more_messages(false); + HybridTime last_read_wal_op_record_time = HybridTime::kInvalid; + bool is_entire_wal_read = false; // It is snapshot call. if (from_op_id.write_id() == -1) { snapshot_operation = true; @@ -2572,7 +2600,8 @@ Status GetChangesForCDCSDK( RETURN_NOT_OK(GetConsistentWALRecords( tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints)); + safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, + &last_read_wal_op_record_time, &is_entire_wal_read)); else // 'skip_intents' is true here because we want the first transaction to be the partially // streamed transaction. @@ -2666,7 +2695,8 @@ Status GetChangesForCDCSDK( RETURN_NOT_OK(GetConsistentWALRecords( tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints)); + safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, + &last_read_wal_op_record_time, &is_entire_wal_read)); else // 'skip_intents' is false otherwise in case the complete wal segment is filled with // intents we will break the loop thinking that WAL has no more records. @@ -3050,11 +3080,25 @@ Status GetChangesForCDCSDK( // If we need to wait for WAL to get up to date with all committed transactions, we will send the // request safe in the response as well. + auto computed_safe_hybrid_time_req = + HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0); auto safe_time = wait_for_wal_update - ? HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0) + ? computed_safe_hybrid_time_req : GetCDCSDKSafeTimeForTarget( leader_safe_time.get(), safe_hybrid_time_resp, have_more_messages, consistent_stream_safe_time, snapshot_operation); + + if (!snapshot_operation && !CheckResponseSafeTimeCorrectness( + last_read_wal_op_record_time, safe_time, is_entire_wal_read)) { + LOG(WARNING) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id + << ", response safe time: " << safe_time + << " is greater than last read WAL OP's record time: " + << last_read_wal_op_record_time + << ", req_safe_time: " << computed_safe_hybrid_time_req + << ", consistent stream safe time: " << HybridTime(consistent_stream_safe_time) + << ", leader safe time: " << leader_safe_time.get() + << ", is_entire_wal_read: " << is_entire_wal_read; + } resp->set_safe_hybrid_time(safe_time.ToUint64()); // It is possible in case of a partially streamed transaction. diff --git a/src/yb/consensus/consensus_queue.cc b/src/yb/consensus/consensus_queue.cc index 783d6b576ce6..0b714f4bf980 100644 --- a/src/yb/consensus/consensus_queue.cc +++ b/src/yb/consensus/consensus_queue.cc @@ -877,58 +877,77 @@ Result PeerMessageQueue::ReadReplicatedMessagesForConsistentCDC( Result PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry, - int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) { + int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer, + bool* read_entire_wal) { auto read_ops = ReadOpsResult(); + int64_t start_op_id_index; + int64_t current_segment_num; + int64_t segment_last_index; + int64_t committed_op_id_index; + int64_t last_replicated_op_id_index; - auto [committed_op_id_index, last_replicated_op_id_index] = - GetCommittedAndMajorityReplicatedIndex(); + do { + // We wait till committed_op_id_index becomes >= last index of the segment that contains the + // from_op_id. If we reach deadline, then return have_more_messages as true to wait for wal + // update. + if (deadline - CoarseMonoClock::Now() <= FLAGS_cdcsdk_wal_reads_deadline_buffer_secs * 1s) { + read_ops.have_more_messages = HaveMoreMessages(true); + return read_ops; + } - // Determine if there are pending operations in RAFT but not yet LogCache. - auto pending_messages = committed_op_id_index != last_replicated_op_id_index; + std::tie(committed_op_id_index, last_replicated_op_id_index) = + GetCommittedAndMajorityReplicatedIndex(); - if (last_committed_index) { - *last_committed_index = committed_op_id_index; - } + // Determine if there are pending operations in RAFT but not yet LogCache. + auto pending_messages = committed_op_id_index != last_replicated_op_id_index; - if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) { - // Nothing to read. - return ReadOpsResult { - .messages = ReplicateMsgs(), - .preceding_op = OpId(), - .have_more_messages = HaveMoreMessages(pending_messages) - }; - } + if (last_committed_index) { + *last_committed_index = committed_op_id_index; + } - auto start_op_id_index = GetStartOpIdIndex(from_op_id.index); + if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) { + // Nothing to read. + return ReadOpsResult{ + .messages = ReplicateMsgs(), + .preceding_op = OpId(), + .have_more_messages = HaveMoreMessages(pending_messages)}; + } - VLOG(1) << "Will read Ops from a WAL segment. " + start_op_id_index = GetStartOpIdIndex(from_op_id.index); + + VLOG(1) << "Will read Ops from a WAL segment for tablet: " << tablet_id_ << " start_op_id_index = " << start_op_id_index << " committed_op_id_index = " << committed_op_id_index << " last_replicated_op_id_index = " << last_replicated_op_id_index; - auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index); - if (!current_segment_num_result.ok() || - (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) { - // Read entire WAL. - return ReadReplicatedMessagesForConsistentCDC( - from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); - } - - auto current_segment_num = *current_segment_num_result; - auto segment_last_index = - VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); - - // Nothing to read in this segment, read the next segment. - if (start_op_id_index == segment_last_index) { - current_segment_num = VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1)); - if (current_segment_num == log_cache_.GetActiveSegmentNumber()) { + auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index); + if (!current_segment_num_result.ok() || + (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) { // Read entire WAL. + *read_entire_wal = true; return ReadReplicatedMessagesForConsistentCDC( from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); } + + current_segment_num = *current_segment_num_result; segment_last_index = VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); - } + + // Nothing to read in this segment, read the next segment. + if (start_op_id_index == segment_last_index) { + current_segment_num = + VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1)); + if (current_segment_num == log_cache_.GetActiveSegmentNumber()) { + // Read entire WAL. + *read_entire_wal = true; + return ReadReplicatedMessagesForConsistentCDC( + from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); + } + segment_last_index = + VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); + } + } while (segment_last_index > committed_op_id_index); + auto consistent_stream_safe_time = VERIFY_RESULT(log_cache_.GetMinStartTimeRunningTxnsFromSegmentFooter(current_segment_num)); @@ -936,7 +955,7 @@ Result PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC( *consistent_stream_safe_time_footer = consistent_stream_safe_time; } - VLOG(1) << "Reading a new WAL segment. Segment info:" + VLOG(1) << "Reading a new WAL segment for tablet: " << tablet_id_ << " Segment info:" << " current_segment_num = " << current_segment_num << " active_segment_num = " << log_cache_.GetActiveSegmentNumber() << " segment_last_index = " << segment_last_index diff --git a/src/yb/consensus/consensus_queue.h b/src/yb/consensus/consensus_queue.h index 2617085fd694..a1d8f830b1e7 100644 --- a/src/yb/consensus/consensus_queue.h +++ b/src/yb/consensus/consensus_queue.h @@ -408,7 +408,8 @@ class PeerMessageQueue { Result ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry = false, int64_t* last_committed_index = nullptr, - HybridTime* consistent_stream_safe_time_footer = nullptr); + HybridTime* consistent_stream_safe_time_footer = nullptr, + bool* read_entire_wal = nullptr); void UpdateCDCConsumerOpId(const yb::OpId& op_id); diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index d1f8d99e43c0..b4a53181dec9 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -3653,10 +3653,11 @@ Result RaftConsensus::ReadReplicatedMessagesForConsistentCDC( Result RaftConsensus::ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry, - int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) { + int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer, + bool* read_entire_wal) { return queue_->ReadReplicatedMessagesInSegmentForCDC( from_op_id, deadline, fetch_single_entry, last_committed_index, - consistent_stream_safe_time_footer); + consistent_stream_safe_time_footer, read_entire_wal); } void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) { diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index 5ffbb0240926..cfef2bc6dd29 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -293,7 +293,8 @@ class RaftConsensus : public std::enable_shared_from_this, CoarseTimePoint deadline, bool fetch_single_entry = false, int64_t* last_committed_index = nullptr, - HybridTime* consistent_stream_safe_time_footer = nullptr); + HybridTime* consistent_stream_safe_time_footer = nullptr, + bool* read_entire_wal = nullptr); void UpdateCDCConsumerOpId(const yb::OpId& op_id) override; diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index c92c76f79632..4f8187e300d2 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -10043,6 +10043,88 @@ TEST_F(CDCSDKYsqlTest, TestWithMajorityReplicatedButNonCommittedSingleShardTxn) ASSERT_GE(total, 12); } +TEST_F(CDCSDKYsqlTest, TestWithMajorityReplicatedButNonCommittedMultiShardTxn) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; + constexpr int num_tservers = 1; + ASSERT_OK(SetUpWithParams(num_tservers, /* num_masters */ 1, false)); + + constexpr auto num_tablets = 1; + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE test1(id1 int primary key) SPLIT INTO 1 tablets;")); + auto table = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, "test1")); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets( + table, 0, &tablets, + /* partition_list_version =*/nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + const auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); + const auto& tablet_id = tablets.Get(0).tablet_id(); + + auto checkpoint_result = ASSERT_RESULT(GetCDCSDKSnapshotCheckpoint(stream_id, tablet_id)); + // Switch to streaming directly. + checkpoint_result.set_write_id(0); + + constexpr int num_inserts = 10; + LOG(INFO) << "Starting txn"; + ASSERT_OK(conn.Execute("BEGIN")); + for (int i = 0; i < num_inserts; i++) { + ASSERT_OK(conn.ExecuteFormat("INSERT INTO test1 VALUES ($0)", i)); + } + + // Explicitly rollover so that the UPDATE_TXN_OP of txn1 goes into the next segment. + log::SegmentSequence segments; + for (const auto& peer : test_cluster()->GetTabletPeers(num_tservers - 1)) { + if (peer->tablet_id() != tablet_id) { + continue; + } + auto tablet = ASSERT_RESULT(peer->shared_tablet_safe()); + ASSERT_OK(peer->log()->AllocateSegmentAndRollOver()); + ASSERT_OK(peer->log()->GetSegmentsSnapshot(&segments)); + ASSERT_EQ(segments.size(), 2); + } + + uint64_t min_start_time_running_txns; + const log::ReadableLogSegmentPtr& last_segment = ASSERT_RESULT(segments.back()); + for (const auto& segment : segments) { + // All segments except for the last should have a footer. + if (&segment == &last_segment) { + continue; + } + ASSERT_TRUE(segment->HasFooter()); + ASSERT_TRUE(segment->footer().has_min_start_time_running_txns()); + min_start_time_running_txns = segment->footer().min_start_time_running_txns(); + } + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_stop_committed_op_id_updation) = true; + + ASSERT_OK(conn.Execute("COMMIT")); + + // DDL record will be read but it will be filtered due to commit time threshold. + auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &checkpoint_result)); + ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0); + + // Segment-1 will be read till the end but no WAL OPs relevant for CDC would be found. + auto change_resp2 = ASSERT_RESULT(GetChangesFromCDC( + stream_id, tablets, &change_resp.cdc_sdk_checkpoint(), 0, change_resp.safe_hybrid_time(), + change_resp.wal_segment_index())); + ASSERT_EQ(change_resp2.cdc_sdk_proto_records_size(), 0); + // safe time received in the response should match with the footer value of segment-1. + ASSERT_EQ(change_resp2.safe_hybrid_time(), min_start_time_running_txns); + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_stop_committed_op_id_updation) = false; + // Perform another txn so that committed_op_id gets updated. + ASSERT_OK(conn.Execute("BEGIN")); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO test1 VALUES (10)")); + ASSERT_OK(conn.Execute("COMMIT")); + + auto change_resp3 = ASSERT_RESULT(GetChangesFromCDC( + stream_id, tablets, &change_resp2.cdc_sdk_checkpoint(), 0, change_resp2.safe_hybrid_time(), + change_resp2.wal_segment_index())); + // 1 DDL + Txn1 (B + 10 inserts + C) + Txn2 (B + 1 insert + C) + ASSERT_EQ(change_resp3.cdc_sdk_proto_records_size(), 16); +} + TEST_F(CDCSDKYsqlTest, TestCleanupOfTableNotOfInterest) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index f60987846b6c..3aea6bd21bb6 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -26,6 +26,7 @@ #include "yb/client/schema.h" #include "yb/client/table_handle.h" #include "yb/client/transaction.h" +#include "yb/consensus/log.h" #include "yb/master/catalog_manager_if.h" #include "yb/tablet/transaction_participant.h"