Skip to content

Commit

Permalink
[#3771] [#3787] [cdc] Fix data races in CDC Poller
Browse files Browse the repository at this point in the history
Summary:
We had a couple of data races in the CDC Poller.
1. Although the poller is called in a serial fashion, critical call stacks are placed upon
a ThreadPool.  This is not guaranteed to always enqueue on the same thread and can create a
situation where the ThreadPool is processing the call before a cache flush of the local
data has occurred on the originating thread. Added a mutex to guarantee ordering.
2. A race existed within TwoDCOutputClient where a failing call to OpenTable would not properly reset
the OpID or ErrorStatus.  Modified code to call HandleError, which has standardized handling here.

Test Plan:
ybd tsan --cxx-test twodc-test --gtest_filter BatchSize/TwoDCTest.ApplyOperationsRandomFailures/1 -n
1000

Reviewers: rahuldesirazu, neha, hector

Reviewed By: neha, hector

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D8075
  • Loading branch information
nspiegelberg committed Mar 16, 2020
1 parent 21cd1a0 commit 699caac
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 16 deletions.
13 changes: 11 additions & 2 deletions ent/src/yb/cdc/cdc_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
req_.Swap(req);
}

~CDCWriteRpc() = default;
virtual ~CDCWriteRpc() {
CHECK(called_);
}

void SendRpc() override {
invoker_.Execute(tablet_id());
Expand Down Expand Up @@ -101,7 +103,13 @@ class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
}

void InvokeCallback(const Status &status) {
callback_(status, resp_);
if (!called_) {
called_ = true;
callback_(status, resp_);
} else {
LOG(WARNING) << "Multiple invocation of CDCWriteRpc: "
<< status.ToString() << " : " << resp_.DebugString();
}
}

void InvokeAsync(TabletServerServiceProxy *proxy,
Expand All @@ -115,6 +123,7 @@ class CDCWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
WriteRequestPB req_;
WriteResponsePB resp_;
WriteCDCRecordCallback callback_;
bool called_ = false;
};

