Skip to content

Commit

Permalink
Merge branch 'master' into v0.12
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Sibiryov committed Apr 23, 2015
2 parents b34bc3a + ddfccec commit 380866d
Show file tree
Hide file tree
Showing 26 changed files with 241 additions and 223 deletions.
1 change: 0 additions & 1 deletion include/cocaine/context/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

#include <asio/ip/address.hpp>

#define BOOST_BIND_NO_PLACEHOLDERS
#include <blackhole/blackhole.hpp>

namespace cocaine {
Expand Down
15 changes: 7 additions & 8 deletions include/cocaine/context/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ template<class Tag>
struct event_visitor:
public boost::static_visitor<void>
{
event_visitor(const std::shared_ptr<dispatch<Tag>>& slot_, asio::io_service& asio_):
event_visitor(const std::shared_ptr<const dispatch<Tag>>& slot_, asio::io_service& asio_):
slot(slot_),
asio(asio_)
{ }
Expand All @@ -85,7 +85,7 @@ struct event_visitor:
}

private:
const std::shared_ptr<dispatch<Tag>>& slot;
const std::shared_ptr<const dispatch<Tag>>& slot;
asio::io_service& asio;
};

Expand All @@ -96,7 +96,7 @@ class retroactive_signal {
typedef typename io::make_frozen_over<Tag>::type variant_type;

struct subscriber_t {
std::weak_ptr<dispatch<Tag>> slot;
std::weak_ptr<const dispatch<Tag>> slot;
asio::io_service& asio;
};

Expand All @@ -106,14 +106,13 @@ class retroactive_signal {

public:
void
listen(const std::shared_ptr<dispatch<Tag>>& slot, asio::io_service& asio) {
listen(const std::shared_ptr<const dispatch<Tag>>& slot, asio::io_service& asio) {
subscribers->push_back(subscriber_t{slot, asio});

auto ptr = history.synchronize();
auto ptr = history.synchronize();
auto visitor = aux::event_visitor<Tag>(slot, asio);

std::for_each(ptr->begin(), ptr->end(), [&](const variant_type& event) {
boost::apply_visitor(aux::event_visitor<Tag>(slot, asio), event);
});
std::for_each(ptr->begin(), ptr->end(), boost::apply_visitor(visitor));
}

template<class Event, class... Args>
Expand Down
1 change: 0 additions & 1 deletion include/cocaine/detail/chamber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <asio/deadline_timer.hpp>
#include <asio/io_service.hpp>

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/thread/thread.hpp>

namespace cocaine { namespace io {
Expand Down
5 changes: 5 additions & 0 deletions include/cocaine/detail/cluster/multicast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "cocaine/api/cluster.hpp"

#include "cocaine/idl/context.hpp"

#include <asio/deadline_timer.hpp>

#include <asio/ip/tcp.hpp>
Expand Down Expand Up @@ -61,6 +63,9 @@ class multicast_t:
// Announce expiration timeouts.
std::map<std::string, std::unique_ptr<asio::deadline_timer>> m_expirations;

// Signal to handle context ready event
std::shared_ptr<dispatch<io::context_tag>> m_signals;

public:
multicast_t(context_t& context, interface& locator, const std::string& name, const dynamic_t& args);

Expand Down
5 changes: 5 additions & 0 deletions include/cocaine/detail/cluster/predefine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "cocaine/api/cluster.hpp"

#include "cocaine/idl/context.hpp"

#include <asio/deadline_timer.hpp>
#include <asio/ip/tcp.hpp>

Expand Down Expand Up @@ -52,6 +54,9 @@ class predefine_t:
// Simply try linking the whole predefined list every timer tick.
asio::deadline_timer m_timer;

// Signal to handle context ready event
std::shared_ptr<dispatch<io::context_tag>> m_signals;

public:
predefine_t(context_t& context, interface& locator, const std::string& name, const dynamic_t& args);

Expand Down
10 changes: 6 additions & 4 deletions include/cocaine/detail/service/locator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class locator_t:
public dispatch<io::locator_tag>
{
class remote_t;
class expose_slot_t;
class publish_slot_t;

typedef std::map<std::string, continuum_t> rg_map_t;

Expand Down Expand Up @@ -113,13 +113,13 @@ class locator_t:
synchronized<client_map_t> m_clients;

// Snapshot of the cluster service disposition. Synchronized with incoming streams.
std::map<std::string, partition_view_t> m_protocol;
std::map<std::string, partition_view_t> m_aggregate;

// Outgoing remote locator streams indexed by node uuid.
synchronized<remote_map_t> m_remotes;

// Snapshot of the local service disposition. Synchronized with outgoing remote streams.
std::map<std::string, results::resolve> m_snapshot;
std::map<std::string, results::resolve> m_snapshots;

// Outgoing router streams indexed by some arbitrary router-provided uuid.
synchronized<router_map_t> m_routers;
Expand Down Expand Up @@ -172,8 +172,10 @@ class locator_t:

// Context signals

enum class modes { exposed, removed };

void
on_service(const std::string& name, const results::resolve& meta, bool active);
on_service(const std::string& name, const results::resolve& meta, modes mode);

void
on_context_shutdown();
Expand Down
13 changes: 13 additions & 0 deletions include/cocaine/idl/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ struct context_tag;

struct context {

struct prepared {
typedef context_tag tag;
typedef context_tag dispatch_type;

static const char* alias() {
return "prepared";
}

typedef void upstream_type;
};

struct shutdown {
typedef context_tag tag;
typedef context_tag dispatch_type;
Expand Down Expand Up @@ -85,6 +96,8 @@ struct protocol<context_tag> {
>::type version;

typedef boost::mpl::list<
// Fired after context bootstrap. Means that all essential services are now running.
context::prepared,
// Fired first thing on context shutdown. This is a very good time to cleanup persistent
// connections, synchronize disk state and so on.
context::shutdown,
Expand Down
24 changes: 13 additions & 11 deletions include/cocaine/idl/locator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ struct cluster {
>::tag upstream_type;
};

struct expose_tag;
struct publish_tag;

struct expose {
struct publish {
struct discard {
typedef locator::expose_tag tag;
typedef locator::publish_tag tag;

static const char* alias() {
return "discard";
Expand All @@ -125,17 +125,19 @@ struct expose {
};

typedef locator_tag tag;
typedef locator::expose_tag dispatch_type;
typedef locator::publish_tag dispatch_type;

static const char* alias() {
return "expose";
return "publish";
}

typedef boost::mpl::list<
/* The alias of the service to be impersonated. */
/* The name of the external service to be published. */
std::string,
/* Endpoints of the service to be impersonated. */
std::vector<asio::ip::tcp::endpoint>
/* External service endpoints. */
std::vector<asio::ip::tcp::endpoint>,
/* Service metadata, if the external service is using native protocol. */
optional<std::tuple<unsigned int, graph_root_t>>
>::type argument_type;
};

Expand Down Expand Up @@ -170,21 +172,21 @@ struct protocol<locator_tag> {
locator::connect,
locator::refresh,
locator::cluster,
locator::expose,
locator::publish,
locator::routing
>::type messages;

typedef locator scope;
};

template<>
struct protocol<locator::expose_tag> {
struct protocol<locator::publish_tag> {
typedef boost::mpl::int_<
1
>::type version;

typedef boost::mpl::list<
locator::expose::discard
locator::publish::discard
>::type messages;
};

Expand Down
1 change: 0 additions & 1 deletion include/cocaine/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include "cocaine/common.hpp"

#define BOOST_BIND_NO_PLACEHOLDERS
#include <blackhole/blackhole.hpp>
#include <blackhole/keyword.hpp>
#include <blackhole/logger/wrapper.hpp>
Expand Down
6 changes: 3 additions & 3 deletions include/cocaine/rpc/dispatch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ template<class Tag>
class dispatch:
public io::basic_dispatch_t
{
static const io::graph_root_t kGraph;
static const io::graph_root_t kProtocol;

// Slot construction

Expand Down Expand Up @@ -151,7 +151,7 @@ class dispatch:
virtual
auto
root() const -> const io::graph_root_t& {
return kGraph;
return kProtocol;
}

virtual
Expand All @@ -168,7 +168,7 @@ class dispatch:
};

template<class Tag>
const io::graph_root_t dispatch<Tag>::kGraph = io::traverse<Tag>().get();
const io::graph_root_t dispatch<Tag>::kProtocol = io::traverse<Tag>().get();

namespace aux {

Expand Down
2 changes: 1 addition & 1 deletion include/cocaine/traits/optional.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#include "cocaine/traits.hpp"

#include "boost/optional.hpp"
#include <boost/optional.hpp>

namespace cocaine { namespace io {

Expand Down
5 changes: 2 additions & 3 deletions src/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ actor_t::accept_action_t::operator()() {
return;
}

using namespace std::placeholders;

ptr->async_accept(socket, std::bind(&accept_action_t::finalize, shared_from_this(), _1));
ptr->async_accept(socket, std::bind(&accept_action_t::finalize, shared_from_this(),
std::placeholders::_1));
});
}

Expand Down
17 changes: 11 additions & 6 deletions src/cluster/multicast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include "cocaine/context.hpp"
#include "cocaine/logging.hpp"

#include "cocaine/rpc/dispatch.hpp"
#include "cocaine/rpc/actor.hpp"

#include "cocaine/traits/endpoint.hpp"
#include "cocaine/traits/graph.hpp"
#include "cocaine/traits/tuple.hpp"
#include "cocaine/traits/vector.hpp"

Expand All @@ -35,12 +37,13 @@
using namespace asio;
using namespace asio::ip;

using namespace cocaine::io;
using namespace cocaine::cluster;

namespace cocaine {

namespace ph = std::placeholders;

namespace cocaine {

template<>
struct dynamic_converter<address> {
typedef address result_type;
Expand Down Expand Up @@ -133,8 +136,10 @@ multicast_t::multicast_t(context_t& context, interface& locator, const std::stri
std::bind(&multicast_t::on_receive, this, ph::_1, ph::_2, announce)
);

m_timer.expires_from_now(m_cfg.interval);
m_timer.async_wait(std::bind(&multicast_t::on_publish, this, ph::_1));
m_signals = std::make_shared<dispatch<context_tag>>(name);
m_signals->on<context::prepared>(std::bind(&multicast_t::on_publish, this, std::error_code()));

context.listen(m_signals, m_locator.asio());
}

multicast_t::~multicast_t() {
Expand Down Expand Up @@ -171,7 +176,7 @@ multicast_t::on_publish(const std::error_code& ec) {
msgpack::sbuffer target;
msgpack::packer<msgpack::sbuffer> packer(target);

io::type_traits<announce_t::tuple_type>::pack(packer, std::forward_as_tuple(
type_traits<announce_t::tuple_type>::pack(packer, std::forward_as_tuple(
m_locator.uuid(),
endpoints
));
Expand Down Expand Up @@ -218,7 +223,7 @@ multicast_t::on_receive(const std::error_code& ec, size_t bytes_received,
std::vector<tcp::endpoint> endpoints;

try {
io::type_traits<announce_t::tuple_type>::unpack(unpacked.get(), std::tie(uuid, endpoints));
type_traits<announce_t::tuple_type>::unpack(unpacked.get(), std::tie(uuid, endpoints));
} catch(const msgpack::type_error& e) {
COCAINE_LOG_ERROR(m_log, "unable to decode announce: %s", e.what());
return;
Expand Down
16 changes: 12 additions & 4 deletions src/cluster/predefine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@

#include "cocaine/detail/cluster/predefine.hpp"


#include "cocaine/context.hpp"
#include "cocaine/logging.hpp"

#include "cocaine/traits/endpoint.hpp"
#include "cocaine/traits/graph.hpp"
#include "cocaine/traits/vector.hpp"

#include <boost/spirit/include/karma_generate.hpp>
#include <boost/spirit/include/karma_list.hpp>
#include <boost/spirit/include/karma_stream.hpp>
Expand All @@ -33,12 +38,13 @@
using namespace asio;
using namespace asio::ip;

using namespace cocaine::io;
using namespace cocaine::cluster;

namespace cocaine {

namespace ph = std::placeholders;

namespace cocaine {

template<>
struct dynamic_converter<predefine_cfg_t> {
typedef predefine_cfg_t result_type;
Expand Down Expand Up @@ -106,8 +112,10 @@ predefine_t::predefine_t(context_t& context, interface& locator, const std::stri
);
}

m_timer.expires_from_now(m_cfg.interval);
m_timer.async_wait(std::bind(&predefine_t::on_announce, this, ph::_1));
m_signals = std::make_shared<dispatch<context_tag>>(name);
m_signals->on<context::prepared>(std::bind(&predefine_t::on_announce, this, std::error_code()));

context.listen(m_signals, m_locator.asio());
}

predefine_t::~predefine_t() {
Expand Down
2 changes: 2 additions & 0 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,5 +277,7 @@ context_t::bootstrap() {
COCAINE_LOG_ERROR(m_logger, "emergency core shutdown");

throw cocaine::error_t("couldn't start %d service(s)", errored.size());
} else {
m_signals.invoke<context::prepared>();
}
}
Loading

0 comments on commit 380866d

Please sign in to comment.