Skip to content

Commit

Permalink
Merge pull request #101 from DZabavchik/feature/pattern_registrations…
Browse files Browse the repository at this point in the history
…_subs

Feature/pattern registrations & subs
  • Loading branch information
Tobias Oberstein committed Feb 19, 2016
2 parents ef1fa86 + f7627b9 commit 5ad7547
Show file tree
Hide file tree
Showing 17 changed files with 402 additions and 30 deletions.
9 changes: 9 additions & 0 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ else:
raise SCons.Errors.UserError, "Neither MSGPACK_ROOT, nor MSGPACK_INCLUDES + MSGPACK_LIBS was set!"


if os.environ.has_key('OPENSSL_ROOT'):
env.Append(CPPPATH = [os.path.join(os.environ['OPENSSL_ROOT'], 'include')])
env.Append(LIBPATH = [os.path.join(os.environ['OPENSSL_ROOT'], 'lib')])
elif os.environ.has_key('OPENSSL_INCLUDES') and os.environ.has_key('OPENSSL_LIBS'):
env.Append(CPPPATH = [os.environ['OPENSSL_INCLUDES']])
env.Append(LIBPATH = [os.environ['OPENSSL_LIBS']])
#not raising error, since this it may build fine and OpenSSL fix is for Mac only


# Autobahn (is included as in `#include <autobahn/autobahn.hpp`)
#
env.Append(CPPPATH = ['#'])
Expand Down
9 changes: 8 additions & 1 deletion autobahn/autobahn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@
#ifndef AUTOBAHN_HPP
#define AUTOBAHN_HPP

#ifdef _WIN32
#define MSGPACK_DISABLE_LEGACY_CONVERT
#define MSGPACK_DEFAULT_API_VERSION 1
#define MSGPACK_DISABLE_LEGACY_CONVERT
#endif

#include "wamp_event.hpp"
#include "wamp_invocation.hpp"
#include "wamp_session.hpp"
#include "wamp_tcp_transport.hpp"
#include "wamp_transport.hpp"
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
#include "wamp_uds_transport.hpp"

#endif
/*! \mainpage Reference Documentation
*
* Welcome to the reference documentation of <b>Autobahn</b>|Cpp.<br>
Expand Down
72 changes: 72 additions & 0 deletions autobahn/wamp_arguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,78 @@ using wamp_kw_arguments = std::unordered_map<std::string, msgpack::object>;
static const msgpack::object EMPTY_ARGUMENTS(std::array<msgpack::object, 0>(), nullptr);
static const msgpack::object EMPTY_KW_ARGUMENTS(wamp_kw_arguments(), nullptr);


//msgpack map utilities.
//TODO: refactor event & invocation to used these
template <typename T>
inline T value_for_key(const msgpack::object& object, const std::string& key)
{
if (object.type != msgpack::type::MAP) {
throw msgpack::type_error();
}
for (std::size_t i = 0; i < object.via.map.size; ++i) {
const msgpack::object_kv& kv = object.via.map.ptr[i];
if (kv.key.type == msgpack::type::STR && key.size() == kv.key.via.str.size
&& key.compare(0, key.size(), kv.key.via.str.ptr, kv.key.via.str.size) == 0)
{
return kv.val.as<T>();
}
}
throw std::out_of_range(key + " keyword argument doesn't exist");
}

template <typename T>
inline T value_for_key(const msgpack::object& object, const char* key)
{
if (object.type != msgpack::type::MAP) {
throw msgpack::type_error();
}
std::size_t key_size = strlen(key);
for (std::size_t i = 0; i < object.via.map.size; ++i) {
const msgpack::object_kv& kv = object.via.map.ptr[i];
if (kv.key.type == msgpack::type::STR && key_size == kv.key.via.str.size
&& memcmp(key, kv.key.via.str.ptr, key_size) == 0)
{
return kv.val.as<T>();
}
}
throw std::out_of_range(std::string(key) + " keyword argument doesn't exist");
}

template <typename T>
inline T value_for_key_or(const msgpack::object& object, const std::string& key, const T& fallback)
{
if (object.type != msgpack::type::MAP) {
throw msgpack::type_error();
}
for (std::size_t i = 0; i < object.via.map.size; ++i) {
const msgpack::object_kv& kv = object.via.map.ptr[i];
if (kv.key.type == msgpack::type::STR && key.size() == kv.key.via.str.size
&& key.compare(0, key.size(), kv.key.via.str.ptr, kv.key.via.str.size) == 0)
{
return kv.val.as<T>();
}
}
return fallback;
}

template <typename T>
inline T value_for_key_or(const msgpack::object& object, const char* key, const T& fallback)
{
if (object.type != msgpack::type::MAP) {
throw msgpack::type_error();
}
std::size_t key_size = strlen(key);
for (std::size_t i = 0; i < object.via.map.size; ++i) {
const msgpack::object_kv& kv = object.via.map.ptr[i];
if (kv.key.type == msgpack::type::STR && key_size == kv.key.via.str.size
&& memcmp(key, kv.key.via.str.ptr, key_size) == 0)
{
return kv.val.as<T>();
}
}
return fallback;
}
} // namespace autobahn

#endif // AUTOBAHN_WAMP_ARGUMENTS_HPP
10 changes: 10 additions & 0 deletions autobahn/wamp_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ class wamp_event
public:
wamp_event(msgpack::zone&& zone);


//add URI and details
/*!
* Event URI. Used by prefix & wildcard subscriptions
*/
const std::string& uri() const;