rpc::RpcCommandPtr CreateCDCWriteRpc(
Expand Down
6 changes: 3 additions & 3 deletions ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ using tserver::enterprise::CDCConsumer;

namespace enterprise {

constexpr int kRpcTimeout = NonTsanVsTsan(30, 60);
constexpr int kRpcTimeout = NonTsanVsTsan(30, 120);
static const std::string kUniverseId = "test_universe";
static const std::string kNamespaceName = "test_namespace";

Expand All @@ -106,8 +106,8 @@ class TwoDCTest : public YBTest, public testing::WithParamInterface<int> {
uint32_t num_masters = 1) {
FLAGS_enable_ysql = false;
// Allow for one-off network instability by ensuring a single CDC RPC timeout << test timeout.
FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 6) * 1000;
FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 6) * 1000;
FLAGS_cdc_read_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
FLAGS_cdc_write_rpc_timeout_ms = (kRpcTimeout / 4) * 1000;
// Not a useful test for us. It's testing Public+Private IP NW errors and we're only public
FLAGS_TEST_check_broadcast_address = false;
FLAGS_cdc_max_apply_batch_num_records = GetParam();
Expand Down
14 changes: 11 additions & 3 deletions ent/src/yb/tserver/cdc_poller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ void CDCPoller::Poll() {
void CDCPoller::DoPoll() {
RETURN_WHEN_OFFLINE();

std::lock_guard<std::mutex> l(data_mutex_);

// determine if we should delay our upcoming poll
if (FLAGS_async_replication_polling_delay_ms > 0 || poll_failures_ > 0) {
int64_t delay = max(FLAGS_async_replication_polling_delay_ms, // user setting
Expand Down Expand Up @@ -131,18 +133,22 @@ void CDCPoller::DoPoll() {
(**read_rpc_handle).SendRpc();
} else {
// Handle the Poll as a failure so repeated invocations will incur backoff.
HandlePoll(STATUS(Aborted, LogPrefixUnlocked() + "InvalidHandle for GetChangesCDCRpc"), resp_);
WARN_NOT_OK(thread_pool_->SubmitFunc(std::bind(&CDCPoller::HandlePoll, this,
STATUS(Aborted, LogPrefixUnlocked() + "InvalidHandle for GetChangesCDCRpc"),
resp_)),
"Could not submit HandlePoll to thread pool");
}
}

void CDCPoller::HandlePoll(yb::Status status,
std::shared_ptr<cdc::GetChangesResponsePB> resp) {
RETURN_WHEN_OFFLINE();

if (!should_continue_polling_()) {
return remove_self_from_pollers_map_();
}

std::lock_guard<std::mutex> l(data_mutex_);

status_ = status;
resp_ = resp;

Expand Down Expand Up @@ -179,10 +185,12 @@ void CDCPoller::HandleApplyChanges(cdc::OutputClientResponse response) {

void CDCPoller::DoHandleApplyChanges(cdc::OutputClientResponse response) {
RETURN_WHEN_OFFLINE();

if (!should_continue_polling_()) {
return remove_self_from_pollers_map_();
}

std::lock_guard<std::mutex> l(data_mutex_);

if (!response.status.ok()) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "ApplyChanges failure: " << response.status;
// Repeat the ApplyChanges step, with exponential backoff
Expand Down
15 changes: 10 additions & 5 deletions ent/src/yb/tserver/cdc_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "yb/rpc/rpc.h"
#include "yb/tserver/cdc_consumer.h"
#include "yb/tserver/tablet_server.h"
#include "yb/util/locks.h"
#include "yb/util/status.h"

#ifndef ENT_SRC_YB_TSERVER_CDC_POLLER_H
Expand Down Expand Up @@ -81,10 +82,14 @@ class CDCPoller {
std::function<bool()> should_continue_polling_;
std::function<void(void)> remove_self_from_pollers_map_;

consensus::OpId op_id_;
// Although this is processing serially, it might be on a different thread in the ThreadPool.
// Using mutex to guarantee cache flush, preventing TSAN warnings.
std::mutex data_mutex_;

yb::Status status_;
std::shared_ptr<cdc::GetChangesResponsePB> resp_;
consensus::OpId op_id_ GUARDED_BY(data_mutex_);

yb::Status status_ GUARDED_BY(data_mutex_);
std::shared_ptr<cdc::GetChangesResponsePB> resp_ GUARDED_BY(data_mutex_);

std::unique_ptr<cdc::CDCOutputClient> output_client_;
std::shared_ptr<CDCClient> producer_client_;
Expand All @@ -93,8 +98,8 @@ class CDCPoller {
CDCConsumer* cdc_consumer_;

std::atomic<bool> is_polling_{true};
int poll_failures_{0};
int apply_failures_{0};
int poll_failures_ GUARDED_BY(data_mutex_){0};
int apply_failures_ GUARDED_BY(data_mutex_){0};
};

} // namespace enterprise
Expand Down
5 changes: 2 additions & 3 deletions ent/src/yb/tserver/twodc_output_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,7 @@ Status TwoDCOutputClient::ApplyChanges(const cdc::GetChangesResponsePB* poller_r
if (!table_) {
Status s = local_client_->client->OpenTable(consumer_tablet_info_.table_id, &table_);
if (!s.ok()) {
cdc::OutputClientResponse response;
response.status = s;
apply_changes_clbk_(response);
HandleError(s, true);
return s;
}
}
Expand Down Expand Up @@ -302,6 +300,7 @@ bool TwoDCOutputClient::IncProcessedRecordCount() {
if (processed_record_count_ == record_count_) {
done_processing_ = true;
}
CHECK(processed_record_count_ <= record_count_);
return done_processing_;
}

Expand Down

0 comments on commit 699caac

Please sign in to comment.