Skip to content

Commit

Permalink
Merge pull request #183 from cocaine/kobolog/actor-threads
Browse files Browse the repository at this point in the history
[ASIO] Make actors better and more wonderful.
  • Loading branch information
Andrey Sibiryov committed Apr 1, 2015
2 parents 9a7e92b + e75abe5 commit 8738d5f
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#define COCAINE_ACTOR_HPP

#include "cocaine/common.hpp"
#include "cocaine/locked_ptr.hpp"

#include <asio/io_service.hpp>
#include <asio/ip/tcp.hpp>
Expand All @@ -40,14 +41,15 @@ class actor_t {

// Initial dispatch. It's the protocol dispatch that will be initially assigned to all the new
// sessions. In case of secure actors, this might as well be the protocol dispatch to switch to
// after the authentication process completes successfully.
// after the authentication process completes successfully. Constant.
io::dispatch_ptr_t m_prototype;

// I/O acceptor. Actors have a separate thread to accept new connections. After a connection is
// is accepted, it is assigned to a carefully choosen thread from the main thread pool.
std::unique_ptr<asio::ip::tcp::acceptor> m_acceptor;
// is accepted, it is assigned to a least busy thread from the main thread pool. Synchronized to
// allow concurrent observing and operations.
synchronized<std::unique_ptr<asio::ip::tcp::acceptor>> m_acceptor;

// I/O authentication & processing.
// Main service thread.
std::unique_ptr<io::chamber_t> m_chamber;

public:
Expand Down
87 changes: 50 additions & 37 deletions src/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "cocaine/detail/actor.hpp"
#include "cocaine/rpc/actor.hpp"

#include "cocaine/api/service.hpp"

Expand All @@ -30,11 +30,11 @@

#include "cocaine/rpc/dispatch.hpp"

using namespace blackhole;

using namespace asio;
using namespace asio::ip;

using namespace blackhole;

using namespace cocaine;

// Actor internals
Expand All @@ -61,10 +61,16 @@ class actor_t::accept_action_t:

void
actor_t::accept_action_t::operator()() {
parent->m_acceptor->async_accept(socket, std::bind(&accept_action_t::finalize,
shared_from_this(),
std::placeholders::_1
));
parent->m_acceptor.apply([this](std::unique_ptr<tcp::acceptor>& ptr) {
if(!ptr) {
COCAINE_LOG_ERROR(parent->m_log, "abnormal termination of actor remote client pump");
return;
}

using namespace std::placeholders;

ptr->async_accept(socket, std::bind(&accept_action_t::finalize, shared_from_this(), _1));
});
}

void
Expand All @@ -83,7 +89,9 @@ actor_t::accept_action_t::finalize(const std::error_code& ec) {
return;

default:
COCAINE_LOG_ERROR(parent->m_log, "dropped remote client: [%d] %s", ec.value(), ec.message());
COCAINE_LOG_ERROR(parent->m_log, "unable to accept remote client: [%d] %s", ec.value(),
ec.message());
break;
}

// TODO: Find out if it's always a good idea to continue accepting connections no matter what.
Expand Down Expand Up @@ -128,14 +136,18 @@ actor_t::~actor_t() {

std::vector<tcp::endpoint>
actor_t::endpoints() const {
if(!m_chamber || !m_acceptor) {
return std::vector<tcp::endpoint>();
}

tcp::resolver::iterator begin;

try {
const auto local = m_acceptor->local_endpoint();
const auto local = m_acceptor.apply(
[](const std::unique_ptr<tcp::acceptor>& ptr) -> tcp::endpoint
{
if(ptr) {
return ptr->local_endpoint();
} else {
throw std::system_error(asio::error::not_connected);
}
});

if(!local.address().is_unspecified()) {
return std::vector<tcp::endpoint>({local});
Expand Down Expand Up @@ -168,7 +180,7 @@ actor_t::endpoints() const {

bool
actor_t::is_active() const {
return m_chamber && m_acceptor;
return static_cast<bool>(*m_acceptor.synchronize());
}

const io::basic_dispatch_t&
Expand All @@ -178,23 +190,24 @@ actor_t::prototype() const {

void
actor_t::run() {
BOOST_ASSERT(!m_chamber);

try {
m_acceptor = std::make_unique<tcp::acceptor>(*m_asio, tcp::endpoint {
m_context.config.network.endpoint,
m_context.mapper.assign(m_prototype->name())
});
} catch(const std::system_error& e) {
COCAINE_LOG_ERROR(m_log, "unable to bind local endpoint for service: [%d] %s",
e.code().value(), e.code().message());
throw;
}
m_acceptor.apply([this](std::unique_ptr<tcp::acceptor>& ptr) {
try {
ptr = std::make_unique<tcp::acceptor>(*m_asio, tcp::endpoint {
m_context.config.network.endpoint,
m_context.mapper.assign(m_prototype->name())
});
} catch(const std::system_error& e) {
COCAINE_LOG_ERROR(m_log, "unable to bind local endpoint for service: [%d] %s",
e.code().value(),
e.code().message());
throw;
}

std::error_code ec;
const auto endpoint = m_acceptor->local_endpoint(ec);
std::error_code ec;
const auto endpoint = ptr->local_endpoint(ec);

COCAINE_LOG_INFO(m_log, "exposing service on local endpoint %s", endpoint);
COCAINE_LOG_INFO(m_log, "exposing service on local endpoint %s", endpoint);
});

m_asio->post(std::bind(&accept_action_t::operator(),
std::make_shared<accept_action_t>(this)
Expand All @@ -206,20 +219,20 @@ actor_t::run() {

void
actor_t::terminate() {
BOOST_ASSERT(m_chamber);

// Do not wait for the service to finish all its stuff (like timers, etc). Graceful termination
// happens only in engine chambers, because that's where client connections are being handled.
m_asio->stop();

std::error_code ec;
const auto endpoint = m_acceptor->local_endpoint(ec);
m_acceptor.apply([this](std::unique_ptr<tcp::acceptor>& ptr) {
std::error_code ec;
const auto endpoint = ptr->local_endpoint(ec);

COCAINE_LOG_INFO(m_log, "removing service from local endpoint %s", endpoint);
COCAINE_LOG_INFO(m_log, "removing service from local endpoint %s", endpoint);

// Does not block, unlike the one in execution_unit_t's destructors.
m_chamber = nullptr;
m_acceptor = nullptr;
// Does not block, unlike the one in execution_unit_t's destructors.
m_chamber = nullptr;
ptr = nullptr;
});

// Be ready to restart the actor.
m_asio->reset();
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/multicast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "cocaine/context.hpp"
#include "cocaine/logging.hpp"

#include "cocaine/detail/actor.hpp"
#include "cocaine/rpc/actor.hpp"

#include "cocaine/traits/endpoint.hpp"
#include "cocaine/traits/tuple.hpp"
Expand Down
8 changes: 6 additions & 2 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include "cocaine/api/service.hpp"

#include "cocaine/detail/actor.hpp"
#include "cocaine/detail/engine.hpp"
#include "cocaine/detail/essentials.hpp"

Expand All @@ -34,6 +33,8 @@

#include "cocaine/logging.hpp"

#include "cocaine/rpc/actor.hpp"

#include <blackhole/scoped_attributes.hpp>

#include <boost/spirit/include/karma_char.hpp>
Expand Down Expand Up @@ -186,9 +187,12 @@ context_t::remove(const std::string& name) {
list.erase(it);
});

// Service is already terminated, so there's no reason to try to get its endpoints.
std::vector<asio::ip::tcp::endpoint> nothing;

// Fire off the signal to alert concerned subscribers about the service termination event.
m_signals.invoke<io::context::service::removed>(service->prototype().name(), std::make_tuple(
service->endpoints(),
nothing,
service->prototype().version(),
service->prototype().root()
));
Expand Down
4 changes: 2 additions & 2 deletions src/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@

#include <blackhole/scoped_attributes.hpp>

using namespace blackhole;

using namespace asio;
using namespace asio::ip;

using namespace blackhole;

using namespace cocaine;

class execution_unit_t::gc_action_t:
Expand Down
7 changes: 4 additions & 3 deletions src/service/locator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@

#include "cocaine/context.hpp"

#include "cocaine/detail/actor.hpp"
#include "cocaine/detail/unique_id.hpp"

#include "cocaine/idl/streaming.hpp"

#include "cocaine/logging.hpp"

#include "cocaine/rpc/actor.hpp"

#include "cocaine/traits/endpoint.hpp"
#include "cocaine/traits/graph.hpp"
#include "cocaine/traits/map.hpp"
Expand All @@ -47,11 +48,11 @@
#include <boost/spirit/include/karma_list.hpp>
#include <boost/spirit/include/karma_string.hpp>

using namespace blackhole;

using namespace asio;
using namespace asio::ip;

using namespace blackhole;

using namespace cocaine::io;
using namespace cocaine::service;

Expand Down
4 changes: 1 addition & 3 deletions src/service/node/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include "cocaine/context.hpp"
#include "cocaine/defaults.hpp"

#include "cocaine/detail/actor.hpp"

#include "cocaine/detail/service/node/engine.hpp"
#include "cocaine/detail/service/node/event.hpp"
#include "cocaine/detail/service/node/manifest.hpp"
Expand All @@ -38,8 +36,8 @@

#include "cocaine/logging.hpp"

#include "cocaine/rpc/actor.hpp"
#include "cocaine/rpc/asio/channel.hpp"

#include "cocaine/rpc/dispatch.hpp"
#include "cocaine/rpc/upstream.hpp"

Expand Down
4 changes: 2 additions & 2 deletions src/service/node/slave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

#include "cocaine/context.hpp"

#include "cocaine/detail/actor.hpp"

#include "cocaine/detail/service/node/engine.hpp"
#include "cocaine/detail/service/node/event.hpp"
#include "cocaine/detail/service/node/manifest.hpp"
Expand All @@ -35,6 +33,8 @@

#include "cocaine/logging.hpp"

#include "cocaine/rpc/actor.hpp"

#include "cocaine/traits/enum.hpp"
#include "cocaine/traits/literal.hpp"

Expand Down

0 comments on commit 8738d5f

Please sign in to comment.