Skip to content

Commit

Permalink
yugabyte#1563: yugabyte#2153: [2DC] Apply changes received from produ…
Browse files Browse the repository at this point in the history
…cer universe

Summary:
This diff includes changes to add the records received from producer universe to consumer universe.
A new doc operation `KVOperation` is added which will be used to write data received in WAL format from producer.

Test Plan:
Manual testing : created 2 universes and ensured data replicates from one to another

`ctest -R twodc`
`ctest -R cdc`
`ctest -R doc_operation-test`

Reviewers: nicolas, mikhail, hector, rahuldesirazu, bogdan

Reviewed By: bogdan

Subscribers: sergei, hector, mikhail, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7002
  • Loading branch information
ndeodhar authored and d-uspenskiy committed Sep 5, 2019
1 parent 2f3531c commit 70ba24e
Show file tree
Hide file tree
Showing 26 changed files with 787 additions and 168 deletions.
23 changes: 18 additions & 5 deletions ent/src/yb/cdc/cdc_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Status CDCProducer::GetChanges(const std::string& stream_id,
}

ReplicateMsgs messages;
RETURN_NOT_OK(tablet_peer->consensus()->ReadReplicatedMessages(from_op_id, &messages));
RETURN_NOT_OK(tablet_peer->consensus()->ReadReplicatedMessagesForCDC(from_op_id, &messages));

