Skip to content

Commit

Permalink
#2897: Don't reset cdc_state checkpoint to 0.0
Browse files Browse the repository at this point in the history
Summary:
In the situation where producer does not have checkpoint for a tablet in it's local cache, it resets the tablet checkpoint to 0.0 in cdc_state table incorrectly. This can happen when there is a leader change and producer tablet server does not have information about the tablet in its cache.
Note that this bug is a rare situation which happens when both producer and consumer tablet leadership would have changed. In this case, consumer does not send a commit checkpoint, and producer ends up overwriting the checkpoint in cdc_state table.

Fix is:
If we cannot find tablet in cache or if data in cache is stale (can happen during frequent leader changes), then make sure that we read latest data from cdc_state table and don't incorrectly update the table to 0 or to a stale value.

Test Plan:
Jenkins
Added unit test.

Reviewers: rahuldesirazu, hector, nicolas

Reviewed By: nicolas

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7541
  • Loading branch information
ndeodhar committed Nov 12, 2019
1 parent 12bd537 commit b78aec4
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 10 deletions.
21 changes: 11 additions & 10 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ constexpr int kMaxDurationForTabletLookup = 50;
const client::YBTableName kCdcStateTableName(
master::kSystemNamespaceName, master::kCdcStateTableName);

const auto kCdcStateCheckpointInterval = FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms;
const auto kCheckpointOpIdInterval = FLAGS_cdc_checkpoint_opid_interval_ms * 1ms;

CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager,
const scoped_refptr<MetricEntity>& metric_entity)
: CDCServiceIf(metric_entity),
Expand Down Expand Up @@ -327,10 +324,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
s.IsNotFound() ? CDCErrorPB::CHECKPOINT_TOO_OLD : CDCErrorPB::UNKNOWN_ERROR,
context);

s = UpdateCheckpoint(
producer_tablet, resp->checkpoint().op_id(),
req->has_from_checkpoint() ? req->from_checkpoint().op_id() : consensus::MinimumOpId(),
session);
s = UpdateCheckpoint(producer_tablet, resp->checkpoint().op_id(), op_id, session);
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);

tablet_peer->consensus()->UpdateCDCConsumerOpId(GetMinSentCheckpointForTablet(req->tablet_id()));
Expand Down Expand Up @@ -509,7 +503,12 @@ Result<OpIdPB> CDCServiceImpl::GetLastCheckpoint(
std::shared_lock<decltype(lock_)> l(lock_);
auto it = tablet_checkpoints_.find(producer_tablet);
if (it != tablet_checkpoints_.end()) {
return it->second.cdc_state_checkpoint.op_id;
// Use checkpoint from cache only if it is current.
if (it->second.cdc_state_checkpoint.op_id.index() > 0 &&
CoarseMonoClock::Now() - it->second.cdc_state_checkpoint.last_update_time <=
(FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms)) {
return it->second.cdc_state_checkpoint.op_id;
}
}
}

Expand Down Expand Up @@ -565,7 +564,8 @@ Status CDCServiceImpl::UpdateCheckpoint(const ProducerTabletInfo& producer_table
}

