From 9d91c14a99e76978eadce5976574a83246a0bfc6 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Sat, 23 Oct 2021 06:08:52 +0000 Subject: [PATCH 01/15] fix --- .../grpc_based_resource_broadcaster.cc | 23 +++---------------- .../grpc_based_resource_broadcaster.h | 12 ++++------ 2 files changed, 7 insertions(+), 28 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 841352e8f2635..488ca243370fe 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -28,7 +28,7 @@ GrpcBasedResourceBroadcaster::GrpcBasedResourceBroadcaster( send_batch ) - : seq_no_(0), + : seq_no_(absl::GetCurrentTimeNanos()), ticker_(broadcast_service_), raylet_client_pool_(raylet_client_pool), get_resource_usage_batch_for_broadcast_(get_resource_usage_batch_for_broadcast), @@ -88,7 +88,7 @@ void GrpcBasedResourceBroadcaster::HandleNodeRemoved(const rpc::GcsNodeInfo &nod { absl::MutexLock guard(&mutex_); nodes_.erase(node_id); - inflight_updates_.erase(node_id); + // inflight_updates_.erase(node_id); RAY_LOG(DEBUG) << "Node removed (node_id: " << node_id << ")# of remaining nodes: " << nodes_.size(); } @@ -98,8 +98,7 @@ std::string GrpcBasedResourceBroadcaster::DebugString() { absl::MutexLock guard(&mutex_); std::ostringstream stream; stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() - << ", Nodes skipped in last broadcast: " << num_skipped_nodes_; - + << "}" return stream.str(); } @@ -118,17 +117,9 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { stats::OutboundHeartbeatSizeKB.Record((double)(serialized_batch.size() / 1024.0)); absl::MutexLock guard(&mutex_); - num_skipped_nodes_ = 0; for (const auto &pair : nodes_) { const auto &node_id = pair.first; const auto &address = pair.second; - - auto already_inflight = inflight_updates_[node_id]; - if (already_inflight) { - num_skipped_nodes_++; - continue; - } - double start_time = absl::GetCurrentTimeNanos(); auto callback = [this, node_id, start_time]( const Status &status, @@ -136,15 +127,7 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { double end_time = absl::GetCurrentTimeNanos(); double lapsed_time_ms = static_cast(end_time - start_time) / 1e6; ray::stats::GcsUpdateResourceUsageTime.Record(lapsed_time_ms); - - absl::MutexLock guard(&mutex_); - if (inflight_updates_.count(node_id)) { - // The entry may have already been freed if the node was removed before the - // request finished. - inflight_updates_[node_id] = false; - } }; - inflight_updates_[node_id] = true; send_batch_(address, raylet_client_pool_, serialized_batch, callback); } } diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h index 49570f88df9ef..a4e4fd67a2a37 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "absl/container/flat_hash_map.h" + #include "ray/common/asio/instrumented_io_context.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" @@ -82,14 +84,8 @@ class GrpcBasedResourceBroadcaster { /// A lock to protect the data structures. absl::Mutex mutex_; /// The set of nodes and their addresses which are subscribed to resource usage changes. - std::unordered_map nodes_ GUARDED_BY(mutex_); - /// Whether the node currently has an inflight request already. An inflight request is - /// one that has been sent, with no reply, error, or timeout. - std::unordered_map inflight_updates_ GUARDED_BY(mutex_); - - /// The number of nodes skipped in the latest broadcast round. This is useful for - /// diagnostic purposes. - uint64_t num_skipped_nodes_; + absl::flat_hash_map nodes_ GUARDED_BY(mutex_); + const uint64_t broadcast_period_ms_; void SendBroadcast(); From 19db2c1ce29ff31f2f6d359e827197b91a453af8 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Sat, 23 Oct 2021 06:10:23 +0000 Subject: [PATCH 02/15] up --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 4 ++-- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 488ca243370fe..3b25c383c8784 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h" + #include "ray/stats/stats.h" namespace ray { @@ -98,8 +99,7 @@ std::string GrpcBasedResourceBroadcaster::DebugString() { absl::MutexLock guard(&mutex_); std::ostringstream stream; stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() - << "}" - return stream.str(); + << "}" return stream.str(); } void GrpcBasedResourceBroadcaster::SendBroadcast() { diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h index a4e4fd67a2a37..fdaa7d19700b1 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.h @@ -13,7 +13,6 @@ // limitations under the License. #include "absl/container/flat_hash_map.h" - #include "ray/common/asio/instrumented_io_context.h" #include "ray/gcs/gcs_server/gcs_resource_manager.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" From 6fa1a1e973e846f4797a04e03c7c09471667ff91 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Sat, 23 Oct 2021 06:35:06 +0000 Subject: [PATCH 03/15] fix --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 3b25c383c8784..6e01db40c4448 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -99,7 +99,8 @@ std::string GrpcBasedResourceBroadcaster::DebugString() { absl::MutexLock guard(&mutex_); std::ostringstream stream; stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() - << "}" return stream.str(); + << "}"; + return stream.str(); } void GrpcBasedResourceBroadcaster::SendBroadcast() { From 9a44fcaedddc0e6afccc6926cd01d5ac744a6dc8 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Sat, 23 Oct 2021 06:35:27 +0000 Subject: [PATCH 04/15] up --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 6e01db40c4448..575e0a864aff9 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -98,8 +98,7 @@ void GrpcBasedResourceBroadcaster::HandleNodeRemoved(const rpc::GcsNodeInfo &nod std::string GrpcBasedResourceBroadcaster::DebugString() { absl::MutexLock guard(&mutex_); std::ostringstream stream; - stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() - << "}"; + stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() << "}"; return stream.str(); } From d260df11e15ab5ca6ca9b14686ad23c85b7f5c6d Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 18:49:20 +0000 Subject: [PATCH 05/15] up --- release/nightly_tests/many_nodes_tests/app_config_grpc.yaml | 2 +- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml b/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml index e0881b604e885..1d167919de0c6 100644 --- a/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml +++ b/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml @@ -1,4 +1,4 @@ -base_image: "anyscale/ray-ml:pinned-nightly-py37" +base_image: "anyscale/ray-ml:pinned-nightly-py38" env_vars: {"RAY_gcs_server_rpc_server_thread_num": "8", "RAY_GCS_ACTOR_SCHEDULING_ENABLED": "true", "RAY_grpc_based_resource_broadcast": "true"} debian_packages: [] diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 575e0a864aff9..7da2916cb3b58 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -34,7 +34,6 @@ GrpcBasedResourceBroadcaster::GrpcBasedResourceBroadcaster( raylet_client_pool_(raylet_client_pool), get_resource_usage_batch_for_broadcast_(get_resource_usage_batch_for_broadcast), send_batch_(send_batch), - num_skipped_nodes_(0), broadcast_period_ms_( RayConfig::instance().raylet_report_resources_period_milliseconds()) {} From acca8c6574747b154abbe822a29f37c4c92171ca Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 18:54:06 +0000 Subject: [PATCH 06/15] back --- release/nightly_tests/many_nodes_tests/app_config_grpc.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml b/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml index 1d167919de0c6..e0881b604e885 100644 --- a/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml +++ b/release/nightly_tests/many_nodes_tests/app_config_grpc.yaml @@ -1,4 +1,4 @@ -base_image: "anyscale/ray-ml:pinned-nightly-py38" +base_image: "anyscale/ray-ml:pinned-nightly-py37" env_vars: {"RAY_gcs_server_rpc_server_thread_num": "8", "RAY_GCS_ACTOR_SCHEDULING_ENABLED": "true", "RAY_grpc_based_resource_broadcast": "true"} debian_packages: [] From 5e43e40425c522c174ba1d10fcde364c17cb56ea Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 21:50:44 +0000 Subject: [PATCH 07/15] up --- .../gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc index da8f9faedd1c9..ab5b337a7d7b5 100644 --- a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc +++ b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc @@ -52,7 +52,6 @@ class GrpcBasedResourceBroadcasterTest : public ::testing::Test { void AssertNoLeaks() { absl::MutexLock guard(&broadcaster_.mutex_); ASSERT_EQ(broadcaster_.nodes_.size(), 0); - ASSERT_EQ(broadcaster_.inflight_updates_.size(), 0); } int num_batches_sent_; From 1c7617d8286228e9877e77a07a41a73cdcd4759d Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 21:50:54 +0000 Subject: [PATCH 08/15] format --- .../gcs_server/test/grpc_based_resource_broadcaster_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc index ab5b337a7d7b5..6c7cd029ae1b1 100644 --- a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc +++ b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc @@ -13,10 +13,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h" + #include #include "gtest/gtest.h" -#include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h" #include "ray/gcs/test/gcs_test_util.h" namespace ray { From a2de6d975d689139cfe6be2ad97aa8ae0f8a2f88 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 21:57:43 +0000 Subject: [PATCH 09/15] up --- src/ray/common/ray_config_def.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index d0e030e4a45db..176f8003c4d27 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -245,7 +245,7 @@ RAY_CONFIG(int, gcs_resource_report_poll_period_ms, 100) RAY_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100) // Feature flag to use grpc instead of redis for resource broadcast. // TODO(ekl) broken as of https://github.com/ray-project/ray/issues/16858 -RAY_CONFIG(bool, grpc_based_resource_broadcast, false) +RAY_CONFIG(bool, grpc_based_resource_broadcast, true) // Feature flag to enable grpc based pubsub in GCS. RAY_CONFIG(bool, gcs_grpc_based_pubsub, false) From b051ceb9422e6cd92835c3d9e2243d04bde4a983 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 22:19:29 +0000 Subject: [PATCH 10/15] fix --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 7da2916cb3b58..e506ac86beca0 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -120,7 +120,7 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { const auto &node_id = pair.first; const auto &address = pair.second; double start_time = absl::GetCurrentTimeNanos(); - auto callback = [this, node_id, start_time]( + auto callback = [this, start_time]( const Status &status, const rpc::UpdateResourceUsageReply &reply) { double end_time = absl::GetCurrentTimeNanos(); From 7385a870cc417f47fe38e9e6ff380f589cf021be Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 23:06:36 +0000 Subject: [PATCH 11/15] up --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index e506ac86beca0..22c28e9103e02 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -120,9 +120,8 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { const auto &node_id = pair.first; const auto &address = pair.second; double start_time = absl::GetCurrentTimeNanos(); - auto callback = [this, start_time]( - const Status &status, - const rpc::UpdateResourceUsageReply &reply) { + auto callback = [this, start_time](const Status &status, + const rpc::UpdateResourceUsageReply &reply) { double end_time = absl::GetCurrentTimeNanos(); double lapsed_time_ms = static_cast(end_time - start_time) / 1e6; ray::stats::GcsUpdateResourceUsageTime.Record(lapsed_time_ms); From 4999bf90e7e31aae8869880b4389e4cb628ad5fe Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 23:15:59 +0000 Subject: [PATCH 12/15] up --- .../gcs_server/grpc_based_resource_broadcaster.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 22c28e9103e02..2c51d6fe3699e 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -88,17 +88,19 @@ void GrpcBasedResourceBroadcaster::HandleNodeRemoved(const rpc::GcsNodeInfo &nod { absl::MutexLock guard(&mutex_); nodes_.erase(node_id); - // inflight_updates_.erase(node_id); RAY_LOG(DEBUG) << "Node removed (node_id: " << node_id << ")# of remaining nodes: " << nodes_.size(); } } std::string GrpcBasedResourceBroadcaster::DebugString() { - absl::MutexLock guard(&mutex_); - std::ostringstream stream; - stream << "GrpcBasedResourceBroadcaster: {Tracked nodes: " << nodes_.size() << "}"; - return stream.str(); + size_t node_num = 0; + { + + absl::MutexLock guard(&mutex_); + node_num = nodes_.size(); + } + return absl::StrCat("GrpcBasedResourceBroadcaster: {Tracked nodes: ", node_num, "}"); } void GrpcBasedResourceBroadcaster::SendBroadcast() { @@ -117,7 +119,6 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { absl::MutexLock guard(&mutex_); for (const auto &pair : nodes_) { - const auto &node_id = pair.first; const auto &address = pair.second; double start_time = absl::GetCurrentTimeNanos(); auto callback = [this, start_time](const Status &status, From ad08270744732f20c2852d07de966da1bcf188f5 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Mon, 25 Oct 2021 23:21:22 +0000 Subject: [PATCH 13/15] up --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index 2c51d6fe3699e..b811a6857603a 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -96,9 +96,8 @@ void GrpcBasedResourceBroadcaster::HandleNodeRemoved(const rpc::GcsNodeInfo &nod std::string GrpcBasedResourceBroadcaster::DebugString() { size_t node_num = 0; { - absl::MutexLock guard(&mutex_); - node_num = nodes_.size(); + node_num = nodes_.size(); } return absl::StrCat("GrpcBasedResourceBroadcaster: {Tracked nodes: ", node_num, "}"); } From 9c735b076e2003e32ba9fffd7c913531aa5e4d81 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Tue, 26 Oct 2021 02:44:52 +0000 Subject: [PATCH 14/15] up --- src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc index b811a6857603a..86cd748527517 100644 --- a/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc +++ b/src/ray/gcs/gcs_server/grpc_based_resource_broadcaster.cc @@ -120,8 +120,8 @@ void GrpcBasedResourceBroadcaster::SendBroadcast() { for (const auto &pair : nodes_) { const auto &address = pair.second; double start_time = absl::GetCurrentTimeNanos(); - auto callback = [this, start_time](const Status &status, - const rpc::UpdateResourceUsageReply &reply) { + auto callback = [start_time](const Status &status, + const rpc::UpdateResourceUsageReply &reply) { double end_time = absl::GetCurrentTimeNanos(); double lapsed_time_ms = static_cast(end_time - start_time) / 1e6; ray::stats::GcsUpdateResourceUsageTime.Record(lapsed_time_ms); From 827dbfcb7a6008da3362a5915df636e764f2b9f1 Mon Sep 17 00:00:00 2001 From: Yi Cheng Date: Tue, 26 Oct 2021 05:47:29 +0000 Subject: [PATCH 15/15] remove unrelated test --- .../grpc_based_resource_broadcaster_test.cc | 42 ------------------- 1 file changed, 42 deletions(-) diff --git a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc index 6c7cd029ae1b1..f212f662d7ced 100644 --- a/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc +++ b/src/ray/gcs/gcs_server/test/grpc_based_resource_broadcaster_test.cc @@ -69,48 +69,6 @@ TEST_F(GrpcBasedResourceBroadcasterTest, TestBasic) { ASSERT_EQ(num_batches_sent_, 1); } -TEST_F(GrpcBasedResourceBroadcasterTest, TestStragglerNodes) { - // When a node doesn't ACK a batch update, drop future requests to that node to prevent - // a queue from building up. - for (int i = 0; i < 10; i++) { - auto node_info = Mocker::GenNodeInfo(); - broadcaster_.HandleNodeAdded(*node_info); - } - - SendBroadcast(); - ASSERT_EQ(callbacks_.size(), 10); - ASSERT_EQ(num_batches_sent_, 10); - - // Only 7 nodes reply. - for (int i = 0; i < 7; i++) { - rpc::UpdateResourceUsageReply reply; - auto &callback = callbacks_.front(); - callback(Status::OK(), reply); - callbacks_.pop_front(); - } - ASSERT_EQ(callbacks_.size(), 3); - ASSERT_EQ(num_batches_sent_, 10); - - // We should only send a new rpc to the 7 nodes that haven't received one yet. - SendBroadcast(); - ASSERT_EQ(callbacks_.size(), 10); - ASSERT_EQ(num_batches_sent_, 17); - - // Now clear the queue and resume sending broadcasts to everyone. - while (callbacks_.size()) { - rpc::UpdateResourceUsageReply reply; - auto &callback = callbacks_.front(); - callback(Status::OK(), reply); - callbacks_.pop_front(); - } - ASSERT_EQ(callbacks_.size(), 0); - ASSERT_EQ(num_batches_sent_, 17); - - SendBroadcast(); - ASSERT_EQ(callbacks_.size(), 10); - ASSERT_EQ(num_batches_sent_, 27); -} - TEST_F(GrpcBasedResourceBroadcasterTest, TestNodeRemoval) { auto node_info = Mocker::GenNodeInfo(); broadcaster_.HandleNodeAdded(*node_info);