Skip to content

Commit

Permalink
[coro_io][feat] keep short connection. (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Jul 27, 2023
1 parent fbc35e0 commit 503555a
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
70 changes: 42 additions & 28 deletions include/ylt/coro_io/client_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,23 @@ class client_pool : public std::enable_shared_from_this<
client_pool<client_t, io_context_pool_t>> {
using client_pools_t = client_pools<client_t, io_context_pool_t>;
static async_simple::coro::Lazy<void> collect_idle_timeout_client(
std::weak_ptr<client_pool> self_weak) {
std::weak_ptr<client_pool> self_weak,
coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients,
std::chrono::milliseconds sleep_time, std::size_t clear_cnt) {
std::shared_ptr<client_pool> self = self_weak.lock();
if (self == nullptr) {
co_return;
}
while (true) {
auto sleep_time = self->pool_config_.idle_timeout;
auto clear_cnt = self->pool_config_.idle_queue_per_max_clear_count;
self->free_clients_.reselect();
clients.reselect();
self = nullptr;
co_await coro_io::sleep_for(sleep_time);
if ((self = self_weak.lock()) == nullptr) {
break;
}
std::unique_ptr<client_t> client;
while (true) {
std::size_t is_all_cleared = self->free_clients_.clear_old(clear_cnt);
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
if (is_all_cleared != 0) [[unlikely]] {
try {
co_await async_simple::coro::Yield{};
Expand All @@ -86,12 +86,12 @@ class client_pool : public std::enable_shared_from_this<
break;
}
}
--self->collecter_cnt_;
if (self->free_clients_.size() == 0) {
--clients.collecter_cnt_;
if (clients.size() == 0) {
break;
}
std::size_t expected = 0;
if (!self->collecter_cnt_.compare_exchange_strong(expected, 1))
if (!clients.collecter_cnt_.compare_exchange_strong(expected, 1))
break;
}
co_return;
Expand All @@ -111,14 +111,11 @@ class client_pool : public std::enable_shared_from_this<
const typename client_t::config& client_config) {
std::unique_ptr<client_t> client;

while (true) {
if (!free_clients_.try_dequeue(client)) {
break;
}
if (!client->has_closed()) {
break;
}
free_clients_.try_dequeue(client);
if (!client) {
short_connect_clients_.try_dequeue(client);
}
assert(client == nullptr || !client->has_closed());

if (client == nullptr) {
client = std::make_unique<client_t>(*io_context_pool_.get_executor());
Expand All @@ -137,18 +134,33 @@ class client_pool : public std::enable_shared_from_this<
}
}

void enqueue(
coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients,
std::unique_ptr<client_t> client, bool is_short_client) {
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
collect_idle_timeout_client(
this->shared_from_this(), clients,
is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout,
pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor())
.start([](auto&&) {
});
}
}
}

void collect_free_client(std::unique_ptr<client_t> client) {
if (client && free_clients_.size() < pool_config_.max_connection) {
if (!client->has_closed()) {
if (free_clients_.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (collecter_cnt_.compare_exchange_strong(expected, 1)) {
collect_idle_timeout_client(this->shared_from_this())
.via(coro_io::get_global_executor())
.start([](auto&&) {
});
}
}
if (client && !client->has_closed()) {
if (free_clients_.size() < pool_config_.max_connection) {
enqueue(free_clients_, std::move(client), false);
}
else {
enqueue(short_connect_clients_, std::move(client), true);
}
}
return;
Expand Down Expand Up @@ -181,6 +193,7 @@ class client_pool : public std::enable_shared_from_this<
uint32_t idle_queue_per_max_clear_count = 1000;
std::chrono::milliseconds reconnect_wait_time{1000};
std::chrono::milliseconds idle_timeout{30000};
std::chrono::milliseconds short_connect_idle_timeout{1000};
typename client_t::config client_config;
};

Expand Down Expand Up @@ -246,7 +259,7 @@ class client_pool : public std::enable_shared_from_this<
}

std::size_t free_client_count() const noexcept {
return free_clients_.size();
return free_clients_.size() + short_connect_clients_.size();
}

std::string_view get_host_name() const noexcept { return host_name_; }
Expand Down Expand Up @@ -287,12 +300,13 @@ class client_pool : public std::enable_shared_from_this<
}

coro_io::detail::client_queue<std::unique_ptr<client_t>> free_clients_;
coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_;
client_pools_t* pools_manager_ = nullptr;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_;
pool_config pool_config_;
io_context_pool_t& io_context_pool_;
std::atomic<std::size_t> collecter_cnt_;
};

template <typename client_t,
Expand Down
5 changes: 5 additions & 0 deletions include/ylt/coro_io/detail/client_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class client_queue {
moodycamel::ConcurrentQueue<client_t> queue_[2];
std::atomic_int_fast16_t selected_index_ = 0;
std::atomic<std::size_t> size_[2] = {};

public:
std::atomic<std::size_t> collecter_cnt_ = 0;

private:
struct fake_client {
template <typename T>
fake_client& operator=(T&&) noexcept {
Expand Down
9 changes: 7 additions & 2 deletions src/coro_io/tests/test_client_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ TEST_CASE("test client pool") {
auto is_started = server.async_start();
REQUIRE(is_started);
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms});
"127.0.0.1:8801", {.max_connection = 100,
.idle_timeout = 300ms,
.short_connect_idle_timeout = 50ms});
SpinLock lock;
ConditionVariable<SpinLock> cv;
auto res = co_await event(20, *pool, cv, lock);
Expand All @@ -97,8 +99,11 @@ TEST_CASE("test client pool") {
res = co_await event(200, *pool, cv, lock);
CHECK(res);
auto sz = pool->free_client_count();
CHECK(sz == 200);
co_await coro_io::sleep_for(150ms);
sz = pool->free_client_count();
CHECK((sz >= 100 && sz <= 105));
co_await coro_io::sleep_for(700ms);
co_await coro_io::sleep_for(550ms);
CHECK(pool->free_client_count() == 0);
server.stop();
}());
Expand Down

0 comments on commit 503555a

Please sign in to comment.