TxnStatusMap txn_map = VERIFY_RESULT(BuildTxnStatusMap(
messages, tablet_peer->Now(), txn_participant));
Expand Down Expand Up @@ -267,10 +267,19 @@ Status CDCProducer::PopulateWriteRecord(const ReplicateMsgPtr& msg,
if (prev_key != key_hash) {
// Write pair contains record for different row. Create a new CDCRecord in this case.
record = resp->add_records();
Slice sub_doc_key = write_pair.key();
Slice sub_doc_key = key;
docdb::SubDocKey decoded_key;
RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
AddPrimaryKey(decoded_key, schema, record);

if (metadata.record_format == CDCRecordFormat::WAL) {
// For 2DC, populate serialized data from WAL, to avoid unnecessary deserializing on
// producer and re-serializing on consumer.
auto kv_pair = record->add_key();
kv_pair->set_key(std::to_string(decoded_key.doc_key().hash()));
kv_pair->mutable_value()->set_binary_value(write_pair.key());
} else {
AddPrimaryKey(decoded_key, schema, record);
}

// Check whether operation is WRITE or DELETE.
if (decoded_value.value_type() == docdb::ValueType::kTombstone &&
Expand All @@ -291,15 +300,19 @@ Status CDCProducer::PopulateWriteRecord(const ReplicateMsgPtr& msg,
prev_key = key_hash;
DCHECK(record);

if (record->operation() == CDCRecordPB_OperationType_WRITE) {
if (metadata.record_format == CDCRecordFormat::WAL) {
auto kv_pair = record->add_changes();
kv_pair->set_key(write_pair.key());
kv_pair->mutable_value()->set_binary_value(write_pair.value());
} else if (record->operation() == CDCRecordPB_OperationType_WRITE) {
PrimitiveValue column_id;
Slice key_column = write_pair.key().data() + key_sizes.second;
RETURN_NOT_OK(PrimitiveValue::DecodeKey(&key_column, &column_id));
if (column_id.value_type() == docdb::ValueType::kColumnId) {
ColumnSchema col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes());
} else if (column_id.value_type() != docdb::ValueType::kSystemColumnId) {
LOG(DFATAL) << "Unexpected value type in key: "<< column_id.value_type();
LOG(DFATAL) << "Unexpected value type in key: " << column_id.value_type();
}
}
}
Expand Down
156 changes: 123 additions & 33 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@

#include "yb/cdc/cdc_service.h"

#include <shared_mutex>
#include <chrono>
#include <memory>

#include <boost/algorithm/string.hpp>

#include "yb/cdc/cdc_producer.h"
#include "yb/cdc/cdc_service.proxy.h"
#include "yb/common/entity_ids.h"
#include "yb/common/ql_expr.h"
#include "yb/common/wire_protocol.h"
Expand Down Expand Up @@ -56,8 +59,13 @@ DEFINE_int32(cdc_wal_retention_time_secs, 4 * 3600,
namespace yb {
namespace cdc {

using namespace std::literals;

using rpc::RpcContext;
using tserver::TSTabletManager;
using client::internal::RemoteTabletServer;

constexpr int kMaxDurationForTabletLookup = 50;

CDCServiceImpl::CDCServiceImpl(TSTabletManager* tablet_manager,
const scoped_refptr<MetricEntity>& metric_entity)
Expand All @@ -81,6 +89,10 @@ bool YsqlTableHasPrimaryKey(const client::YBSchema& schema) {
}
return true;
}

bool IsTabletPeerLeader(std::shared_ptr<tablet::TabletPeer> peer) {
return peer->LeaderStatus() == consensus::LeaderStatus::LEADER_AND_READY;
}
} // namespace

template <class ReqType, class RespType>
Expand All @@ -97,38 +109,15 @@ bool CDCServiceImpl::CheckOnline(const ReqType* req, RespType* resp, rpc::RpcCon
}

template <class RespType>
Result<std::shared_ptr<tablet::TabletPeer>> CDCServiceImpl::GetLeaderTabletPeer(
Result<std::shared_ptr<tablet::TabletPeer>> CDCServiceImpl::GetTabletPeer(
const std::string& tablet_id,
RespType* resp,
rpc::RpcContext* rpc) {
std::shared_ptr<tablet::TabletPeer> peer;
Status status = tablet_manager_->GetTabletPeer(tablet_id, &peer);
if (PREDICT_FALSE(!status.ok())) {
CDCErrorPB::Code code = status.IsNotFound() ?
CDCErrorPB::TABLET_NOT_FOUND : CDCErrorPB::TABLET_NOT_RUNNING;
SetupErrorAndRespond(resp->mutable_error(), status, code, rpc);
return status;
}
RETURN_NOT_OK(tablet_manager_->GetTabletPeer(tablet_id, &peer));

// Check RUNNING state.
status = peer->CheckRunning();
if (PREDICT_FALSE(!status.ok())) {
Status s = STATUS(IllegalState, "Tablet not RUNNING");
SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::TABLET_NOT_RUNNING, rpc);
return s;
}

// Check if tablet peer is leader.
consensus::LeaderStatus leader_status = peer->LeaderStatus();
if (leader_status != consensus::LeaderStatus::LEADER_AND_READY) {
// No records to read.
if (leader_status == consensus::LeaderStatus::NOT_LEADER) {
// TODO: Change this to provide new leader
}
Status s = STATUS(IllegalState, "Tablet Server is not leader", ToCString(leader_status));
SetupErrorAndRespond(resp->mutable_error(), s, CDCErrorPB::NOT_LEADER, rpc);
return s;
}
// Check if tablet is running.
RETURN_NOT_OK(peer->CheckRunning());
return peer;
}

Expand Down Expand Up @@ -282,7 +271,7 @@ Result<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> CDCService
std::shared_ptr<std::unordered_set<std::string>> CDCServiceImpl::GetTabletIdsForStream(
const CDCStreamId& stream_id) {
{
shared_lock<rw_spinlock> l(lock_);
std::shared_lock<rw_spinlock> l(lock_);
auto it = stream_tablets_.find(stream_id);
if (it != stream_tablets_.end()) {
return it->second;
Expand Down Expand Up @@ -336,8 +325,16 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
Status s = CheckTabletValidForStream(req->stream_id(), req->tablet_id());
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);

auto tablet_peer = GetLeaderTabletPeer(req->tablet_id(), resp, &context);
if (!tablet_peer.ok()) {
auto result = GetTabletPeer(req->tablet_id(), resp, &context);
RPC_CHECK_AND_RETURN_ERROR(result.ok(), result.status(), resp->mutable_error(),
CDCErrorPB::TABLET_NOT_RUNNING, context);
auto tablet_peer = *result;

if (!IsTabletPeerLeader(tablet_peer)) {
// Forward GetChanges() to tablet leader.
// TODO: Remove this once cdc consumer has meta cache and is able to direct requests to tablet
// leader. Once that is done, we should return NOT_LEADER error here.
TabletLeaderGetChanges(req, resp, &context);
return;
}

Expand All @@ -358,7 +355,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,

CDCProducer cdc_producer;
s = cdc_producer.GetChanges(req->stream_id(), req->tablet_id(), op_id, *record->get(),
*tablet_peer, resp);
tablet_peer, resp);
RPC_STATUS_RETURN_ERROR(
s,
resp->mutable_error(),
Expand All @@ -374,6 +371,86 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
context.RespondSuccess();
}

Result<RemoteTabletServer *> CDCServiceImpl::GetLeaderTServer(const TabletId& tablet_id) {
std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise;
auto future = tablet_lookup_promise.get_future();
auto callback = [&tablet_lookup_promise](
const Result<client::internal::RemoteTabletPtr>& result) {
tablet_lookup_promise.set_value(result);
};

auto start = CoarseMonoClock::Now();
async_client_init_->client()->LookupTabletById(
tablet_id,
CoarseMonoClock::Now() + FLAGS_cdc_rpc_timeout_ms * 1ms,
callback, client::UseCache::kTrue);
future.wait();

auto duration = CoarseMonoClock::Now() - start;
if (duration > (kMaxDurationForTabletLookup * 1ms)) {
LOG(WARNING) << "LookupTabletByKey took long time: " << duration << " ms";
}

auto result = VERIFY_RESULT(future.get());

auto ts = result->LeaderTServer();
if (ts == nullptr) {
return STATUS(NotFound, "Tablet leader not found for tablet", tablet_id);
}
return ts;
}

std::shared_ptr<CDCServiceProxy> CDCServiceImpl::GetCDCServiceProxy(RemoteTabletServer* ts) {
auto hostport = HostPortFromPB(DesiredHostPort(
ts->public_rpc_hostports(), ts->private_rpc_hostports(), ts->cloud_info(),
async_client_init_->client()->cloud_info()));
DCHECK(!hostport.host().empty());

{
std::shared_lock<rw_spinlock> l(lock_);
auto it = cdc_service_map_.find(hostport);
if (it != cdc_service_map_.end()) {
return it->second;
}
}

auto cdc_service = std::make_shared<CDCServiceProxy>(&async_client_init_->client()->proxy_cache(),
hostport);
{
std::lock_guard<rw_spinlock> l(lock_);
cdc_service_map_.emplace(hostport, cdc_service);
}
return cdc_service;
}

void CDCServiceImpl::TabletLeaderGetChanges(const GetChangesRequestPB* req,
GetChangesResponsePB* resp,
RpcContext* context) {
auto ts_leader = GetLeaderTServer(req->tablet_id());
RPC_CHECK_AND_RETURN_ERROR(ts_leader.ok(), ts_leader.status(), resp->mutable_error(),
CDCErrorPB::TABLET_NOT_FOUND, *context);

auto cdc_proxy = GetCDCServiceProxy(*ts_leader);
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms));
cdc_proxy->GetChanges(*req, resp, &rpc);
context->RespondSuccess();
}

void CDCServiceImpl::TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req,
GetCheckpointResponsePB* resp,
RpcContext* context) {
auto ts_leader = GetLeaderTServer(req->tablet_id());
RPC_CHECK_AND_RETURN_ERROR(ts_leader.ok(), ts_leader.status(), resp->mutable_error(),
CDCErrorPB::TABLET_NOT_FOUND, *context);

auto cdc_proxy = GetCDCServiceProxy(*ts_leader);
rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_cdc_rpc_timeout_ms));
cdc_proxy->GetCheckpoint(*req, resp, &rpc);
context->RespondSuccess();
}

void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req,
GetCheckpointResponsePB* resp,
RpcContext context) {
Expand All @@ -392,6 +469,19 @@ void CDCServiceImpl::GetCheckpoint(const GetCheckpointRequestPB* req,
CDCErrorPB::INVALID_REQUEST,
context);

auto res = GetTabletPeer(req->tablet_id(), resp, &context);
RPC_CHECK_AND_RETURN_ERROR(res.ok(), res.status(), resp->mutable_error(),
CDCErrorPB::TABLET_NOT_RUNNING, context);
auto tablet_peer = *res;

if (!IsTabletPeerLeader(tablet_peer)) {
// Forward GetCheckpoint() to tablet leader.
// TODO: Remove this once cdc consumer has meta cache and is able to direct requests to tablet
// leader. Once that is done, we should return NOT_LEADER error here.
TabletLeaderGetCheckpoint(req, resp, &context);
return;
}

// Check that requested tablet_id is part of the CDC stream.
Status s = CheckTabletValidForStream(req->stream_id(), req->tablet_id());
RPC_STATUS_RETURN_ERROR(s, resp->mutable_error(), CDCErrorPB::INVALID_REQUEST, context);
Expand All @@ -414,7 +504,7 @@ Result<OpIdPB> CDCServiceImpl::GetLastCheckpoint(
const std::string& tablet_id,
const std::shared_ptr<client::YBSession>& session) {
{
boost::shared_lock<rw_spinlock> l(lock_);
std::shared_lock<rw_spinlock> l(lock_);
auto it = tablet_checkpoints_.find(tablet_id);
if (it != tablet_checkpoints_.end()) {
return it->second;
Expand Down Expand Up @@ -514,7 +604,7 @@ void CDCServiceImpl::AddStreamMetadataToCache(const std::string& stream_id,

std::shared_ptr<StreamMetadata> CDCServiceImpl::GetStreamMetadataFromCache(
const std::string& stream_id) {
boost::shared_lock<rw_spinlock> l(lock_);
std::shared_lock<rw_spinlock> l(lock_);
auto it = stream_metadata_.find(stream_id);
if (it != stream_metadata_.end()) {
return it->second;
Expand Down
21 changes: 20 additions & 1 deletion ent/src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
#include "yb/cdc/cdc_service.service.h"

#include "yb/cdc/cdc_producer.h"
#include "yb/cdc/cdc_service.proxy.h"
#include "yb/rpc/rpc_context.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tserver/ts_tablet_manager.h"
#include "yb/util/metrics.h"
#include "yb/util/net/net_util.h"
#include "yb/util/service_util.h"

namespace yb {
namespace cdc {

typedef std::unordered_map<HostPort, std::shared_ptr<CDCServiceProxy>, HostPortHash>
CDCServiceProxyMap;

static const char* const kRecordType = "record_type";
static const char* const kRecordFormat = "record_format";
static const char* const kRetentionSec = "retention_sec";
Expand Down Expand Up @@ -60,7 +65,7 @@ class CDCServiceImpl : public CDCServiceIf {
bool CheckOnline(const ReqType* req, RespType* resp, rpc::RpcContext* rpc);

template <class RespType>
Result<std::shared_ptr<tablet::TabletPeer>> GetLeaderTabletPeer(
Result<std::shared_ptr<tablet::TabletPeer>> GetTabletPeer(
const std::string& tablet_id, RespType* resp, rpc::RpcContext* rpc);

Result<OpIdPB> GetLastCheckpoint(const std::string& stream_id,
Expand All @@ -87,6 +92,16 @@ class CDCServiceImpl : public CDCServiceIf {
CHECKED_STATUS CheckTabletValidForStream(const std::string& stream_id,
const std::string& tablet_id);

void TabletLeaderGetChanges(const GetChangesRequestPB* req,
GetChangesResponsePB* resp,
rpc::RpcContext* context);
void TabletLeaderGetCheckpoint(const GetCheckpointRequestPB* req,
GetCheckpointResponsePB* resp,
rpc::RpcContext* context);

Result<client::internal::RemoteTabletServer *> GetLeaderTServer(const TabletId& tablet_id);
std::shared_ptr<CDCServiceProxy> GetCDCServiceProxy(client::internal::RemoteTabletServer* ts);

tserver::TSTabletManager* tablet_manager_;

boost::optional<yb::client::AsyncClientInitialiser> async_client_init_;
Expand All @@ -101,6 +116,10 @@ class CDCServiceImpl : public CDCServiceIf {
// TODO: Add cache invalidation after tablet splitting is implemented (#1004).
// Map of stream ID -> [tablet IDs].
std::unordered_map<std::string, std::shared_ptr<std::unordered_set<std::string>>> stream_tablets_;

// Map of HostPort -> CDCServiceProxy. This is used to redirect requests to tablet leader's
// CDC service proxy.
CDCServiceProxyMap cdc_service_map_;
};

} // namespace cdc
Expand Down
Loading

0 comments on commit 70ba24e

Please sign in to comment.