/*!
* The number of positional arguments published by the event.
*/
Expand Down Expand Up @@ -186,11 +193,14 @@ class wamp_event

void set_arguments(const msgpack::object& arguments);
void set_kw_arguments(const msgpack::object& kw_arguments);
void set_details(const msgpack::object& details);

private:
msgpack::zone m_zone;
msgpack::object m_arguments;
msgpack::object m_kw_arguments;
std::string m_uri;

};

} // namespace autobahn
Expand Down
11 changes: 11 additions & 0 deletions autobahn/wamp_event.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <boost/lexical_cast.hpp>
#include <stdexcept>
#include "wamp_arguments.hpp"

namespace autobahn {

Expand All @@ -40,6 +41,11 @@ inline wamp_event::wamp_event(msgpack::zone&& zone)
{
}

inline const std::string& wamp_event::uri() const
{
return m_uri;
}

inline std::size_t wamp_event::number_of_arguments() const
{
return m_arguments.type == msgpack::type::ARRAY ? m_arguments.via.array.size : 0;
Expand Down Expand Up @@ -170,4 +176,9 @@ inline void wamp_event::set_kw_arguments(const msgpack::object& kw_arguments)
m_kw_arguments = kw_arguments;
}

inline void wamp_event::set_details(const msgpack::object& details)
{
m_uri = std::move(value_for_key_or<std::string>(details, "topic", std::string()));
}

} // namespace autobahn
9 changes: 8 additions & 1 deletion autobahn/wamp_invocation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class wamp_invocation_impl
wamp_invocation_impl();
wamp_invocation_impl(wamp_invocation_impl&&) = delete; // copy wamp_invocation instead

//add URI and details
/*!
* Invocatition procedure URI. Used by prefix & wildcard registered procedures
*/
const std::string& uri() const;
/*!
* The number of positional arguments passed to the invocation.
*/
Expand Down Expand Up @@ -227,7 +232,8 @@ class wamp_invocation_impl

using send_result_fn = std::function<void(const std::shared_ptr<wamp_message>&)>;
void set_send_result_fn(send_result_fn&&);
void set_request_id(std::uint64_t);
void set_details(const msgpack::object& details);
void set_request_id(std::uint64_t);
void set_zone(msgpack::zone&&);
void set_arguments(const msgpack::object& arguments);
void set_kw_arguments(const msgpack::object& kw_arguments);
Expand All @@ -242,6 +248,7 @@ class wamp_invocation_impl
msgpack::object m_kw_arguments;
send_result_fn m_send_result_fn;
std::uint64_t m_request_id;
std::string m_uri;
};

using wamp_invocation = std::shared_ptr<wamp_invocation_impl>;
Expand Down
13 changes: 12 additions & 1 deletion autobahn/wamp_invocation.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ inline wamp_invocation_impl::wamp_invocation_impl()
{
}

inline const std::string& wamp_invocation_impl::uri() const
{
return m_uri;
}

inline std::size_t wamp_invocation_impl::number_of_arguments() const
{
return m_arguments.type == msgpack::type::ARRAY ? m_arguments.via.array.size : 0;
Expand Down Expand Up @@ -151,9 +156,10 @@ inline T wamp_invocation_impl::kw_argument_or(const char* key, const T& fallback
return kv.val.as<T>();
}
}
throw fallback;
return fallback;
}


