diff --git a/include/ylt/coro_io/client_pool.hpp b/include/ylt/coro_io/client_pool.hpp index b66bd2ced..73d321f68 100644 --- a/include/ylt/coro_io/client_pool.hpp +++ b/include/ylt/coro_io/client_pool.hpp @@ -58,15 +58,15 @@ class client_pool : public std::enable_shared_from_this< client_pool> { using client_pools_t = client_pools; static async_simple::coro::Lazy collect_idle_timeout_client( - std::weak_ptr self_weak) { + std::weak_ptr self_weak, + coro_io::detail::client_queue>& clients, + std::chrono::milliseconds sleep_time, std::size_t clear_cnt) { std::shared_ptr self = self_weak.lock(); if (self == nullptr) { co_return; } while (true) { - auto sleep_time = self->pool_config_.idle_timeout; - auto clear_cnt = self->pool_config_.idle_queue_per_max_clear_count; - self->free_clients_.reselect(); + clients.reselect(); self = nullptr; co_await coro_io::sleep_for(sleep_time); if ((self = self_weak.lock()) == nullptr) { @@ -74,7 +74,7 @@ class client_pool : public std::enable_shared_from_this< } std::unique_ptr client; while (true) { - std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt); + std::size_t is_all_cleared = clients.clear_old(clear_cnt); if (is_all_cleared != 0) [[unlikely]] { try { co_await async_simple::coro::Yield{}; @@ -86,12 +86,12 @@ class client_pool : public std::enable_shared_from_this< break; } } - --self->collecter_cnt_; - if (self->free_clients_.size() == 0) { + --clients.collecter_cnt_; + if (clients.size() == 0) { break; } std::size_t expected = 0; - if (!self->collecter_cnt_.compare_exchange_strong(expected, 1)) + if (!clients.collecter_cnt_.compare_exchange_strong(expected, 1)) break; } co_return; @@ -111,14 +111,11 @@ class client_pool : public std::enable_shared_from_this< const typename client_t::config& client_config) { std::unique_ptr client; - while (true) { - if (!free_clients_.try_dequeue(client)) { - break; - } - if (!client->has_closed()) { - break; - } + free_clients_.try_dequeue(client); + if (!client) { + short_connect_clients_.try_dequeue(client); } + assert(client == nullptr || !client->has_closed()); if (client == nullptr) { client = std::make_unique(*io_context_pool_.get_executor()); @@ -137,18 +134,33 @@ class client_pool : public std::enable_shared_from_this< } } + void enqueue( + coro_io::detail::client_queue>& clients, + std::unique_ptr client, bool is_short_client) { + if (clients.enqueue(std::move(client)) == 1) { + std::size_t expected = 0; + if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { + collect_idle_timeout_client( + this->shared_from_this(), clients, + is_short_client + ? (std::min)(pool_config_.idle_timeout, + pool_config_.short_connect_idle_timeout) + : pool_config_.idle_timeout, + pool_config_.idle_queue_per_max_clear_count) + .via(coro_io::get_global_executor()) + .start([](auto&&) { + }); + } + } + } + void collect_free_client(std::unique_ptr client) { - if (client && free_clients_.size() < pool_config_.max_connection) { - if (!client->has_closed()) { - if (free_clients_.enqueue(std::move(client)) == 1) { - std::size_t expected = 0; - if (collecter_cnt_.compare_exchange_strong(expected, 1)) { - collect_idle_timeout_client(this->shared_from_this()) - .via(coro_io::get_global_executor()) - .start([](auto&&) { - }); - } - } + if (client && !client->has_closed()) { + if (free_clients_.size() < pool_config_.max_connection) { + enqueue(free_clients_, std::move(client), false); + } + else { + enqueue(short_connect_clients_, std::move(client), true); } } return; @@ -181,6 +193,7 @@ class client_pool : public std::enable_shared_from_this< uint32_t idle_queue_per_max_clear_count = 1000; std::chrono::milliseconds reconnect_wait_time{1000}; std::chrono::milliseconds idle_timeout{30000}; + std::chrono::milliseconds short_connect_idle_timeout{1000}; typename client_t::config client_config; }; @@ -246,7 +259,7 @@ class client_pool : public std::enable_shared_from_this< } std::size_t free_client_count() const noexcept { - return free_clients_.size(); + return free_clients_.size() + short_connect_clients_.size(); } std::string_view get_host_name() const noexcept { return host_name_; } @@ -287,12 +300,13 @@ class client_pool : public std::enable_shared_from_this< } coro_io::detail::client_queue> free_clients_; + coro_io::detail::client_queue> + short_connect_clients_; client_pools_t* pools_manager_ = nullptr; async_simple::Promise idle_timeout_waiter; std::string host_name_; pool_config pool_config_; io_context_pool_t& io_context_pool_; - std::atomic collecter_cnt_; }; template queue_[2]; std::atomic_int_fast16_t selected_index_ = 0; std::atomic size_[2] = {}; + + public: + std::atomic collecter_cnt_ = 0; + + private: struct fake_client { template fake_client& operator=(T&&) noexcept { diff --git a/src/coro_io/tests/test_client_pool.cpp b/src/coro_io/tests/test_client_pool.cpp index 483d649b8..7c28b6dbd 100644 --- a/src/coro_io/tests/test_client_pool.cpp +++ b/src/coro_io/tests/test_client_pool.cpp @@ -88,7 +88,9 @@ TEST_CASE("test client pool") { auto is_started = server.async_start(); REQUIRE(is_started); auto pool = coro_io::client_pool::create( - "127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms}); + "127.0.0.1:8801", {.max_connection = 100, + .idle_timeout = 300ms, + .short_connect_idle_timeout = 50ms}); SpinLock lock; ConditionVariable cv; auto res = co_await event(20, *pool, cv, lock); @@ -97,8 +99,11 @@ TEST_CASE("test client pool") { res = co_await event(200, *pool, cv, lock); CHECK(res); auto sz = pool->free_client_count(); + CHECK(sz == 200); + co_await coro_io::sleep_for(150ms); + sz = pool->free_client_count(); CHECK((sz >= 100 && sz <= 105)); - co_await coro_io::sleep_for(700ms); + co_await coro_io::sleep_for(550ms); CHECK(pool->free_client_count() == 0); server.stop(); }());