diff --git a/docs/root/intro/arch_overview/intro/threading_model.rst b/docs/root/intro/arch_overview/intro/threading_model.rst index ca83cb92e92c..168f4321bfcb 100644 --- a/docs/root/intro/arch_overview/intro/threading_model.rst +++ b/docs/root/intro/arch_overview/intro/threading_model.rst @@ -24,3 +24,7 @@ to have Envoy forcibly balance connections between worker threads. To support th Envoy allows for different types of :ref:`connection balancing ` to be configured on each :ref:`listener `. + +On Windows the kernel is not able to balance the connections properly with the async IO model that Envoy is using. +Until this is fixed by the platform, Envoy will enforce listener connection balancing on Windows. This allows us to +balance connections between different worker threads. This behavior comes with a performance penalty. diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 6b6410036ebc..e066ef2b3ea0 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -9,6 +9,9 @@ Minor Behavior Changes Bug Fixes --------- +*Changes expected to improve the state of the world and are unlikely to have negative effects* + +* listener: fixed an issue on Windows where connections are not handled by all worker threads. Removed Config or Runtime ------------------------- diff --git a/source/server/listener_impl.cc b/source/server/listener_impl.cc index ed1ff37e85a6..09965fa30287 100644 --- a/source/server/listener_impl.cc +++ b/source/server/listener_impl.cc @@ -523,6 +523,19 @@ void ListenerImpl::buildFilterChains() { void ListenerImpl::buildSocketOptions() { // TCP specific setup. if (connection_balancer_ == nullptr) { +#ifdef WIN32 + // On Windows we use the exact connection balancer to dispatch connections + // from worker 1 to all workers. This is a perf hit but it is the only way + // to make all the workers do work. + // TODO(davinci26): We can be faster here if we create a balancer implementation + // that dispatches the connection to a random thread. + ENVOY_LOG(warn, + "ExactBalance was forced enabled for TCP listener '{}' because " + "Envoy is running on Windows." + "ExactBalance is used to load balance connections between workers on Windows.", + config_.name()); + connection_balancer_ = std::make_shared(); +#else // Not in place listener update. if (config_.has_connection_balance_config()) { // Currently exact balance is the only supported type and there are no options. @@ -531,6 +544,7 @@ void ListenerImpl::buildSocketOptions() { } else { connection_balancer_ = std::make_shared(); } +#endif } if (config_.has_tcp_fast_open_queue_length()) { diff --git a/test/integration/integration_test.cc b/test/integration/integration_test.cc index 29ca3be6d8e7..1f2d249c53df 100644 --- a/test/integration/integration_test.cc +++ b/test/integration/integration_test.cc @@ -142,6 +142,56 @@ TEST_P(IntegrationTest, PerWorkerStatsAndBalancing) { check_listener_stats(0, 1); } +// Make sure all workers pick up connections +#ifdef WIN32 +// We can only guarantee this on Windows without the reuse_port changes. +TEST_P(IntegrationTest, AllWorkersAreHandlingLoad) { + concurrency_ = 2; + initialize(); + + std::string worker0_stat_name, worker1_stat_name; + if (GetParam() == Network::Address::IpVersion::v4) { + worker0_stat_name = "listener.127.0.0.1_0.worker_0.downstream_cx_total"; + worker1_stat_name = "listener.127.0.0.1_0.worker_1.downstream_cx_total"; + } else { + worker0_stat_name = "listener.[__1]_0.worker_0.downstream_cx_total"; + worker1_stat_name = "listener.[__1]_0.worker_1.downstream_cx_total"; + } + + test_server_->waitForCounterEq(worker0_stat_name, 0); + test_server_->waitForCounterEq(worker1_stat_name, 0); + + // We set the counters for the two workers to see how many connections each handles. + uint64_t w0_ctr = 0; + uint64_t w1_ctr = 0; + constexpr int loops = 5; + for (int i = 0; i < loops; i++) { + constexpr int requests_per_loop = 4; + std::array connections; + for (int j = 0; j < requests_per_loop; j++) { + connections[j] = makeHttpConnection(lookupPort("http")); + } + + auto worker0_ctr = test_server_->counter(worker0_stat_name); + auto worker1_ctr = test_server_->counter(worker1_stat_name); + auto target = w0_ctr + w1_ctr + requests_per_loop; + while (test_server_->counter(worker0_stat_name)->value() + + test_server_->counter(worker1_stat_name)->value() < + target) { + timeSystem().advanceTimeWait(std::chrono::milliseconds(10)); + } + w0_ctr = test_server_->counter(worker0_stat_name)->value(); + w1_ctr = test_server_->counter(worker1_stat_name)->value(); + for (int j = 0; j < requests_per_loop; j++) { + connections[j]->close(); + } + } + + EXPECT_TRUE(w0_ctr > 1); + EXPECT_TRUE(w1_ctr > 1); +} +#endif + TEST_P(IntegrationTest, RouterDirectResponseWithBody) { const std::string body = "Response body"; const std::string file_path = TestEnvironment::writeStringToFileForTest("test_envoy", body);