template <typename Map>
inline Map wamp_invocation_impl::kw_arguments() const
{
Expand Down Expand Up @@ -274,6 +280,11 @@ inline void wamp_invocation_impl::set_send_result_fn(send_result_fn&& send_resul
m_send_result_fn = std::move(send_result);
}

inline void wamp_invocation_impl::set_details(const msgpack::object& details)
{
m_uri = std::move(value_for_key_or<std::string>(details, "procedure", std::string()));
}

inline void wamp_invocation_impl::set_request_id(std::uint64_t request_id)
{
m_request_id = request_id;
Expand Down
5 changes: 4 additions & 1 deletion autobahn/wamp_session.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ inline void wamp_session::process_invocation(wamp_message&& message)

wamp_invocation invocation = std::make_shared<wamp_invocation_impl>();
invocation->set_request_id(request_id);

invocation->set_details(message.field(3));
if (message.size() > 4) {
if (!message.is_field_type(4, msgpack::type::ARRAY)) {
throw protocol_error("INVOCATION.Arguments must be an array/vector");
Expand Down Expand Up @@ -1135,6 +1135,9 @@ inline void wamp_session::process_event(wamp_message&& message)
}

wamp_event event(std::move(message.zone()));

event.set_details(message.field(3));

if (message.size() > 4) {
if (!message.is_field_type(4, msgpack::type::ARRAY)) {
throw protocol_error("EVENT - EVENT.Arguments must be a list");
Expand Down
3 changes: 3 additions & 0 deletions autobahn/wamp_subscribe_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class wamp_subscribe_options
public:
wamp_subscribe_options();

//Convenience constructor
wamp_subscribe_options(const std::string& match);

wamp_subscribe_options(wamp_subscribe_options&& other) = delete;
wamp_subscribe_options(const wamp_subscribe_options& other) = delete;
wamp_subscribe_options& operator=(wamp_subscribe_options&& other) = delete;
Expand Down
7 changes: 7 additions & 0 deletions autobahn/wamp_subscribe_options.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ inline wamp_subscribe_options::wamp_subscribe_options()
{
}

inline wamp_subscribe_options::wamp_subscribe_options(const std::string& match)
: m_match()
{
//Verify match type
set_match(match);
}

inline const std::string& wamp_subscribe_options::match() const
{
return *m_match;
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ add_library(examples_parameters ${PARAMETERS_SOURCES} ${PARAMETERS_HEADERS})

set(CALLER_SOURCES caller.cpp)
set(CALLEE_SOURCES callee.cpp)
set(PROVIDE_PREFIX_SOURCES callee.cpp)
set(PUBLISHER_SOURCES publisher.cpp)
set(SUBSCRIBER_SOURCES subscriber.cpp)
set(WAMPCRA_SOURCES wampcra.cpp)
set(UDS_SOURCES uds.cpp)

add_executable(caller ${CALLER_SOURCES} ${PUBLIC_HEADERS})
add_executable(callee ${CALLEE_SOURCES} ${PUBLIC_HEADERS})
add_executable(provide_prefix ${PROVIDE_PREFIX_SOURCES} ${PUBLIC_HEADERS})
add_executable(publisher ${PUBLISHER_SOURCES} ${PUBLIC_HEADERS})
add_executable(subscriber ${SUBSCRIBER_SOURCES} ${PUBLIC_HEADERS})
add_executable(wampcra ${WAMPCRA_SOURCES} ${PUBLIC_HEADERS})
add_executable(uds ${UDS_SOURCES} ${PUBLIC_HEADERS})

target_link_libraries(caller examples_parameters)
target_link_libraries(callee examples_parameters)
target_link_libraries(provide_prefix examples_parameters)
target_link_libraries(publisher examples_parameters)
target_link_libraries(subscriber examples_parameters)
target_link_libraries(wampcra examples_parameters crypto)
Expand Down
1 change: 1 addition & 0 deletions examples/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Import('env')

examples = [('subscriber.cpp', []),
('publisher.cpp', []),
('provide_prefix.cpp', []),
('callee.cpp', []),
('caller.cpp', []),
('wampcra.cpp', ['crypto']),
Expand Down
11 changes: 9 additions & 2 deletions examples/parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ parameters::parameters()
: m_debug(false)
, m_realm(DEFAULT_REALM)
, m_rawsocket_endpoint(LOCALHOST_IP_ADDRESS, DEFAULT_RAWSOCKET_PORT)
, m_uds_endpoint(DEFAULT_UDS_PATH)
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
, m_uds_endpoint(DEFAULT_UDS_PATH)
#endif
{
}

Expand All @@ -67,10 +69,12 @@ const boost::asio::ip::tcp::endpoint& parameters::rawsocket_endpoint() const
return m_rawsocket_endpoint;
}

#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
const boost::asio::local::stream_protocol::endpoint& parameters::uds_endpoint() const
{
return m_uds_endpoint;
}
#endif

void parameters::set_debug(bool value)
{
Expand All @@ -88,10 +92,12 @@ void parameters::set_rawsocket_endpoint(const std::string& ip_address, uint16_t
boost::asio::ip::address::from_string(ip_address), port);
}

#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
void parameters::set_uds_endpoint(const std::string& path)
{
m_uds_endpoint = boost::asio::local::stream_protocol::endpoint(path);
}
#endif

std::unique_ptr<parameters> get_parameters(int argc, char** argv)
{
Expand Down Expand Up @@ -135,8 +141,9 @@ std::unique_ptr<parameters> get_parameters(int argc, char** argv)
variables["rawsocket-ip"].as<std::string>(),
variables["rawsocket-port"].as<uint16_t>());

#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
params->set_uds_endpoint(
variables["uds-path"].as<std::string>());

#endif
return params;
}
Loading

0 comments on commit 5ad7547

Please sign in to comment.