From 7eab4bf6f80b775036092e02c431c06320620ec6 Mon Sep 17 00:00:00 2001 From: Andrey Sibiryov Date: Mon, 30 Mar 2015 19:07:33 -0400 Subject: [PATCH 1/2] [ASIO] Refactored actor_t for thread safety. --- include/cocaine/detail/actor.hpp | 10 ++-- src/actor.cpp | 85 ++++++++++++++++++-------------- src/context.cpp | 5 +- src/engine.cpp | 4 +- src/service/locator.cpp | 4 +- 5 files changed, 63 insertions(+), 45 deletions(-) diff --git a/include/cocaine/detail/actor.hpp b/include/cocaine/detail/actor.hpp index bde9af1cf..abcbc162c 100644 --- a/include/cocaine/detail/actor.hpp +++ b/include/cocaine/detail/actor.hpp @@ -22,6 +22,7 @@ #define COCAINE_ACTOR_HPP #include "cocaine/common.hpp" +#include "cocaine/locked_ptr.hpp" #include #include @@ -40,14 +41,15 @@ class actor_t { // Initial dispatch. It's the protocol dispatch that will be initially assigned to all the new // sessions. In case of secure actors, this might as well be the protocol dispatch to switch to - // after the authentication process completes successfully. + // after the authentication process completes successfully. Constant. io::dispatch_ptr_t m_prototype; // I/O acceptor. Actors have a separate thread to accept new connections. After a connection is - // is accepted, it is assigned to a carefully choosen thread from the main thread pool. - std::unique_ptr m_acceptor; + // is accepted, it is assigned to a least busy thread from the main thread pool. Synchronized to + // allow concurrent observing and operations. + synchronized> m_acceptor; - // I/O authentication & processing. + // Main service thread. std::unique_ptr m_chamber; public: diff --git a/src/actor.cpp b/src/actor.cpp index fb3ff3dae..4506aca70 100644 --- a/src/actor.cpp +++ b/src/actor.cpp @@ -30,11 +30,11 @@ #include "cocaine/rpc/dispatch.hpp" -using namespace blackhole; - using namespace asio; using namespace asio::ip; +using namespace blackhole; + using namespace cocaine; // Actor internals @@ -61,10 +61,16 @@ class actor_t::accept_action_t: void actor_t::accept_action_t::operator()() { - parent->m_acceptor->async_accept(socket, std::bind(&accept_action_t::finalize, - shared_from_this(), - std::placeholders::_1 - )); + parent->m_acceptor.apply([this](std::unique_ptr& ptr) { + if(!ptr) { + COCAINE_LOG_ERROR(parent->m_log, "abnormal termination of actor remote client pump"); + return; + } + + using namespace std::placeholders; + + ptr->async_accept(socket, std::bind(&accept_action_t::finalize, shared_from_this(), _1)); + }); } void @@ -83,7 +89,9 @@ actor_t::accept_action_t::finalize(const std::error_code& ec) { return; default: - COCAINE_LOG_ERROR(parent->m_log, "dropped remote client: [%d] %s", ec.value(), ec.message()); + COCAINE_LOG_ERROR(parent->m_log, "unable to accept remote client: [%d] %s", ec.value(), + ec.message()); + break; } // TODO: Find out if it's always a good idea to continue accepting connections no matter what. @@ -128,14 +136,18 @@ actor_t::~actor_t() { std::vector actor_t::endpoints() const { - if(!m_chamber || !m_acceptor) { - return std::vector(); - } - tcp::resolver::iterator begin; try { - const auto local = m_acceptor->local_endpoint(); + const auto local = m_acceptor.apply( + [](const std::unique_ptr& ptr) -> tcp::endpoint + { + if(ptr) { + return ptr->local_endpoint(); + } else { + throw std::system_error(asio::error::not_connected); + } + }); if(!local.address().is_unspecified()) { return std::vector({local}); @@ -168,7 +180,7 @@ actor_t::endpoints() const { bool actor_t::is_active() const { - return m_chamber && m_acceptor; + return static_cast(*m_acceptor.synchronize()); } const io::basic_dispatch_t& @@ -178,23 +190,24 @@ actor_t::prototype() const { void actor_t::run() { - BOOST_ASSERT(!m_chamber); - - try { - m_acceptor = std::make_unique(*m_asio, tcp::endpoint { - m_context.config.network.endpoint, - m_context.mapper.assign(m_prototype->name()) - }); - } catch(const std::system_error& e) { - COCAINE_LOG_ERROR(m_log, "unable to bind local endpoint for service: [%d] %s", - e.code().value(), e.code().message()); - throw; - } + m_acceptor.apply([this](std::unique_ptr& ptr) { + try { + ptr = std::make_unique(*m_asio, tcp::endpoint { + m_context.config.network.endpoint, + m_context.mapper.assign(m_prototype->name()) + }); + } catch(const std::system_error& e) { + COCAINE_LOG_ERROR(m_log, "unable to bind local endpoint for service: [%d] %s", + e.code().value(), + e.code().message()); + throw; + } - std::error_code ec; - const auto endpoint = m_acceptor->local_endpoint(ec); + std::error_code ec; + const auto endpoint = ptr->local_endpoint(ec); - COCAINE_LOG_INFO(m_log, "exposing service on local endpoint %s", endpoint); + COCAINE_LOG_INFO(m_log, "exposing service on local endpoint %s", endpoint); + }); m_asio->post(std::bind(&accept_action_t::operator(), std::make_shared(this) @@ -206,20 +219,20 @@ actor_t::run() { void actor_t::terminate() { - BOOST_ASSERT(m_chamber); - // Do not wait for the service to finish all its stuff (like timers, etc). Graceful termination // happens only in engine chambers, because that's where client connections are being handled. m_asio->stop(); - std::error_code ec; - const auto endpoint = m_acceptor->local_endpoint(ec); + m_acceptor.apply([this](std::unique_ptr& ptr) { + std::error_code ec; + const auto endpoint = ptr->local_endpoint(ec); - COCAINE_LOG_INFO(m_log, "removing service from local endpoint %s", endpoint); + COCAINE_LOG_INFO(m_log, "removing service from local endpoint %s", endpoint); - // Does not block, unlike the one in execution_unit_t's destructors. - m_chamber = nullptr; - m_acceptor = nullptr; + // Does not block, unlike the one in execution_unit_t's destructors. + m_chamber = nullptr; + ptr = nullptr; + }); // Be ready to restart the actor. m_asio->reset(); diff --git a/src/context.cpp b/src/context.cpp index 2f8dabe90..20d243a38 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -186,9 +186,12 @@ context_t::remove(const std::string& name) { list.erase(it); }); + // Service is already terminated, so there's no reason to try to get its endpoints. + std::vector nothing; + // Fire off the signal to alert concerned subscribers about the service termination event. m_signals.invoke(service->prototype().name(), std::make_tuple( - service->endpoints(), + nothing, service->prototype().version(), service->prototype().root() )); diff --git a/src/engine.cpp b/src/engine.cpp index 2f2d3f9fe..a624a0a4a 100644 --- a/src/engine.cpp +++ b/src/engine.cpp @@ -30,11 +30,11 @@ #include -using namespace blackhole; - using namespace asio; using namespace asio::ip; +using namespace blackhole; + using namespace cocaine; class execution_unit_t::gc_action_t: diff --git a/src/service/locator.cpp b/src/service/locator.cpp index 058415dbd..be40da470 100644 --- a/src/service/locator.cpp +++ b/src/service/locator.cpp @@ -47,11 +47,11 @@ #include #include -using namespace blackhole; - using namespace asio; using namespace asio::ip; +using namespace blackhole; + using namespace cocaine::io; using namespace cocaine::service; From e75abe529424839de649e51b0bb73056c9bb4359 Mon Sep 17 00:00:00 2001 From: Andrey Sibiryov Date: Wed, 1 Apr 2015 11:21:11 -0400 Subject: [PATCH 2/2] [ASIO] Made actor.hpp a public header. --- include/cocaine/{detail => rpc}/actor.hpp | 0 src/actor.cpp | 2 +- src/cluster/multicast.cpp | 2 +- src/context.cpp | 3 ++- src/service/locator.cpp | 3 ++- src/service/node/app.cpp | 4 +--- src/service/node/slave.cpp | 4 ++-- 7 files changed, 9 insertions(+), 9 deletions(-) rename include/cocaine/{detail => rpc}/actor.hpp (100%) diff --git a/include/cocaine/detail/actor.hpp b/include/cocaine/rpc/actor.hpp similarity index 100% rename from include/cocaine/detail/actor.hpp rename to include/cocaine/rpc/actor.hpp diff --git a/src/actor.cpp b/src/actor.cpp index 4506aca70..1536d8c25 100644 --- a/src/actor.cpp +++ b/src/actor.cpp @@ -18,7 +18,7 @@ along with this program. If not, see . */ -#include "cocaine/detail/actor.hpp" +#include "cocaine/rpc/actor.hpp" #include "cocaine/api/service.hpp" diff --git a/src/cluster/multicast.cpp b/src/cluster/multicast.cpp index 90be49b3e..328eb585b 100644 --- a/src/cluster/multicast.cpp +++ b/src/cluster/multicast.cpp @@ -23,7 +23,7 @@ #include "cocaine/context.hpp" #include "cocaine/logging.hpp" -#include "cocaine/detail/actor.hpp" +#include "cocaine/rpc/actor.hpp" #include "cocaine/traits/endpoint.hpp" #include "cocaine/traits/tuple.hpp" diff --git a/src/context.cpp b/src/context.cpp index 20d243a38..9f8c196db 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -22,7 +22,6 @@ #include "cocaine/api/service.hpp" -#include "cocaine/detail/actor.hpp" #include "cocaine/detail/engine.hpp" #include "cocaine/detail/essentials.hpp" @@ -34,6 +33,8 @@ #include "cocaine/logging.hpp" +#include "cocaine/rpc/actor.hpp" + #include #include diff --git a/src/service/locator.cpp b/src/service/locator.cpp index be40da470..83a5a3aac 100644 --- a/src/service/locator.cpp +++ b/src/service/locator.cpp @@ -26,13 +26,14 @@ #include "cocaine/context.hpp" -#include "cocaine/detail/actor.hpp" #include "cocaine/detail/unique_id.hpp" #include "cocaine/idl/streaming.hpp" #include "cocaine/logging.hpp" +#include "cocaine/rpc/actor.hpp" + #include "cocaine/traits/endpoint.hpp" #include "cocaine/traits/graph.hpp" #include "cocaine/traits/map.hpp" diff --git a/src/service/node/app.cpp b/src/service/node/app.cpp index 4673b17f0..44fc9a854 100644 --- a/src/service/node/app.cpp +++ b/src/service/node/app.cpp @@ -25,8 +25,6 @@ #include "cocaine/context.hpp" #include "cocaine/defaults.hpp" -#include "cocaine/detail/actor.hpp" - #include "cocaine/detail/service/node/engine.hpp" #include "cocaine/detail/service/node/event.hpp" #include "cocaine/detail/service/node/manifest.hpp" @@ -38,8 +36,8 @@ #include "cocaine/logging.hpp" +#include "cocaine/rpc/actor.hpp" #include "cocaine/rpc/asio/channel.hpp" - #include "cocaine/rpc/dispatch.hpp" #include "cocaine/rpc/upstream.hpp" diff --git a/src/service/node/slave.cpp b/src/service/node/slave.cpp index b1c6cd205..aceeea820 100644 --- a/src/service/node/slave.cpp +++ b/src/service/node/slave.cpp @@ -22,8 +22,6 @@ #include "cocaine/context.hpp" -#include "cocaine/detail/actor.hpp" - #include "cocaine/detail/service/node/engine.hpp" #include "cocaine/detail/service/node/event.hpp" #include "cocaine/detail/service/node/manifest.hpp" @@ -35,6 +33,8 @@ #include "cocaine/logging.hpp" +#include "cocaine/rpc/actor.hpp" + #include "cocaine/traits/enum.hpp" #include "cocaine/traits/literal.hpp"