diff --git a/src/yb/common/common_types.proto b/src/yb/common/common_types.proto
index 194a76b06cbc..56f0a2e92717 100644
--- a/src/yb/common/common_types.proto
+++ b/src/yb/common/common_types.proto
@@ -159,6 +159,12 @@ enum ReplicationErrorPb {
   // The AutoFlags config has changed and the new version is not compatible, or
   // has not yet been validated.
   REPLICATION_AUTO_FLAG_CONFIG_VERSION_MISMATCH = 6;
+
+  // Error connecting to the source universe.
+  REPLICATION_SOURCE_UNREACHABLE = 7;
+
+  // There was a generic system error.
+  REPLICATION_SYSTEM_ERROR = 8;
 }
 
 // Stateful services.
diff --git a/src/yb/integration-tests/xcluster/xcluster-test.cc b/src/yb/integration-tests/xcluster/xcluster-test.cc
index dd00f3c1e6a4..ddfcd23a0931 100644
--- a/src/yb/integration-tests/xcluster/xcluster-test.cc
+++ b/src/yb/integration-tests/xcluster/xcluster-test.cc
@@ -3957,6 +3957,61 @@ TEST_F_EX(XClusterTest, ReplicationErrorAfterMasterFailover, XClusterTestNoParam
   ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
 }
 
+// Make sure we get SOURCE_UNREACHABLE when the source universe is not reachable.
+TEST_F_EX(XClusterTest, NetworkReplicationError, XClusterTestNoParam) {
+  // Send metrics report including the xCluster status in every heartbeat.
+  const auto short_metric_report_interval_ms = 250;
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_heartbeat_interval_ms) = short_metric_report_interval_ms * 2;
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_tserver_heartbeat_metrics_interval_ms) =
+      short_metric_report_interval_ms;
+
+  ASSERT_OK(SetUpWithParams(
+      {1}, {1}, /* replication_factor */ 1, /* num_masters */ 1, /* num_tservers */ 1));
+  ASSERT_OK(SetupReplication());
+  ASSERT_OK(CorrectlyPollingAllTablets(1));
+  const auto stream_id = ASSERT_RESULT(GetCDCStreamID(producer_table_->id()));
+
+  ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
+
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_read_rpc_timeout_ms) = 5 * 1000;
+
+  producer_cluster()->mini_tablet_server(0)->Shutdown();
+
+  ASSERT_OK(VerifyReplicationError(
+      consumer_table_->id(), stream_id, ReplicationErrorPb::REPLICATION_SOURCE_UNREACHABLE,
+      kRpcTimeout));
+
+  ASSERT_OK(producer_cluster()->mini_tablet_server(0)->Start());
+
+  ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
+}
+
+// Make sure we get SYSTEM_ERROR when apply fails.
+TEST_F_EX(XClusterTest, ApplyReplicationError, XClusterTestNoParam) {
+  // Send metrics report including the xCluster status in every heartbeat.
+  const auto short_metric_report_interval_ms = 250;
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_heartbeat_interval_ms) = short_metric_report_interval_ms * 2;
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_tserver_heartbeat_metrics_interval_ms) =
+      short_metric_report_interval_ms;
+
+  ASSERT_OK(SetUpWithParams(
+      {1}, {1}, /* replication_factor */ 1, /* num_masters */ 1, /* num_tservers */ 1));
+  ASSERT_OK(SetupReplication());
+  ASSERT_OK(CorrectlyPollingAllTablets(1));
+  const auto stream_id = ASSERT_RESULT(GetCDCStreamID(producer_table_->id()));
+
+  ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
+
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulate_random_failure_after_apply) = 1;
+
+  ASSERT_OK(VerifyReplicationError(
+      consumer_table_->id(), stream_id, ReplicationErrorPb::REPLICATION_SYSTEM_ERROR));
+
+  ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_xcluster_simulate_random_failure_after_apply) = 0;
+
+  ASSERT_OK(VerifyReplicationError(consumer_table_->id(), stream_id, std::nullopt));
+}
+
 // Test deleting inbound replication group without performing source stream cleanup.
 TEST_F_EX(XClusterTest, DeleteWithoutStreamCleanup, XClusterTestNoParam) {
   constexpr int kNTabletsPerTable = 1;
diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_test_base.cc
index 232c72935d37..851d7fbcaa41 100644
--- a/src/yb/integration-tests/xcluster/xcluster_test_base.cc
+++ b/src/yb/integration-tests/xcluster/xcluster_test_base.cc
@@ -822,7 +822,7 @@ Status XClusterTestBase::WaitForReplicationDrain(
 
 Status XClusterTestBase::VerifyReplicationError(
     const std::string& consumer_table_id, const xrepl::StreamId& stream_id,
-    const std::optional<ReplicationErrorPb> expected_replication_error) {
+    const std::optional<ReplicationErrorPb> expected_replication_error, int timeout_secs) {
   // 1. Verify that the RPC contains the expected error.
   master::GetReplicationStatusRequestPB req;
   master::GetReplicationStatusResponsePB resp;
@@ -858,7 +858,7 @@ Status XClusterTestBase::VerifyReplicationError(
           return resp.statuses()[0].errors_size() == 0;
         }
       },
-      MonoDelta::FromSeconds(30), "Waiting for replication error"));
+      MonoDelta::FromSeconds(timeout_secs), "Waiting for replication error"));
 
   // 2. Verify that the yb-admin output contains the expected error.
   auto admin_out =
diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.h b/src/yb/integration-tests/xcluster/xcluster_test_base.h
index 1bec8f992ff6..37b7775d60fa 100644
--- a/src/yb/integration-tests/xcluster/xcluster_test_base.h
+++ b/src/yb/integration-tests/xcluster/xcluster_test_base.h
@@ -339,7 +339,7 @@ class XClusterTestBase : public YBTest {
 
   Status VerifyReplicationError(
       const std::string& consumer_table_id, const xrepl::StreamId& stream_id,
-      const std::optional<ReplicationErrorPb> expected_replication_error);
+      const std::optional<ReplicationErrorPb> expected_replication_error, int timeout_secs = 30);
 
   Result<xrepl::StreamId> GetCDCStreamID(const TableId& producer_table_id);
 
diff --git a/src/yb/tserver/xcluster_poller.cc b/src/yb/tserver/xcluster_poller.cc
index 7158f6e2867d..f38bfd05c3c9 100644
--- a/src/yb/tserver/xcluster_poller.cc
+++ b/src/yb/tserver/xcluster_poller.cc
@@ -412,6 +412,12 @@ void XClusterPoller::HandleGetChangesResponse(
   if (!status.ok()) {
     LOG_WITH_PREFIX(WARNING) << "XClusterPoller GetChanges failure: " << status.ToString();
 
+    if (status.IsTimedOut() || status.IsNetworkError()) {
+      StoreReplicationError(ReplicationErrorPb::REPLICATION_SOURCE_UNREACHABLE);
+    } else {
+      StoreNOKReplicationError();
+    }
+
     if (FLAGS_enable_xcluster_stat_collection) {
       poll_stats_history_.SetError(std::move(status));
     }
@@ -487,6 +493,8 @@ void XClusterPoller::VerifyApplyChangesResponse(XClusterOutputClientResponse res
       RandomActWithProbability(FLAGS_TEST_xcluster_simulate_random_failure_after_apply)) {
     LOG_WITH_PREFIX(WARNING) << "ApplyChanges failure: " << response.status;
 
+    StoreNOKReplicationError();
+
     if (FLAGS_enable_xcluster_stat_collection) {
       poll_stats_history_.SetError(std::move(response.status));
     }
@@ -511,6 +519,9 @@ void XClusterPoller::HandleApplyChangesResponse(XClusterOutputClientResponse res
     if (!s.ok()) {
       // If processing ddl_queue table fails, then retry just this part (don't repeat ApplyChanges).
       YB_LOG_EVERY_N(WARNING, 30) << "ProcessDDLQueueTable Error: " << s << " " << THROTTLE_MSG;
+
+      StoreNOKReplicationError();
+
       if (FLAGS_enable_xcluster_stat_collection) {
         poll_stats_history_.SetError(std::move(s));
       }
@@ -668,6 +679,20 @@ void XClusterPoller::StoreReplicationError(ReplicationErrorPb error) {
   }
 }
 
+void XClusterPoller::StoreNOKReplicationError() {
+  {
+    std::lock_guard l(replication_error_mutex_);
+    if (previous_replication_error_ != ReplicationErrorPb::REPLICATION_OK) {
+      return;
+    }
+  }
+
+  // NOTE(review): the check above and the store below are not atomic; an error stored by
+  // another thread in between would be overwritten by REPLICATION_SYSTEM_ERROR. Confirm that
+  // callers are serialized, or perform the check and the store under one lock acquisition.
+  StoreReplicationError(ReplicationErrorPb::REPLICATION_SYSTEM_ERROR);
+}
+
 void XClusterPoller::ClearReplicationError() {
   StoreReplicationError(ReplicationErrorPb::REPLICATION_OK);
 }
diff --git a/src/yb/tserver/xcluster_poller.h b/src/yb/tserver/xcluster_poller.h
index d562c67bc85a..c0021d0e2e48 100644
--- a/src/yb/tserver/xcluster_poller.h
+++ b/src/yb/tserver/xcluster_poller.h
@@ -106,6 +106,8 @@ class XClusterPoller : public XClusterAsyncExecutor {
   void MarkFailed(const std::string& reason, const Status& status = Status::OK()) override;
   // Stores a replication error and detail. This overwrites a previously stored 'error'.
   void StoreReplicationError(ReplicationErrorPb error) EXCLUDES(replication_error_mutex_);
+  // Stores a non OK replication error if one has not already been set.
+  void StoreNOKReplicationError() EXCLUDES(replication_error_mutex_);
   void ClearReplicationError() EXCLUDES(replication_error_mutex_);
   void TEST_IncrementNumSuccessfulWriteRpcs();
   void ApplyChangesCallback(XClusterOutputClientResponse&& response);