diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index bdf4b7bb9c05..081989272549 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -73,9 +73,6 @@ constexpr int kMaxDurationForTabletLookup = 50; const client::YBTableName kCdcStateTableName( master::kSystemNamespaceName, master::kCdcStateTableName); -const auto kCdcStateCheckpointInterval = FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms; -const auto kCheckpointOpIdInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms; - CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager, const scoped_refptr& metric_entity) : CDCServiceIf(metric_entity), @@ -327,10 +324,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, s.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR, context); - s = UpdateCheckpoint( - producer_tablet, resp->checkpoint().op_id(), - req->has_from_checkpoint() ? req->from_checkpoint().op_id() : consensus::MinimumOpId(), - session); + s = UpdateCheckpoint(producer_tablet, resp->checkpoint().op_id(), op_id, session); RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); tablet_peer->consensus()->UpdateCDCConsumerOpId(GetMinSentCheckpointForTablet(req->tablet_id())); @@ -509,7 +503,12 @@ Result CDCServiceImpl::GetLastCheckpoint( std::shared_lock l(lock_); auto it = tablet_checkpoints_.find(producer_tablet); if (it != tablet_checkpoints_.end()) { - return it->second.cdc_state_checkpoint.op_id; + // Use checkpoint from cache only if it is current. + if (it->second.cdc_state_checkpoint.op_id.index() > 0 && + CoarseMonoClock::Now() - it->second.cdc_state_checkpoint.last_update_time <= + (FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms)) { + return it->second.cdc_state_checkpoint.op_id; + } } } @@ -565,7 +564,8 @@ Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_table } // Check if we need to update cdc_state table. - if (now - it->second.cdc_state_checkpoint.last_update_time <= kCdcStateCheckpointInterval) { + if (now - it->second.cdc_state_checkpoint.last_update_time <= + (FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms)) { update_cdc_state = false; } else { it->second.cdc_state_checkpoint.last_update_time = now; @@ -612,7 +612,8 @@ OpIdPB CDCServiceImpl::GetMinSentCheckpointForTablet(const std::string& tablet_i // We don't want to include streams that are not being actively polled. // So, if the stream has not been polled in the last x seconds, // then we ignore that stream while calculating min op ID. - if (now - checkpoint->second.sent_checkpoint.last_update_time <= kCheckpointOpIdInterval && + if (now - checkpoint->second.sent_checkpoint.last_update_time <= + (FLAGS_cdc_checkpoint_opid_interval_ms * 1ms) && checkpoint->second.sent_checkpoint.op_id.index() < min_op_id.index()) { min_op_id = checkpoint->second.sent_checkpoint.op_id; } diff --git a/ent/src/yb/integration-tests/cdc_service-int-test.cc b/ent/src/yb/integration-tests/cdc_service-int-test.cc index f9174007998c..b07527c381ba 100644 --- a/ent/src/yb/integration-tests/cdc_service-int-test.cc +++ b/ent/src/yb/integration-tests/cdc_service-int-test.cc @@ -1,7 +1,10 @@ // Copyright (c) YugaByte, Inc. +#include + #include "yb/common/wire_protocol.h" #include "yb/common/wire_protocol-test-util.h" +#include "yb/common/ql_value.h" #include "yb/cdc/cdc_service.proxy.h" #include "yb/client/error.h" #include "yb/client/table.h" @@ -30,6 +33,7 @@ #include "yb/yql/cql/ql/util/statement_result.h" DECLARE_int32(cdc_wal_retention_time_secs); +DECLARE_int32(cdc_state_checkpoint_update_interval_ms); namespace yb { namespace cdc { @@ -111,6 +115,19 @@ void AssertChangeRecords(const google::protobuf::RepeatedPtrFieldcolumn(2).string_value(); + size_t split = checkpoint.find("."); + auto index = boost::lexical_cast(checkpoint.substr(split + 1, string::npos)); + // Verify that op id index has been advanced and is not 0. + ASSERT_GT(index, 0); +} + void CDCServiceTest::GetTablet(std::string* tablet_id) { std::vector tablet_ids; std::vector ranges; @@ -508,5 +525,88 @@ TEST_F(CDCServiceTest, TestCheckpointUpdatedForRemoteRows) { ASSERT_NO_FATALS(CheckChanges()); } +// Test to ensure that cdc_state table's checkpoint is updated as expected. +// This also tests for #2897 to ensure that cdc_state table checkpoint is not overwritten to 0.0 +// in case the consumer does not send from checkpoint. +TEST_F(CDCServiceTest, TestCheckpointUpdate) { + FLAGS_cdc_state_checkpoint_update_interval_ms = 0; + + CDCStreamId stream_id; + CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id); + + std::string tablet_id; + GetTablet(&tablet_id); + + const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy(); + + // Insert test rows. + tserver::WriteRequestPB write_req; + tserver::WriteResponsePB write_resp; + write_req.set_tablet_id(tablet_id); + { + RpcController rpc; + AddTestRowInsert(1, 11, "key1", &write_req); + AddTestRowInsert(2, 22, "key2", &write_req); + + SCOPED_TRACE(write_req.DebugString()); + ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc)); + SCOPED_TRACE(write_resp.DebugString()); + ASSERT_FALSE(write_resp.has_error()); + } + + // Get CDC changes. + GetChangesRequestPB change_req; + GetChangesResponsePB change_resp; + + change_req.set_tablet_id(tablet_id); + change_req.set_stream_id(stream_id); + change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0); + change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0); + + { + RpcController rpc; + SCOPED_TRACE(change_req.DebugString()); + ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); + SCOPED_TRACE(change_resp.DebugString()); + ASSERT_FALSE(change_resp.has_error()); + ASSERT_EQ(change_resp.records_size(), 2); + } + + // Call GetChanges again and pass in checkpoint that producer can mark as committed. + change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint()); + change_resp.Clear(); + { + RpcController rpc; + SCOPED_TRACE(change_req.DebugString()); + ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); + SCOPED_TRACE(change_resp.DebugString()); + ASSERT_FALSE(change_resp.has_error()); + // No more changes, so 0 records should be received. + ASSERT_EQ(change_resp.records_size(), 0); + } + + // Verify that cdc_state table has correct checkpoint. + ASSERT_NO_FATALS(VerifyCdcState(client_.get())); + + // Call GetChanges again but without any from checkpoint. + change_req.Clear(); + change_req.set_tablet_id(tablet_id); + change_req.set_stream_id(stream_id); + change_resp.Clear(); + { + RpcController rpc; + SCOPED_TRACE(change_req.DebugString()); + ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc)); + SCOPED_TRACE(change_resp.DebugString()); + ASSERT_FALSE(change_resp.has_error()); + // Verify that producer uses the "from_checkpoint" from cdc_state table and does not send back + // any records. + ASSERT_EQ(change_resp.records_size(), 0); + } + + // Verify that cdc_state table's checkpoint is unaffected. + ASSERT_NO_FATALS(VerifyCdcState(client_.get())); +} + } // namespace cdc } // namespace yb