From 8493de467cdc6a70e9a15796d4ce10f41317feb1 Mon Sep 17 00:00:00 2001 From: Neha Deodhar Date: Thu, 29 Aug 2019 02:01:22 -0700 Subject: [PATCH] #1563: #2153: [2DC] Apply changes received from producer universe Summary: This diff includes changes to apply the records received from the producer universe to the consumer universe. A new doc operation `KVOperation` is added, which will be used to write data received in WAL format from the producer. Test Plan: Manual testing: created 2 universes and ensured data replicates from one to the other `ctest -R twodc` `ctest -R cdc` `ctest -R doc_operation-test` Reviewers: nicolas, mikhail, hector, rahuldesirazu, bogdan Reviewed By: bogdan Subscribers: sergei, hector, mikhail, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D7002 --- ent/src/yb/cdc/cdc_producer.cc | 23 +- ent/src/yb/cdc/cdc_service.cc | 156 +++++++++--- ent/src/yb/cdc/cdc_service.h | 21 +- ent/src/yb/integration-tests/twodc-test.cc | 225 +++++++++++++---- ent/src/yb/master/catalog_manager_ent.cc | 53 ++-- ent/src/yb/tserver/cdc_consumer.cc | 47 +++- ent/src/yb/tserver/cdc_consumer.h | 26 +- ent/src/yb/tserver/cdc_poller.cc | 9 +- ent/src/yb/tserver/cdc_poller.h | 11 +- ent/src/yb/tserver/tablet_server.h | 2 + ent/src/yb/tserver/tablet_server_ent.cc | 11 +- ent/src/yb/tserver/twodc_output_client.cc | 277 ++++++++++++++++++++- ent/src/yb/tserver/twodc_output_client.h | 10 + src/yb/client/client.cc | 4 + src/yb/client/client.h | 2 + src/yb/client/meta_cache.cc | 12 +- src/yb/client/meta_cache.h | 4 + src/yb/consensus/consensus.h | 3 +- src/yb/consensus/consensus_queue-test.cc | 19 +- src/yb/consensus/consensus_queue.cc | 16 +- src/yb/consensus/consensus_queue.h | 3 +- src/yb/consensus/raft_consensus.cc | 4 +- src/yb/consensus/raft_consensus.h | 2 +- src/yb/tablet/tablet.cc | 7 +- src/yb/tserver/tablet_server.h | 2 +- src/yb/tserver/tablet_service.cc | 6 +- 26 files changed, 787 insertions(+), 168 deletions(-) diff --git a/ent/src/yb/cdc/cdc_producer.cc 
b/ent/src/yb/cdc/cdc_producer.cc index 400bf14ebad5..a6bf9c0a1215 100644 --- a/ent/src/yb/cdc/cdc_producer.cc +++ b/ent/src/yb/cdc/cdc_producer.cc @@ -72,7 +72,7 @@ Status CDCProducer::GetChanges(const std::string& stream_id, } ReplicateMsgs messages; - RETURN_NOT_OK(tablet_peer->consensus()->ReadReplicatedMessages(from_op_id, &messages)); + RETURN_NOT_OK(tablet_peer->consensus()->ReadReplicatedMessagesForCDC(from_op_id, &messages)); TxnStatusMap txn_map = VERIFY_RESULT(BuildTxnStatusMap( messages, tablet_peer->Now(), txn_participant)); @@ -267,10 +267,19 @@ Status CDCProducer::PopulateWriteRecord(const ReplicateMsgPtr& msg, if (prev_key != key_hash) { // Write pair contains record for different row. Create a new CDCRecord in this case. record = resp->add_records(); - Slice sub_doc_key = write_pair.key(); + Slice sub_doc_key = key; docdb::SubDocKey decoded_key; RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse)); - AddPrimaryKey(decoded_key, schema, record); + + if (metadata.record_format == CDCRecordFormat::WAL) { + // For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on + // producer and re-serializing on consumer. + auto kv_pair = record->add_key(); + kv_pair->set_key(std::to_string(decoded_key.doc_key().hash())); + kv_pair->mutable_value()->set_binary_value(write_pair.key()); + } else { + AddPrimaryKey(decoded_key, schema, record); + } // Check whether operation is WRITE or DELETE. 
if (decoded_value.value_type() == docdb::ValueType::kTombstone && @@ -291,7 +300,11 @@ Status CDCProducer::PopulateWriteRecord(const ReplicateMsgPtr& msg, prev_key = key_hash; DCHECK(record); - if (record->operation() == CDCRecordPB_OperationType_WRITE) { + if (metadata.record_format == CDCRecordFormat::WAL) { + auto kv_pair = record->add_changes(); + kv_pair->set_key(write_pair.key()); + kv_pair->mutable_value()->set_binary_value(write_pair.value()); + } else if (record->operation() == CDCRecordPB_OperationType_WRITE) { PrimitiveValue column_id; Slice key_column = write_pair.key().data() + key_sizes.second; RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id)); @@ -299,7 +312,7 @@ Status CDCProducer::PopulateWriteRecord(const ReplicateMsgPtr& msg, ColumnSchema col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId())); AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes()); } else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) { - LOG(DFATAL) << "Unexpected value type in key: "<< column_id.value_type(); + LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type(); } } } diff --git a/ent/src/yb/cdc/cdc_service.cc b/ent/src/yb/cdc/cdc_service.cc index 9bf6fcccbc8a..81b4373fd06e 100644 --- a/ent/src/yb/cdc/cdc_service.cc +++ b/ent/src/yb/cdc/cdc_service.cc @@ -12,11 +12,14 @@ #include "yb/cdc/cdc_service.h" +#include +#include #include #include #include "yb/cdc/cdc_producer.h" +#include "yb/cdc/cdc_service.proxy.h" #include "yb/common/entity_ids.h" #include "yb/common/ql_expr.h" #include "yb/common/wire_protocol.h" @@ -56,8 +59,13 @@ DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600, namespace yb { namespace cdc { +using namespace std::literals; + using rpc::RpcContext; using tserver::TSTabletManager; +using client::internal::RemoteTabletServer; + +constexpr int kMaxDurationForTabletLookup = 50; CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager, const scoped_refptr& metric_entity) 
@@ -81,6 +89,10 @@ bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) { } return true; } + +bool IsTabletPeerLeader(std::shared_ptr peer) { + return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; +} } // namespace template @@ -97,38 +109,15 @@ bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcCon } template -Result> CDCServiceImpl::GetLeaderTabletPeer( +Result> CDCServiceImpl::GetTabletPeer( const std::string& tablet_id, RespType* resp, rpc::RpcContext* rpc) { std::shared_ptr peer; - Status status = tablet_manager_->GetTabletPeer(tablet_id, &peer); - if (PREDICT_FALSE(!status.ok())) { - CDCErrorPB::Code code = status.IsNotFound() ? - CDCErrorPB::TABLET_NOT_FOUND : CDCErrorPB::TABLET_NOT_RUNNING; - SetupErrorAndRespond(resp->mutable_error(), status, code, rpc); - return status; - } + RETURN_NOT_OK(tablet_manager_->GetTabletPeer(tablet_id, &peer)); - // Check RUNNING state. - status = peer->CheckRunning(); - if (PREDICT_FALSE(!status.ok())) { - Status s = STATUS(IllegalState, "Tablet not RUNNING"); - SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::TABLET_NOT_RUNNING, rpc); - return s; - } - - // Check if tablet peer is leader. - consensus::LeaderStatus leader_status = peer->LeaderStatus(); - if (leader_status != consensus::LeaderStatus::LEADER_AND_READY) { - // No records to read. - if (leader_status == consensus::LeaderStatus::NOT_LEADER) { - // TODO: Change this to provide new leader - } - Status s = STATUS(IllegalState, "Tablet Server is not leader", ToCString(leader_status)); - SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::NOT_LEADER, rpc); - return s; - } + // Check if tablet is running. 
+ RETURN_NOT_OK(peer->CheckRunning()); return peer; } @@ -282,7 +271,7 @@ Result> CDCService std::shared_ptr> CDCServiceImpl::GetTabletIdsForStream( const CDCStreamId& stream_id) { { - shared_lock l(lock_); + std::shared_lock l(lock_); auto it = stream_tablets_.find(stream_id); if (it != stream_tablets_.end()) { return it->second; @@ -336,8 +325,16 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, Status s = CheckTabletValidForStream(req->stream_id(), req->tablet_id()); RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); - auto tablet_peer = GetLeaderTabletPeer(req->tablet_id(), resp, &context); - if (!tablet_peer.ok()) { + auto result = GetTabletPeer(req->tablet_id(), resp, &context); + RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(), + CDCErrorPB::TABLET_NOT_RUNNING, context); + auto tablet_peer = *result; + + if (!IsTabletPeerLeader(tablet_peer)) { + // Forward GetChanges() to tablet leader. + // TODO: Remove this once cdc consumer has meta cache and is able to direct requests to tablet + // leader. Once that is done, we should return NOT_LEADER error here. 
+ TabletLeaderGetChanges(req, resp, &context); return; } @@ -358,7 +355,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, CDCProducer cdc_producer; s = cdc_producer.GetChanges(req->stream_id(), req->tablet_id(), op_id, *record->get(), - *tablet_peer, resp); + tablet_peer, resp); RPC_STATUS_RETURN_ERROR( s, resp->mutable_error(), @@ -374,6 +371,86 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req, context.RespondSuccess(); } +Result CDCServiceImpl::GetLeaderTServer(const TabletId& tablet_id) { + std::promise> tablet_lookup_promise; + auto future = tablet_lookup_promise.get_future(); + auto callback = [&tablet_lookup_promise]( + const Result& result) { + tablet_lookup_promise.set_value(result); + }; + + auto start = CoarseMonoClock::Now(); + async_client_init_->client()->LookupTabletById( + tablet_id, + CoarseMonoClock::Now() + FLAGS_cdc_rpc_timeout_ms * 1ms, + callback, client::UseCache::kTrue); + future.wait(); + + auto duration = CoarseMonoClock::Now() - start; + if (duration > (kMaxDurationForTabletLookup * 1ms)) { + LOG(WARNING) << "LookupTabletByKey took long time: " << duration << " ms"; + } + + auto result = VERIFY_RESULT(future.get()); + + auto ts = result->LeaderTServer(); + if (ts == nullptr) { + return STATUS(NotFound, "Tablet leader not found for tablet", tablet_id); + } + return ts; +} + +std::shared_ptr CDCServiceImpl::GetCDCServiceProxy(RemoteTabletServer* ts) { + auto hostport = HostPortFromPB(DesiredHostPort( + ts->public_rpc_hostports(), ts->private_rpc_hostports(), ts->cloud_info(), + async_client_init_->client()->cloud_info())); + DCHECK(!hostport.host().empty()); + + { + std::shared_lock l(lock_); + auto it = cdc_service_map_.find(hostport); + if (it != cdc_service_map_.end()) { + return it->second; + } + } + + auto cdc_service = std::make_shared(&async_client_init_->client()->proxy_cache(), + hostport); + { + std::lock_guard l(lock_); + cdc_service_map_.emplace(hostport, cdc_service); + } + return cdc_service; 
+} + +void CDCServiceImpl::TabletLeaderGetChanges(const GetChangesRequestPB* req, + GetChangesResponsePB* resp, + RpcContext* context) { + auto ts_leader = GetLeaderTServer(req->tablet_id()); + RPC_CHECK_AND_RETURN_ERROR(ts_leader.ok(), ts_leader.status(), resp->mutable_error(), + CDCErrorPB::TABLET_NOT_FOUND, *context); + + auto cdc_proxy = GetCDCServiceProxy(*ts_leader); + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms)); + cdc_proxy->GetChanges(*req, resp, &rpc); + context->RespondSuccess(); +} + +void CDCServiceImpl::TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req, + GetCheckpointResponsePB* resp, + RpcContext* context) { + auto ts_leader = GetLeaderTServer(req->tablet_id()); + RPC_CHECK_AND_RETURN_ERROR(ts_leader.ok(), ts_leader.status(), resp->mutable_error(), + CDCErrorPB::TABLET_NOT_FOUND, *context); + + auto cdc_proxy = GetCDCServiceProxy(*ts_leader); + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms)); + cdc_proxy->GetCheckpoint(*req, resp, &rpc); + context->RespondSuccess(); +} + void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req, GetCheckpointResponsePB* resp, RpcContext context) { @@ -392,6 +469,19 @@ void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req, CDCErrorPB::INVALID_REQUEST, context); + auto res = GetTabletPeer(req->tablet_id(), resp, &context); + RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(), + CDCErrorPB::TABLET_NOT_RUNNING, context); + auto tablet_peer = *res; + + if (!IsTabletPeerLeader(tablet_peer)) { + // Forward GetCheckpoint() to tablet leader. + // TODO: Remove this once cdc consumer has meta cache and is able to direct requests to tablet + // leader. Once that is done, we should return NOT_LEADER error here. + TabletLeaderGetCheckpoint(req, resp, &context); + return; + } + // Check that requested tablet_id is part of the CDC stream. 
Status s = CheckTabletValidForStream(req->stream_id(), req->tablet_id()); RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context); @@ -414,7 +504,7 @@ Result CDCServiceImpl::GetLastCheckpoint( const std::string& tablet_id, const std::shared_ptr& session) { { - boost::shared_lock l(lock_); + std::shared_lock l(lock_); auto it = tablet_checkpoints_.find(tablet_id); if (it != tablet_checkpoints_.end()) { return it->second; @@ -514,7 +604,7 @@ void CDCServiceImpl::AddStreamMetadataToCache(const std::string& stream_id, std::shared_ptr CDCServiceImpl::GetStreamMetadataFromCache( const std::string& stream_id) { - boost::shared_lock l(lock_); + std::shared_lock l(lock_); auto it = stream_metadata_.find(stream_id); if (it != stream_metadata_.end()) { return it->second; diff --git a/ent/src/yb/cdc/cdc_service.h b/ent/src/yb/cdc/cdc_service.h index bfdb51e9f221..63b208c3246d 100644 --- a/ent/src/yb/cdc/cdc_service.h +++ b/ent/src/yb/cdc/cdc_service.h @@ -16,15 +16,20 @@ #include "yb/cdc/cdc_service.service.h" #include "yb/cdc/cdc_producer.h" +#include "yb/cdc/cdc_service.proxy.h" #include "yb/rpc/rpc_context.h" #include "yb/tablet/tablet_peer.h" #include "yb/tserver/ts_tablet_manager.h" #include "yb/util/metrics.h" +#include "yb/util/net/net_util.h" #include "yb/util/service_util.h" namespace yb { namespace cdc { +typedef std::unordered_map, HostPortHash> + CDCServiceProxyMap; + static const char* const kRecordType = "record_type"; static const char* const kRecordFormat = "record_format"; static const char* const kRetentionSec = "retention_sec"; @@ -60,7 +65,7 @@ class CDCServiceImpl : public CDCServiceIf { bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc); template - Result> GetLeaderTabletPeer( + Result> GetTabletPeer( const std::string& tablet_id, RespType* resp, rpc::RpcContext* rpc); Result GetLastCheckpoint(const std::string& stream_id, @@ -87,6 +92,16 @@ class CDCServiceImpl : public CDCServiceIf { CHECKED_STATUS 
CheckTabletValidForStream(const std::string& stream_id, const std::string& tablet_id); + void TabletLeaderGetChanges(const GetChangesRequestPB* req, + GetChangesResponsePB* resp, + rpc::RpcContext* context); + void TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req, + GetCheckpointResponsePB* resp, + rpc::RpcContext* context); + + Result GetLeaderTServer(const TabletId& tablet_id); + std::shared_ptr GetCDCServiceProxy(client::internal::RemoteTabletServer* ts); + tserver::TSTabletManager* tablet_manager_; boost::optional async_client_init_; @@ -101,6 +116,10 @@ class CDCServiceImpl : public CDCServiceIf { // TODO: Add cache invalidation after tablet splitting is implemented (#1004). // Map of stream ID -> [tablet IDs]. std::unordered_map>> stream_tablets_; + + // Map of HostPort -> CDCServiceProxy. This is used to redirect requests to tablet leader's + // CDC service proxy. + CDCServiceProxyMap cdc_service_map_; }; } // namespace cdc diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index d8e573a1bb4d..b0cd37c12de2 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -21,6 +21,7 @@ #include "yb/common/wire_protocol.h" +#include "yb/cdc/cdc_service.h" #include "yb/client/client.h" #include "yb/client/client-test-util.h" #include "yb/client/schema.h" @@ -59,6 +60,7 @@ DECLARE_int32(replication_factor); DECLARE_bool(mock_get_changes_response_for_consumer_testing); +DECLARE_bool(twodc_write_hybrid_time_override); namespace yb { @@ -81,6 +83,7 @@ using tserver::enterprise::CDCConsumer; namespace enterprise { constexpr int kRpcTimeout = 30; +static const std::string kUniverseId = "test_universe"; class TwoDCTest : public YBTest { public: @@ -92,7 +95,6 @@ class TwoDCTest : public YBTest { MiniClusterOptions opts; opts.num_tablet_servers = replication_factor; FLAGS_replication_factor = replication_factor; - FLAGS_mock_get_changes_response_for_consumer_testing 
= true; opts.cluster_id = "producer"; producer_cluster_ = std::make_unique(Env::Default(), opts); RETURN_NOT_OK(producer_cluster_->StartSync()); @@ -150,12 +152,13 @@ class TwoDCTest : public YBTest { } Status SetupUniverseReplication( + MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, const std::string& universe_id, const std::vector>& tables) { master::SetupUniverseReplicationRequestPB req; master::SetupUniverseReplicationResponsePB resp; req.set_producer_id(universe_id); - string master_addr = producer_cluster_->GetMasterAddresses(); + string master_addr = producer_cluster->GetMasterAddresses(); auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0)); HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses()); @@ -165,8 +168,8 @@ class TwoDCTest : public YBTest { } auto master_proxy = std::make_shared( - &consumer_client_->proxy_cache(), - consumer_cluster_->leader_mini_master()->bound_rpc_addr()); + &consumer_client->proxy_cache(), + consumer_cluster->leader_mini_master()->bound_rpc_addr()); rpc::RpcController rpc; rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); @@ -177,22 +180,23 @@ class TwoDCTest : public YBTest { return Status::OK(); } - Status GetUniverseReplication( + Status VerifyUniverseReplication( + MiniCluster *consumer_cluster, YBClient* consumer_client, const std::string& universe_id, master::GetUniverseReplicationResponsePB* resp) { - master::GetUniverseReplicationRequestPB req; - req.set_producer_id(universe_id); - - auto master_proxy = std::make_shared( - &consumer_client_->proxy_cache(), - consumer_cluster_->leader_mini_master()->bound_rpc_addr()); - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - - RETURN_NOT_OK(master_proxy->GetUniverseReplication(req, resp, &rpc)); - if (resp->has_error()) { - return STATUS(IllegalState, "Failed getting universe replication"); - } - return Status::OK(); + return LoggedWaitFor([=]() -> Result { + 
master::GetUniverseReplicationRequestPB req; + req.set_producer_id(universe_id); + resp->Clear(); + + auto master_proxy = std::make_shared( + &consumer_client->proxy_cache(), + consumer_cluster->leader_mini_master()->bound_rpc_addr()); + rpc::RpcController rpc; + rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); + + Status s = master_proxy->GetUniverseReplication(req, resp, &rpc); + return s.ok() && !resp->has_error(); + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication"); } Status GetCDCStreamForTable( @@ -224,14 +228,15 @@ class TwoDCTest : public YBTest { consumer_client_.reset(); } - void WriteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table) { + void WriteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table, + bool delete_op = false) { auto session = client->NewSession(); client::TableHandle table_handle; ASSERT_OK(table_handle.Open(table, client)); std::vector> ops; for (uint32_t i = start; i < end; i++) { - auto op = table_handle.NewInsertOp(); + auto op = delete_op ? 
table_handle.NewDeleteOp() : table_handle.NewInsertOp(); int32_t key = i; auto req = op->mutable_request(); QLAddInt32HashValue(req, key); @@ -239,6 +244,10 @@ class TwoDCTest : public YBTest { } } + void DeleteWorkload(uint32_t start, uint32_t end, YBClient* client, const YBTableName& table) { + WriteWorkload(start, end, client, table, true /* delete_op */); + } + std::vector ScanToStrings(const YBTableName& table_name, YBClient* client) { client::TableHandle table; EXPECT_OK(table.Open(table_name, client)); @@ -247,10 +256,21 @@ class TwoDCTest : public YBTest { return result; } - void VerifyWrittenRecords(const YBTableName& table_name) { - auto producer_results = ScanToStrings(table_name, producer_client_.get()); - auto consumer_results = ScanToStrings(table_name, consumer_client_.get()); - ASSERT_EQ(producer_results, consumer_results); + + Status VerifyWrittenRecords(const YBTableName& producer_table, + const YBTableName& consumer_table) { + return LoggedWaitFor([=]() -> Result { + auto producer_results = ScanToStrings(producer_table, producer_client_.get()); + auto consumer_results = ScanToStrings(consumer_table, consumer_client_.get()); + return producer_results == consumer_results; + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify written records"); + } + + Status VerifyNumRecords(const YBTableName& table, YBClient* client, int expected_size) { + return LoggedWaitFor([=]() -> Result { + auto results = ScanToStrings(table, client); + return results.size() == expected_size; + }, MonoDelta::FromSeconds(kRpcTimeout), "Verify number of records"); } Status InitCDCConsumer() { @@ -266,8 +286,8 @@ class TwoDCTest : public YBTest { master::enterprise::TEST_GetConsumerProducerTableMap(master_addrs, tables_resp)); auto universe_uuid = "universe_uuid"; - return consumer_cluster_->leader_mini_master()->master()->catalog_manager()-> - InitCDCConsumer(consumer_info, master_addrs, universe_uuid); + return 
consumer_cluster_->leader_mini_master()->master()->catalog_manager()->InitCDCConsumer( + consumer_info, master_addrs, universe_uuid); } YBClient* producer_client() { @@ -278,10 +298,17 @@ class TwoDCTest : public YBTest { return consumer_client_.get(); } + MiniCluster* producer_cluster() { + return producer_cluster_.get(); + } - uint32_t NumProducerTabletsPolled() { + MiniCluster* consumer_cluster() { + return consumer_cluster_.get(); + } + + uint32_t NumProducerTabletsPolled(MiniCluster* cluster) { uint32_t size = 0; - for (const auto& mini_tserver : consumer_cluster_->mini_tablet_servers()) { + for (const auto& mini_tserver : cluster->mini_tablet_servers()) { uint32_t new_size = 0; auto* tserver = dynamic_cast( mini_tserver->server()); @@ -295,11 +322,11 @@ class TwoDCTest : public YBTest { return size; } - Status CorrectlyPollingAllTablets(uint32_t num_producer_tablets) { + Status CorrectlyPollingAllTablets(MiniCluster* cluster, uint32_t num_producer_tablets) { return LoggedWaitFor([=]() -> Result { static int i = 0; constexpr int kNumIterationsWithCorrectResult = 5; - if (NumProducerTabletsPolled() == num_producer_tablets) { + if (NumProducerTabletsPolled(cluster) == num_producer_tablets) { if (i++ == kNumIterationsWithCorrectResult) { i = 0; return true; @@ -308,7 +335,7 @@ class TwoDCTest : public YBTest { i = 0; } return false; - }, MonoDelta::FromSeconds(30), "Num producer tablets being polled"); + }, MonoDelta::FromSeconds(kRpcTimeout), "Num producer tablets being polled"); } std::unique_ptr producer_cluster_; @@ -323,7 +350,6 @@ class TwoDCTest : public YBTest { }; TEST_F(TwoDCTest, SetupUniverseReplication) { - static const std::string universe_id = "universe_A"; auto tables = ASSERT_RESULT(SetUpWithParams({8, 4, 4, 12}, {8, 4, 12, 8}, 3)); std::vector> producer_tables; @@ -333,15 +359,16 @@ TEST_F(TwoDCTest, SetupUniverseReplication) { for (int i = 0; i < tables.size(); i += 2) { producer_tables.push_back(tables[i]); } - 
ASSERT_OK(SetupUniverseReplication(universe_id, producer_tables)); + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); // Sleep for some time to give enough time for CDC streams and subscribers to be setup. SleepFor(MonoDelta::FromMilliseconds(500)); // Verify that universe was setup on consumer. master::GetUniverseReplicationResponsePB resp; - ASSERT_OK(GetUniverseReplication(universe_id, &resp)); - ASSERT_EQ(resp.producer_id(), universe_id); + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, &resp)); + ASSERT_EQ(resp.producer_id(), kUniverseId); ASSERT_EQ(resp.producer_tables_size(), producer_tables.size()); for (int i = 0; i < producer_tables.size(); i++) { ASSERT_EQ(resp.producer_tables(i).table_id(), producer_tables[i]->id()); @@ -362,27 +389,28 @@ TEST_F(TwoDCTest, PollWithConsumerRestart) { uint32_t replication_factor = NonTsanVsTsan(3, 1); auto tables = ASSERT_RESULT(SetUpWithParams({8, 4, 4, 12}, {8, 4, 12, 8}, replication_factor)); + FLAGS_mock_get_changes_response_for_consumer_testing = true; ASSERT_OK(InitCDCConsumer()); // After creating the cluster, make sure all 32 tablets being polled for. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); consumer_cluster_->mini_tablet_server(0)->Shutdown(); // After shutting down a consumer node. if (replication_factor > 1) { - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); } ASSERT_OK(consumer_cluster_->mini_tablet_server(0)->Start()); // After restarting the node. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); ASSERT_OK(consumer_cluster_->RestartSync()); // After consumer cluster restart. 
- ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); Destroy(); } @@ -391,29 +419,138 @@ TEST_F(TwoDCTest, PollWithProducerRestart) { uint32_t replication_factor = NonTsanVsTsan(3, 1); auto tables = ASSERT_RESULT(SetUpWithParams({8, 4, 4, 12}, {8, 4, 12, 8}, replication_factor)); + FLAGS_mock_get_changes_response_for_consumer_testing = true; ASSERT_OK(InitCDCConsumer()); // After creating the cluster, make sure all 32 tablets being polled for. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); producer_cluster_->mini_tablet_server(0)->Shutdown(); // After stopping a producer node. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); ASSERT_OK(producer_cluster_->mini_tablet_server(0)->Start()); // After starting the node. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); ASSERT_OK(producer_cluster_->RestartSync()); // After producer cluster restart. - ASSERT_OK(CorrectlyPollingAllTablets(32)); + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 32)); + + Destroy(); +} + +TEST_F(TwoDCTest, ApplyOperations) { + uint32_t replication_factor = NonTsanVsTsan(3, 1); + auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); + + std::vector> producer_tables; + // tables contains both producer and consumer universe tables (alternately). + // Pick out just the producer table from the list. + producer_tables.reserve(1); + producer_tables.push_back(tables[0]); + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); + + // After creating the cluster, make sure all producer tablets are being polled for. 
+ ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + + WriteWorkload(0, 5, producer_client(), tables[0]->name()); + + // Check that all tablets continue to be polled for. + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + + // Verify that both clusters have the same records. + ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); Destroy(); +} + +TEST_F(TwoDCTest, TestExternalWriteHybridTime) { + uint32_t replication_factor = NonTsanVsTsan(3, 1); + auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, replication_factor)); + + std::vector> producer_tables; + producer_tables.push_back(tables[0]); + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); + + // After creating the cluster, make sure all producer tablets are being polled for. + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + + // Write 2 rows. + WriteWorkload(0, 2, producer_client(), tables[0]->name()); + + // Ensure that records can be read. + ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); + + // Delete 1 record. + DeleteWorkload(0, 1, producer_client(), tables[0]->name()); + // Ensure that record is deleted on both universes. + ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); + // Delete 2nd record but replicate at a low timestamp (timestamp lower than insertion timestamp). + FLAGS_twodc_write_hybrid_time_override = true; + DeleteWorkload(1, 2, producer_client(), tables[0]->name()); + + // Verify that record exists on consumer universe, but is deleted from producer universe. + ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 0)); + ASSERT_OK(VerifyNumRecords(tables[1]->name(), consumer_client(), 1)); + + Destroy(); +} + +TEST_F(TwoDCTest, BiDirectionalWrites) { + auto tables = ASSERT_RESULT(SetUpWithParams({2}, {2}, 1)); + + // Setup bi-directional replication. 
+ std::vector> producer_tables; + producer_tables.push_back(tables[0]); + ASSERT_OK(SetupUniverseReplication( + producer_cluster(), consumer_cluster(), consumer_client(), kUniverseId, producer_tables)); + + std::vector> producer_tables_reverse; + producer_tables_reverse.push_back(tables[1]); + ASSERT_OK(SetupUniverseReplication( + consumer_cluster(), producer_cluster(), producer_client(), kUniverseId, + producer_tables_reverse)); + + // After creating the cluster, make sure all producer tablets are being polled for. + ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 2)); + ASSERT_OK(CorrectlyPollingAllTablets(producer_cluster(), 2)); + + // Write non-conflicting rows on both clusters. + WriteWorkload(0, 5, producer_client(), tables[0]->name()); + WriteWorkload(5, 10, consumer_client(), tables[1]->name()); + + // Ensure that records are the same on both clusters. + ASSERT_OK(VerifyWrittenRecords(tables[0]->name(), tables[1]->name())); + // Ensure that both universes have all 10 records. + ASSERT_OK(VerifyNumRecords(tables[0]->name(), producer_client(), 10)); + + // Write conflicting records on both clusters (one cluster adds keys, the other deletes them). + std::vector threads; + for (int i = 0; i < 2; ++i) { + auto client = i == 0 ? producer_client() : consumer_client(); + int index = i; + bool is_delete = i == 0; + threads.emplace_back([this, client, index, tables, is_delete] { + WriteWorkload(10, 20, client, tables[index]->name(), is_delete); + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + // Ensure that the same records exist on both universes. 
+ VerifyWrittenRecords(tables[0]->name(), tables[1]->name()); + + Destroy(); } } // namespace enterprise diff --git a/ent/src/yb/master/catalog_manager_ent.cc b/ent/src/yb/master/catalog_manager_ent.cc index 959008c6ce7b..fb566ff33cd2 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -1967,47 +1967,30 @@ void CatalogManager::CreateCDCStreamCallback( auto map = l->mutable_data()->pb.mutable_table_streams(); (*map)[table_id] = *stream_id; - bool subscriber_error = false; if (l->mutable_data()->pb.table_streams_size() == l->data().pb.tables_size()) { // Register CDC consumers for all tables and start replication. - LOG(INFO) << "Registering CDC subscribers for universe " << universe->id(); + LOG(INFO) << "Registering CDC consumers for universe " << universe->id(); - // TODO: Enable after #1481 - #if 0 - auto resolved_tables = l->data().pb.resolved_tables(); auto validated_tables = l->data().pb.validated_tables(); - for (const auto& table : tables) { - RegisterSubscriberRequestPB req; - RegisterSubscriberResponsePB resp; - - std::vector hp; - HostPortsFromPBs(l->data().pb.producer_master_addresses(), &hp); - req.set_master_addrs(HostPort::ToCommaSeparatedString(hp)); - const auto& rpair = table.second; - - auto* producer = req.mutable_producer_table(); - producer->set_table_id(table.first); - producer->set_table_name(resolved_tables[table.first].table_name()); - producer->mutable_namespace_()->set_name(resolved_tables[table.first].namespace_().name()); - - auto* consumer = req.mutable_consumer_table(); - consumer->set_table_id(table.second); - consumer->set_table_name(resolved_tables[table.first].table_name()); - consumer->mutable_namespace_()->set_name(resolved_tables[table.first].namespace_().name()); - - universe->GetStreamForTable(table.first, req.mutable_stream_id()); - - Status s = RegisterSubscriber(&req, &resp); - if (!s.ok() || resp.has_error()) { - LOG(ERROR) << "Error registering subscriber: " << 
resp.DebugString() << " : " - << s.message(); - subscriber_error = true; - break; - } + + std::vector consumer_info; + consumer_info.reserve(l->data().pb.tables_size()); + for (const auto& table : validated_tables) { + CDCConsumerStreamInfo info; + info.producer_table_id = table.first; + info.consumer_table_id = table.second; + info.stream_id = (*map)[info.producer_table_id]; + consumer_info.push_back(info); } - #endif - if (subscriber_error) { + std::vector hp; + HostPortsFromPBs(l->data().pb.producer_master_addresses(), &hp); + + Status s = InitCDCConsumer(consumer_info, HostPort::ToCommaSeparatedString(hp), + l->data().pb.producer_id()); + + if (!s.ok()) { + LOG(ERROR) << "Error registering subscriber: " << s.ToString(); l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::FAILED); } else { l->mutable_data()->pb.set_state(SysUniverseReplicationEntryPB::ACTIVE); diff --git a/ent/src/yb/tserver/cdc_consumer.cc b/ent/src/yb/tserver/cdc_consumer.cc index 91bfa469c6af..4922184da8ea 100644 --- a/ent/src/yb/tserver/cdc_consumer.cc +++ b/ent/src/yb/tserver/cdc_consumer.cc @@ -25,6 +25,8 @@ #include "yb/util/string_util.h" #include "yb/util/thread.h" +DECLARE_int32(cdc_rpc_timeout_ms); + using namespace std::chrono_literals; namespace yb { @@ -35,24 +37,44 @@ namespace enterprise { Result> CDCConsumer::Create( std::function is_leader_for_tablet, rpc::ProxyCache* proxy_cache, - const string& ts_uuid) { - auto cdc_consumer = std::unique_ptr( - new CDCConsumer(std::move(is_leader_for_tablet), proxy_cache, ts_uuid)); + TabletServer* tserver) { + + auto master_addrs = tserver->options().GetMasterAddresses(); + std::vector hostport_strs; + hostport_strs.reserve(master_addrs->size()); + for (const auto& hp : *master_addrs) { + hostport_strs.push_back(HostPort::ToCommaSeparatedString(hp)); + } + + auto client = VERIFY_RESULT(client::YBClientBuilder() + .master_server_addrs(hostport_strs) + 
.default_admin_operation_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms)) + .Build()); + + auto cdc_consumer = std::make_unique( + std::move(is_leader_for_tablet), proxy_cache, tserver->permanent_uuid(), std::move(client)); + RETURN_NOT_OK(yb::Thread::Create( "CDCConsumer", "Poll", &CDCConsumer::RunThread, cdc_consumer.get(), &cdc_consumer->run_trigger_poll_thread_)); - RETURN_NOT_OK(ThreadPoolBuilder("Handle").Build(&cdc_consumer->thread_pool_)); + RETURN_NOT_OK(ThreadPoolBuilder("CDCConsumerHandler").Build(&cdc_consumer->thread_pool_)); return cdc_consumer; } CDCConsumer::CDCConsumer(std::function is_leader_for_tablet, rpc::ProxyCache* proxy_cache, - const string& ts_uuid) : + const string& ts_uuid, + std::unique_ptr client) : is_leader_for_tablet_(std::move(is_leader_for_tablet)), proxy_manager_(std::make_unique(proxy_cache)), - log_prefix_(Format("[TS $0]:", ts_uuid)) {} + log_prefix_(Format("[TS $0]:", ts_uuid)), + client_(std::move(client)) {} CDCConsumer::~CDCConsumer() { + Shutdown(); +} + +void CDCConsumer::Shutdown() { { std::unique_lock l(should_run_mutex_); should_run_ = false; @@ -63,6 +85,11 @@ CDCConsumer::~CDCConsumer() { WARN_NOT_OK(ThreadJoiner(run_trigger_poll_thread_.get()).Join(), "Could not join thread"); } + { + std::unique_lock lock(master_data_mutex_); + producer_consumer_tablet_map_from_master_.clear(); + } + if (thread_pool_) { thread_pool_->Shutdown(); } @@ -130,15 +157,17 @@ void CDCConsumer::TriggerPollForNewTablets() { if (start_polling) { // This is a new tablet, trigger a poll. 
std::unique_lock pollers_lock(producer_pollers_map_mutex_); - producer_pollers_map_[entry.first] = std::make_unique( + auto cdc_poller = std::make_shared( entry.first, entry.second, std::bind(&CDCConsumer::ShouldContinuePolling, this, entry.first), std::bind(&cdc::CDCConsumerProxyManager::GetProxy, proxy_manager_.get(), entry.first), std::bind(&CDCConsumer::RemoveFromPollersMap, this, entry.first), - thread_pool_.get()); + thread_pool_.get(), + client_); LOG_WITH_PREFIX(INFO) << Format("Start polling for producer tablet $0", entry.first.tablet_id); - producer_pollers_map_[entry.first]->Poll(); + producer_pollers_map_[entry.first] = cdc_poller; + cdc_poller->Poll(); } } } diff --git a/ent/src/yb/tserver/cdc_consumer.h b/ent/src/yb/tserver/cdc_consumer.h index 59fc032d99a6..5d58cd4906d8 100644 --- a/ent/src/yb/tserver/cdc_consumer.h +++ b/ent/src/yb/tserver/cdc_consumer.h @@ -17,7 +17,6 @@ #include #include "yb/cdc/cdc_consumer_util.h" - #include "yb/util/locks.h" namespace yb { @@ -38,19 +37,33 @@ class ConsumerRegistryPB; } // namespace cdc +namespace client { + +class YBClient; + +} // namespace client + namespace tserver { namespace enterprise { class CDCPoller; - +class TabletServer; class CDCConsumer { public: static Result> Create( std::function is_leader_for_tablet, rpc::ProxyCache* proxy_cache, - const std::string& ts_uuid); + TabletServer* tserver); + + CDCConsumer(std::function is_leader_for_tablet, + rpc::ProxyCache* proxy_cache, + const std::string& ts_uuid, + std::unique_ptr client); + ~CDCConsumer(); + void Shutdown(); + // Refreshes the in memory state when we receive a new registry from master. void RefreshWithNewRegistryFromMaster(const cdc::ConsumerRegistryPB& consumer_registry); @@ -59,10 +72,6 @@ class CDCConsumer { std::string LogPrefix(); private: - CDCConsumer(std::function is_leader_for_tablet, - rpc::ProxyCache* proxy_cache, - const std::string& ts_uuid); - // Runs a thread that periodically polls for any new threads. 
void RunThread(); @@ -96,12 +105,13 @@ class CDCConsumer { scoped_refptr run_trigger_poll_thread_; - std::unordered_map, + std::unordered_map, cdc::ProducerTabletInfo::Hash> producer_pollers_map_; std::unique_ptr thread_pool_; std::string log_prefix_; + std::shared_ptr client_; bool should_run_ = true; diff --git a/ent/src/yb/tserver/cdc_poller.cc b/ent/src/yb/tserver/cdc_poller.cc index ac089f70884a..6ef2cb695af3 100644 --- a/ent/src/yb/tserver/cdc_poller.cc +++ b/ent/src/yb/tserver/cdc_poller.cc @@ -17,6 +17,7 @@ #include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_service.proxy.h" +#include "yb/client/client.h" #include "yb/consensus/opid_util.h" #include "yb/util/threadpool.h" @@ -35,7 +36,8 @@ CDCPoller::CDCPoller(const cdc::ProducerTabletInfo& producer_tablet_info, std::function should_continue_polling, std::function get_proxy, std::function remove_self_from_pollers_map, - ThreadPool* thread_pool) : + ThreadPool* thread_pool, + const std::shared_ptr& client) : producer_tablet_info_(producer_tablet_info), consumer_tablet_info_(consumer_tablet_info), should_continue_polling_(std::move(should_continue_polling)), @@ -46,6 +48,7 @@ CDCPoller::CDCPoller(const cdc::ProducerTabletInfo& producer_tablet_info, rpc_(std::make_unique()), output_client_(CreateTwoDCOutputClient( consumer_tablet_info, + client, std::bind(&CDCPoller::HandleApplyChanges, this, std::placeholders::_1))), thread_pool_(thread_pool) {} @@ -80,7 +83,9 @@ void CDCPoller::DoHandlePoll() { return remove_self_from_pollers_map_(); } - if (!rpc_->status().ok() || resp_->has_error()) { + if (!rpc_->status().ok() || resp_->has_error() || !resp_->has_checkpoint()) { + // In case of errors, try polling again. + // TODO: Set a max limit on polling. 
return Poll(); } diff --git a/ent/src/yb/tserver/cdc_poller.h b/ent/src/yb/tserver/cdc_poller.h index ab8cd821f318..d20d6f740cfe 100644 --- a/ent/src/yb/tserver/cdc_poller.h +++ b/ent/src/yb/tserver/cdc_poller.h @@ -15,6 +15,7 @@ #include "yb/cdc/cdc_consumer_util.h" #include "yb/cdc/cdc_output_client_interface.h" +#include "yb/tserver/tablet_server.h" #ifndef ENT_SRC_YB_TSERVER_CDC_POLLER_H #define ENT_SRC_YB_TSERVER_CDC_POLLER_H @@ -35,6 +36,12 @@ class CDCServiceProxy; } // namespace cdc +namespace client { + +class YBClient; + +} // namespace client + namespace tserver { namespace enterprise { @@ -48,10 +55,12 @@ class CDCPoller { std::function should_continue_polling, std::function get_proxy, std::function remove_self_from_pollers_map, - ThreadPool* thread_pool); + ThreadPool* thread_pool, + const std::shared_ptr& client); // Begins poll process for a producer tablet. void Poll(); + private: void DoPoll(); // Async handler for Poll. diff --git a/ent/src/yb/tserver/tablet_server.h b/ent/src/yb/tserver/tablet_server.h index d4d88d0c9686..291b0708b0d3 100644 --- a/ent/src/yb/tserver/tablet_server.h +++ b/ent/src/yb/tserver/tablet_server.h @@ -46,6 +46,8 @@ class TabletServer : public yb::tserver::TabletServer { Env* GetEnv() override; rocksdb::Env* GetRocksDBEnv() override; + void Shutdown() override; + yb::enterprise::UniverseKeyManager* GetUniverseKeyManager(); CHECKED_STATUS SetUniverseKeyRegistry( const yb::UniverseKeyRegistryPB& universe_key_registry) override; diff --git a/ent/src/yb/tserver/tablet_server_ent.cc b/ent/src/yb/tserver/tablet_server_ent.cc index 21ef033814a1..4f0d8159e10e 100644 --- a/ent/src/yb/tserver/tablet_server_ent.cc +++ b/ent/src/yb/tserver/tablet_server_ent.cc @@ -55,6 +55,15 @@ TabletServer::TabletServer(const TabletServerOptions& opts) : DefaultHeaderManager(universe_key_manager_.get()))) {} TabletServer::~TabletServer() { + Shutdown(); +} + +void TabletServer::Shutdown() { + auto cdc_consumer = GetCDCConsumer(); + if (cdc_consumer) 
{ + cdc_consumer->Shutdown(); + } + super::Shutdown(); } Status TabletServer::RegisterServices() { @@ -116,7 +125,7 @@ Status TabletServer::CreateCDCConsumer() { return tablet_peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY; }; cdc_consumer_ = VERIFY_RESULT(CDCConsumer::Create(std::move(is_leader_clbk), proxy_cache_.get(), - permanent_uuid())); + this)); return Status::OK(); } diff --git a/ent/src/yb/tserver/twodc_output_client.cc b/ent/src/yb/tserver/twodc_output_client.cc index 17f998d60315..8400af0a3759 100644 --- a/ent/src/yb/tserver/twodc_output_client.cc +++ b/ent/src/yb/tserver/twodc_output_client.cc @@ -12,40 +12,299 @@ #include "yb/tserver/twodc_output_client.h" -#include "yb/cdc/cdc_consumer_util.h" +#include +#include "yb/cdc/cdc_consumer_util.h" #include "yb/client/client.h" +#include "yb/client/meta_cache.h" +#include "yb/gutil/strings/join.h" +#include "yb/rpc/rpc.h" +#include "yb/rpc/rpc_fwd.h" +#include "yb/tserver/tserver_service.proxy.h" +#include "yb/util/flag_tags.h" +#include "yb/util/net/net_util.h" + +DEFINE_test_flag(bool, twodc_write_hybrid_time_override, false, + "Override external_hybrid_time with initialHybridTimeValue for testing."); + +DECLARE_int32(cdc_rpc_timeout_ms); namespace yb { namespace tserver { namespace enterprise { +using rpc::Rpc; + +class WriteAsyncRpc; class TwoDCOutputClient : public cdc::CDCOutputClient { public: TwoDCOutputClient( const cdc::ConsumerTabletInfo& consumer_tablet_info, + const std::shared_ptr& client, std::function apply_changes_clbk) : consumer_tablet_info_(consumer_tablet_info), + client_(client), apply_changes_clbk_(std::move(apply_changes_clbk)) {} - // TODO: Override with actual implementation. 
- CHECKED_STATUS ApplyChanges(const cdc::GetChangesResponsePB* resp) override { - cdc::OutputClientResponse client_resp; - client_resp.status = Status::OK(); - client_resp.last_applied_op_id = resp->checkpoint().op_id(); - apply_changes_clbk_(client_resp); - return Status::OK(); + ~TwoDCOutputClient() { + rpcs_.Shutdown(); + } + + CHECKED_STATUS ApplyChanges(const cdc::GetChangesResponsePB* resp) override; + + void HandleWriteRpcResponse(const Status&s, std::unique_ptr resp); + void RegisterRpc(rpc::RpcCommandPtr call, rpc::Rpcs::Handle* handle); + rpc::RpcCommandPtr UnregisterRpc(rpc::Rpcs::Handle* handle); + + rpc::Rpcs::Handle InvalidRpcHandle() { + return rpcs_.InvalidHandle(); } + private: + void TabletLookupCallback( + const cdc::GetChangesResponsePB* resp, + const cdc::CDCRecordPB& record, + const Result& tablet); + + void IncProcessedRecordCount(); + + void HandleResponse(); + void HandleError(const Status& s); + cdc::ConsumerTabletInfo consumer_tablet_info_; + std::shared_ptr client_; std::function apply_changes_clbk_; + + rpc::Rpcs rpcs_; + std::shared_ptr table_; + + // Used to protect error_status_ and op_id_. + mutable rw_spinlock lock_; + Status error_status_; + OpIdPB op_id_ = consensus::MinimumOpId(); + + std::atomic processed_record_count_{0}; + std::atomic record_count_{0}; }; +Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* resp) { + // ApplyChanges is called in a single threaded manner. + // For all the changes in GetChangesResponsePB, it fans out applying the changes. + // Once all changes have been applied (successfully or not), we invoke the callback which will + // then either poll for next set of changes (in case of successful application) or will try to + // re-apply. 
+ processed_record_count_.store(0, std::memory_order_release); + record_count_.store(resp->records_size(), std::memory_order_release); + DCHECK(resp->has_checkpoint()); + + { + std::lock_guard l(lock_); + DCHECK(consensus::OpIdEquals(op_id_, consensus::MinimumOpId())); + op_id_ = resp->checkpoint().op_id(); + error_status_ = Status::OK(); + } + + if (record_count_.load(std::memory_order_acquire) == 0) { + HandleResponse(); + return Status::OK(); + } + + if (!table_) { + Status s = client_->OpenTable(consumer_tablet_info_.table_id, &table_); + if (!s.ok()) { + cdc::OutputClientResponse response; + response.status = s; + apply_changes_clbk_(response); + return s; + } + } + + bool records_to_process = false; + for (int i = 0; i < resp->records_size(); i++) { + if (resp->records(i).key_size() == 0) { + // Transaction status record, ignore. + IncProcessedRecordCount(); + } else { + // All KV-pairs within a single CDC record will be for the same row. + // key(0).key() will contain the hash code for that row. We use this to lookup the tablet. + client_->LookupTabletByKey( + table_.get(), + PartitionSchema::EncodeMultiColumnHashValue( + boost::lexical_cast(resp->records(i).key(0).key())), + CoarseMonoClock::now() + MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms), + std::bind(&TwoDCOutputClient::TabletLookupCallback, this, resp, resp->records(i), + std::placeholders::_1)); + records_to_process = true; + } + } + + if (!records_to_process) { + // Nothing to process, return success. + HandleResponse(); + } + return Status::OK(); +} + +void TwoDCOutputClient::HandleWriteRpcResponse( + const Status& status, std::unique_ptr resp ) { + if (!status.ok() || resp->has_error()) { + HandleError(!status.ok() ? 
status : StatusFromPB(resp->error().status())); + } else { + IncProcessedRecordCount(); + HandleResponse(); + } +} + +void TwoDCOutputClient::RegisterRpc(rpc::RpcCommandPtr call, rpc::Rpcs::Handle* handle) { + rpcs_.Register(call, handle); +} + +rpc::RpcCommandPtr TwoDCOutputClient::UnregisterRpc(rpc::Rpcs::Handle* handle) { + return rpcs_.Unregister(handle); +} + +void TwoDCOutputClient::TabletLookupCallback( + const cdc::GetChangesResponsePB* resp, + const cdc::CDCRecordPB& record, + const Result& tablet) { + if (!tablet.ok()) { + HandleError(tablet.status()); + return; + } + + auto ts = tablet->get()->LeaderTServer(); + if (ts == nullptr) { + HandleError(STATUS_FORMAT( + IllegalState, "Cannot find leader tserver for tablet $0", tablet->get()->tablet_id())); + return; + } + + Status s = ts->InitProxy(client_.get()); + if (!s.ok()) { + HandleError(s); + return; + } + + auto deadline = CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms); + auto write_rpc = rpc::StartRpc( + deadline, client_->messenger(), &client_->proxy_cache(), ts->proxy(), + tablet->get()->tablet_id(), record, this); +} + +void TwoDCOutputClient::HandleError(const Status& s) { + IncProcessedRecordCount(); + { + std::lock_guard l(lock_); + LOG(ERROR) << "Error while applying replicated record: " << s.ToString(); + error_status_ = s; + } + HandleResponse(); +} + +void TwoDCOutputClient::HandleResponse() { + if (processed_record_count_.load(std::memory_order_acquire) == + record_count_.load(std::memory_order_acquire)) { + cdc::OutputClientResponse response; + { + std::lock_guard l(lock_); + response.status = error_status_; + if (response.status.ok()) { + response.last_applied_op_id = op_id_; + } + op_id_ = consensus::MinimumOpId(); + } + apply_changes_clbk_(response); + } +} + +void TwoDCOutputClient::IncProcessedRecordCount() { + processed_record_count_.fetch_add(1, std::memory_order_acq_rel); +} + std::unique_ptr CreateTwoDCOutputClient( const cdc::ConsumerTabletInfo& 
consumer_tablet_info, + const std::shared_ptr& client, std::function apply_changes_clbk) { - return std::make_unique(consumer_tablet_info, std::move(apply_changes_clbk)); + return std::make_unique( + consumer_tablet_info, client, std::move(apply_changes_clbk)); +} + +class WriteAsyncRpc : public Rpc { + public: + WriteAsyncRpc( + CoarseTimePoint deadline, + rpc::Messenger* messenger, + rpc::ProxyCache* proxy_cache, + const std::shared_ptr& proxy, + const TabletId& tablet_id, + const cdc::CDCRecordPB& record, + TwoDCOutputClient* twodc_client) : + Rpc(deadline, messenger, proxy_cache), + proxy_(proxy), + tablet_id_(tablet_id), + record_(record), + resp_(std::make_unique()), + twodc_client_(twodc_client), + retained_self_(twodc_client->InvalidRpcHandle()) {} + + void SendRpc() override; + + string ToString() const override; + + virtual ~WriteAsyncRpc() = default; + + private: + void Finished(const Status& status) override; + + std::shared_ptr proxy_; + TabletId tablet_id_; + cdc::CDCRecordPB record_; + std::unique_ptr resp_; + TwoDCOutputClient* twodc_client_; + rpc::Rpcs::Handle retained_self_; +}; + +void WriteAsyncRpc::SendRpc() { + twodc_client_->RegisterRpc(shared_from_this(), &retained_self_); + + auto now = CoarseMonoClock::Now(); + if (retrier().deadline() < now) { + Finished(STATUS(TimedOut, "WriteAsyncRpc timed out after deadline expired")); + return; + } + + auto rpc_deadline = now + MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms); + mutable_retrier()->mutable_controller()->set_deadline( + std::min(rpc_deadline, retrier().deadline())); + + WriteRequestPB req; + req.set_tablet_id(tablet_id_); + if (FLAGS_twodc_write_hybrid_time_override) { + // Used only for testing external hybrid time. 
+ req.set_external_hybrid_time(yb::kInitialHybridTimeValue); + } else { + req.set_external_hybrid_time(record_.time()); + } + for (const auto& kv_pair : record_.changes()) { + auto* write_pair = req.mutable_write_batch()->add_write_pairs(); + write_pair->set_key(kv_pair.key()); + write_pair->set_value(kv_pair.value().binary_value()); + } + + proxy_->WriteAsync( + req, resp_.get(), mutable_retrier()->mutable_controller(), + std::bind(&WriteAsyncRpc::Finished, this, Status::OK())); +} + +string WriteAsyncRpc::ToString() const { + return strings::Substitute("WriteAsyncRpc(tablet_id: $0, num_attempts: $1)", + tablet_id_, num_attempts()); +} + +void WriteAsyncRpc::Finished(const Status& status) { + auto retained_self = twodc_client_->UnregisterRpc(&retained_self_); + twodc_client_->HandleWriteRpcResponse(status, std::move(resp_)); } } // namespace enterprise diff --git a/ent/src/yb/tserver/twodc_output_client.h b/ent/src/yb/tserver/twodc_output_client.h index a61745fb47cf..461d5662f2ab 100644 --- a/ent/src/yb/tserver/twodc_output_client.h +++ b/ent/src/yb/tserver/twodc_output_client.h @@ -17,11 +17,21 @@ #define ENT_SRC_YB_TSERVER_TWODC_OUTPUT_CLIENT_H namespace yb { + +class ThreadPool; + +namespace client { + +class YBClient; + +} // client + namespace tserver { namespace enterprise { std::unique_ptr CreateTwoDCOutputClient( const cdc::ConsumerTabletInfo& consumer_tablet_info, + const std::shared_ptr& client, std::function apply_changes_clbk); } // namespace enterprise diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index d37a2e4d0cc1..dff63ea99989 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -1105,6 +1105,10 @@ const ClientId& YBClient::id() const { return data_->id_; } +const CloudInfoPB& YBClient::cloud_info() const { + return data_->cloud_info_pb_; +} + std::pair YBClient::NextRequestIdAndMinRunningRequestId( const TabletId& tablet_id) { std::lock_guard lock(data_->tablet_requests_mutex_); diff --git a/src/yb/client/client.h 
b/src/yb/client/client.h index 24b472666361..681d93304d4d 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -579,6 +579,8 @@ class YBClient { // Id of this client instance. const ClientId& id() const; + const CloudInfoPB& cloud_info() const; + std::pair NextRequestIdAndMinRunningRequestId( const TabletId& tablet_id); void RequestFinished(const TabletId& tablet_id, RetryableRequestId request_id); diff --git a/src/yb/client/meta_cache.cc b/src/yb/client/meta_cache.cc index 31794e921948..a51fbe7c1412 100644 --- a/src/yb/client/meta_cache.cc +++ b/src/yb/client/meta_cache.cc @@ -32,8 +32,8 @@ #include "yb/client/meta_cache.h" -#include #include +#include #include @@ -186,6 +186,16 @@ const CloudInfoPB& RemoteTabletServer::cloud_info() const { return cloud_info_pb_; } +const google::protobuf::RepeatedPtrField& + RemoteTabletServer::public_rpc_hostports() const { + return public_rpc_hostports_; +} + +const google::protobuf::RepeatedPtrField& + RemoteTabletServer::private_rpc_hostports() const { + return private_rpc_hostports_; +} + shared_ptr RemoteTabletServer::proxy() const { std::shared_lock lock(mutex_); return proxy_; diff --git a/src/yb/client/meta_cache.h b/src/yb/client/meta_cache.h index fd56d6187b60..974183b9c7c2 100644 --- a/src/yb/client/meta_cache.h +++ b/src/yb/client/meta_cache.h @@ -147,6 +147,10 @@ class RemoteTabletServer { const CloudInfoPB& cloud_info() const; + const google::protobuf::RepeatedPtrField& public_rpc_hostports() const; + + const google::protobuf::RepeatedPtrField& private_rpc_hostports() const; + bool HasCapability(CapabilityId capability) const; private: diff --git a/src/yb/consensus/consensus.h b/src/yb/consensus/consensus.h index b8b7e752c76c..3f2361bd20f6 100644 --- a/src/yb/consensus/consensus.h +++ b/src/yb/consensus/consensus.h @@ -342,7 +342,8 @@ class Consensus { // This includes heartbeats too. 
virtual MonoTime TimeSinceLastMessageFromLeader() = 0; - virtual CHECKED_STATUS ReadReplicatedMessages(const OpId& from, ReplicateMsgs* msgs) = 0; + // Read majority replicated messages for CDC producer. + virtual CHECKED_STATUS ReadReplicatedMessagesForCDC(const OpId& from, ReplicateMsgs* msgs) = 0; protected: friend class RefCountedThreadSafe; diff --git a/src/yb/consensus/consensus_queue-test.cc b/src/yb/consensus/consensus_queue-test.cc index 58972c0e3c49..a804e6ecbb6e 100644 --- a/src/yb/consensus/consensus_queue-test.cc +++ b/src/yb/consensus/consensus_queue-test.cc @@ -851,34 +851,41 @@ TEST_F(ConsensusQueueTest, TestTriggerRemoteBootstrapIfTabletNotFound) { rb_req.source_private_addr()[0].ShortDebugString()); } -// Tests that ReadReplicatedMessages() only reads messages until the last known +// Tests that ReadReplicatedMessagesForCDC() only reads messages until the last known // committed index. -TEST_F(ConsensusQueueTest, TestReadReplicatedMessages) { +TEST_F(ConsensusQueueTest, TestReadReplicatedMessagesForCDC) { queue_->Init(MinimumOpId()); queue_->SetLeaderMode(MinimumOpId(), MinimumOpId().term(), BuildRaftConfigPBForTests(2)); + queue_->TrackPeer(kPeerUuid); + AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, kNumMessages); // Wait for the local peer to append all messages. WaitForLocalPeerToAckIndex(kNumMessages); + // Since only the local log might have ACKed at this point, + // the committed_index should be MinimumOpId(). + queue_->raft_pool_observers_token_->Wait(); + ASSERT_OPID_EQ(queue_->GetCommittedIndexForTests(), MinimumOpId()); + ConsensusResponsePB response; - response.set_responder_uuid(kLeaderUuid); + response.set_responder_uuid(kPeerUuid); bool more_pending = false; int last_committed_index = kNumMessages - 20; - // Peer already has some messages, last one being index last_committed_index. + // Ack last_committed_index messages. 
SetLastReceivedAndLastCommitted(&response, MakeOpIdForIndex(last_committed_index)); queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending); ASSERT_TRUE(more_pending); ReplicateMsgs msgs; - ASSERT_OK(queue_->ReadReplicatedMessages(MakeOpIdForIndex(0), &msgs)); + ASSERT_OK(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(0), &msgs)); ASSERT_EQ(last_committed_index, msgs.size()); msgs.clear(); // Read from some index > 0 int start = 10; - ASSERT_OK(queue_->ReadReplicatedMessages(MakeOpIdForIndex(start), &msgs)); + ASSERT_OK(queue_->ReadReplicatedMessagesForCDC(MakeOpIdForIndex(start), &msgs)); ASSERT_EQ(last_committed_index - start, msgs.size()); } diff --git a/src/yb/consensus/consensus_queue.cc b/src/yb/consensus/consensus_queue.cc index b87a4aae3291..32cdb66cba2a 100644 --- a/src/yb/consensus/consensus_queue.cc +++ b/src/yb/consensus/consensus_queue.cc @@ -523,18 +523,26 @@ Status PeerMessageQueue::ReadFromLogCache(int64_t from_index, return s; } -Status PeerMessageQueue::ReadReplicatedMessages(const OpId& last_op_id, ReplicateMsgs *msgs) { +// Read majority replicated messages from cache for CDC. +// CDC producer will use this to get the messages to send in response to cdc::GetChanges RPC. +Status PeerMessageQueue::ReadReplicatedMessagesForCDC(const OpId& last_op_id, ReplicateMsgs *msgs) { // The batch of messages read from cache. ReplicateMsgs messages; bool have_more_messages = false; OpIdPB preceding_id; - if (last_op_id.index() >= local_peer_->last_known_committed_idx) { - // Nothing to read + int64_t to_index; + { + LockGuard lock(queue_lock_); + to_index = queue_state_.majority_replicated_opid.index(); + } + + if (last_op_id.index() >= to_index) { + // Nothing to read. 
return Status::OK(); } - Status s = ReadFromLogCache(last_op_id.index(), local_peer_->last_known_committed_idx, + Status s = ReadFromLogCache(last_op_id.index(), to_index, FLAGS_consensus_max_batch_size_bytes, local_peer_uuid_, &messages, &preceding_id, &have_more_messages); if (PREDICT_FALSE(!s.ok())) { diff --git a/src/yb/consensus/consensus_queue.h b/src/yb/consensus/consensus_queue.h index a0241b77af4a..f7f796cea947 100644 --- a/src/yb/consensus/consensus_queue.h +++ b/src/yb/consensus/consensus_queue.h @@ -313,13 +313,14 @@ class PeerMessageQueue { } // Read replicated log records starting from the OpId immediately after last_op_id. - CHECKED_STATUS ReadReplicatedMessages(const OpId& last_op_id, ReplicateMsgs *msgs); + CHECKED_STATUS ReadReplicatedMessagesForCDC(const OpId& last_op_id, ReplicateMsgs *msgs); size_t LogCacheSize(); size_t EvictLogCache(size_t bytes_to_evict); private: FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex); + FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC); // Mode specifies how the queue currently behaves: // diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index c485c8611ee0..d116c4c75bea 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -2920,8 +2920,8 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) { return Status::OK(); } -Status RaftConsensus::ReadReplicatedMessages(const OpId& from, ReplicateMsgs* msgs) { - return queue_->ReadReplicatedMessages(from, msgs); +Status RaftConsensus::ReadReplicatedMessagesForCDC(const OpId& from, ReplicateMsgs* msgs) { + return queue_->ReadReplicatedMessagesForCDC(from, msgs); } void RaftConsensus::RollbackIdAndDeleteOpId(const ReplicateMsgPtr& replicate_msg, diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index b24c242bd758..a4a54e090542 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ 
-249,7 +249,7 @@ class RaftConsensus : public std::enable_shared_from_this, TEST_delay_update_.store(duration, std::memory_order_release); } - CHECKED_STATUS ReadReplicatedMessages(const OpId& from, ReplicateMsgs* msgs) override; + CHECKED_STATUS ReadReplicatedMessagesForCDC(const OpId& from, ReplicateMsgs* msgs) override; protected: // Trigger that a non-Operation ConsensusRound has finished replication. diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 9efc84036fec..74e3d43e6e83 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -1349,7 +1349,12 @@ void Tablet::AcquireLocksAndPerformDocOperations(std::unique_ptr } if (key_value_write_request->has_write_batch()) { - auto status = StartDocWriteOperation(operation.get()); + Status status; + if (!key_value_write_request->write_batch().read_pairs().empty()) { + status = StartDocWriteOperation(operation.get()); + } else { + DCHECK(key_value_write_request->has_external_hybrid_time()); + } WriteOperation::StartSynchronization(std::move(operation), status); return; } diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h index fd24890f949f..19b76779a9a6 100644 --- a/src/yb/tserver/tablet_server.h +++ b/src/yb/tserver/tablet_server.h @@ -95,7 +95,7 @@ class TabletServer : public server::RpcAndWebServerBase, public TabletServerIf { CHECKED_STATUS WaitInited(); CHECKED_STATUS Start(); - void Shutdown(); + virtual void Shutdown(); std::string ToString() const override; diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 50e719c929d1..1a31ef17a6b7 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -795,7 +795,7 @@ void TabletServiceImpl::Write(const WriteRequestPB* req, } #endif - if (PREDICT_FALSE(req->has_write_batch() && + if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() && (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) { 
Status s = STATUS(NotSupported, "Write Request contains write batch. This field should be " "used only for post-processed write requests during " @@ -808,7 +808,9 @@ void TabletServiceImpl::Write(const WriteRequestPB* req, bool has_operations = (req->ql_write_batch_size() != 0 || req->redis_write_batch_size() != 0 || - req->pgsql_write_batch_size()); + req->pgsql_write_batch_size() != 0 || + (req->write_batch().write_pairs_size() != 0 && + req->has_external_hybrid_time())); if (!has_operations && tablet.peer->tablet()->table_type() != TableType::REDIS_TABLE_TYPE) { // An empty request. This is fine, can just exit early with ok status instead of working hard. // This doesn't need to go to Raft log.