// Check if we need to update cdc_state table.
if (now - it->second.cdc_state_checkpoint.last_update_time <= kCdcStateCheckpointInterval) {
if (now - it->second.cdc_state_checkpoint.last_update_time <=
(FLAGS_cdc_state_checkpoint_update_interval_ms * 1ms)) {
update_cdc_state = false;
} else {
it->second.cdc_state_checkpoint.last_update_time = now;
Expand Down Expand Up @@ -612,7 +612,8 @@ OpIdPB CDCServiceImpl::GetMinSentCheckpointForTablet(const std::string& tablet_i
// We don't want to include streams that are not being actively polled.
// So, if the stream has not been polled in the last x seconds,
// then we ignore that stream while calculating min op ID.
if (now - checkpoint->second.sent_checkpoint.last_update_time <= kCheckpointOpIdInterval &&
if (now - checkpoint->second.sent_checkpoint.last_update_time <=
(FLAGS_cdc_checkpoint_opid_interval_ms * 1ms) &&
checkpoint->second.sent_checkpoint.op_id.index() < min_op_id.index()) {
min_op_id = checkpoint->second.sent_checkpoint.op_id;
}
Expand Down
100 changes: 100 additions & 0 deletions ent/src/yb/integration-tests/cdc_service-int-test.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) YugaByte, Inc.

#include <boost/lexical_cast.hpp>

#include "yb/common/wire_protocol.h"
#include "yb/common/wire_protocol-test-util.h"
#include "yb/common/ql_value.h"
#include "yb/cdc/cdc_service.proxy.h"
#include "yb/client/error.h"
#include "yb/client/table.h"
Expand Down Expand Up @@ -30,6 +33,7 @@
#include "yb/yql/cql/ql/util/statement_result.h"

DECLARE_int32(cdc_wal_retention_time_secs);
DECLARE_int32(cdc_state_checkpoint_update_interval_ms);

namespace yb {
namespace cdc {
Expand Down Expand Up @@ -111,6 +115,19 @@ void AssertChangeRecords(const google::protobuf::RepeatedPtrField<cdc::KeyValueP
ASSERT_EQ(changes[1].value().string_value(), expected_str);
}

void VerifyCdcState(client::YBClient* client) {
client::TableHandle table;
client::YBTableName cdc_state_table(master::kSystemNamespaceName, master::kCdcStateTableName);
ASSERT_OK(table.Open(cdc_state_table, client));
ASSERT_EQ(1, boost::size(client::TableRange(table)));
const auto& row = client::TableRange(table).begin();
string checkpoint = row->column(2).string_value();
size_t split = checkpoint.find(".");
auto index = boost::lexical_cast<int>(checkpoint.substr(split + 1, string::npos));
// Verify that op id index has been advanced and is not 0.
ASSERT_GT(index, 0);
}

void CDCServiceTest::GetTablet(std::string* tablet_id) {
std::vector<TabletId> tablet_ids;
std::vector<std::string> ranges;
Expand Down Expand Up @@ -508,5 +525,88 @@ TEST_F(CDCServiceTest, TestCheckpointUpdatedForRemoteRows) {
ASSERT_NO_FATALS(CheckChanges());
}

// Test to ensure that cdc_state table's checkpoint is updated as expected.
// This also tests for #2897 to ensure that cdc_state table checkpoint is not overwritten to 0.0
// in case the consumer does not send from checkpoint.
TEST_F(CDCServiceTest, TestCheckpointUpdate) {
FLAGS_cdc_state_checkpoint_update_interval_ms = 0;

CDCStreamId stream_id;
CreateCDCStream(cdc_proxy_, table_.table()->id(), &stream_id);

std::string tablet_id;
GetTablet(&tablet_id);

const auto& proxy = cluster_->mini_tablet_server(0)->server()->proxy();

// Insert test rows.
tserver::WriteRequestPB write_req;
tserver::WriteResponsePB write_resp;
write_req.set_tablet_id(tablet_id);
{
RpcController rpc;
AddTestRowInsert(1, 11, "key1", &write_req);
AddTestRowInsert(2, 22, "key2", &write_req);

SCOPED_TRACE(write_req.DebugString());
ASSERT_OK(proxy->Write(write_req, &write_resp, &rpc));
SCOPED_TRACE(write_resp.DebugString());
ASSERT_FALSE(write_resp.has_error());
}

// Get CDC changes.
GetChangesRequestPB change_req;
GetChangesResponsePB change_resp;

change_req.set_tablet_id(tablet_id);
change_req.set_stream_id(stream_id);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_index(0);
change_req.mutable_from_checkpoint()->mutable_op_id()->set_term(0);

{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
SCOPED_TRACE(change_resp.DebugString());
ASSERT_FALSE(change_resp.has_error());
ASSERT_EQ(change_resp.records_size(), 2);
}

// Call GetChanges again and pass in checkpoint that producer can mark as committed.
change_req.mutable_from_checkpoint()->CopyFrom(change_resp.checkpoint());
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
SCOPED_TRACE(change_resp.DebugString());
ASSERT_FALSE(change_resp.has_error());
// No more changes, so 0 records should be received.
ASSERT_EQ(change_resp.records_size(), 0);
}

// Verify that cdc_state table has correct checkpoint.
ASSERT_NO_FATALS(VerifyCdcState(client_.get()));

// Call GetChanges again but without any from checkpoint.
change_req.Clear();
change_req.set_tablet_id(tablet_id);
change_req.set_stream_id(stream_id);
change_resp.Clear();
{
RpcController rpc;
SCOPED_TRACE(change_req.DebugString());
ASSERT_OK(cdc_proxy_->GetChanges(change_req, &change_resp, &rpc));
SCOPED_TRACE(change_resp.DebugString());
ASSERT_FALSE(change_resp.has_error());
// Verify that producer uses the "from_checkpoint" from cdc_state table and does not send back
// any records.
ASSERT_EQ(change_resp.records_size(), 0);
}

// Verify that cdc_state table's checkpoint is unaffected.
ASSERT_NO_FATALS(VerifyCdcState(client_.get()));
}

} // namespace cdc
} // namespace yb

0 comments on commit b78aec4

Please sign in to comment.