diff --git a/rmw_connext_cpp/CMakeLists.txt b/rmw_connext_cpp/CMakeLists.txt index 42b21f69..b127fc26 100644 --- a/rmw_connext_cpp/CMakeLists.txt +++ b/rmw_connext_cpp/CMakeLists.txt @@ -66,9 +66,62 @@ register_rmw_implementation( "cpp:rosidl_typesupport_cpp:rosidl_typesupport_connext_cpp" ) +# generate code for raw data +set(_idl_pp "${Connext_DDSGEN}") +if(NOT "${Connext_DDSGEN_SERVER}" STREQUAL "") + # use the code generator in server mode when available + # because it speeds up the code generation step significantly + set(_idl_pp "${Connext_DDSGEN_SERVER}") +endif() +set(generated_directory "${CMAKE_CURRENT_BINARY_DIR}/resources/generated") +file(MAKE_DIRECTORY ${generated_directory}) +set(generated_files + ${generated_directory}/connext_static_serialized_data.cxx + ${generated_directory}/connext_static_serialized_data.h + ${generated_directory}/connext_static_serialized_dataPlugin.cxx + ${generated_directory}/connext_static_serialized_dataPlugin.h + ${generated_directory}/connext_static_serialized_dataSupport.cxx + ${generated_directory}/connext_static_serialized_dataSupport.h +) +add_custom_command( + OUTPUT ${generated_files} + COMMAND "${_idl_pp}" -language C++ -unboundedSupport "connext_static_serialized_data.idl" -d ${generated_directory} + WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/resources" + COMMENT "Generating serialized type support for RTI Connext (using '${_idl_pp}')" + VERBATIM +) + +# patch the generate code for raw data +set(patch_files + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_data.cxx.patch + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_data.h.patch + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_dataPlugin.cxx.patch + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_dataPlugin.h.patch + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_dataSupport.cxx.patch + ${CMAKE_CURRENT_SOURCE_DIR}/resources/patch_files/connext_static_serialized_dataSupport.h.patch +) +set(patched_directory "${CMAKE_CURRENT_BINARY_DIR}/resources/patched") +file(MAKE_DIRECTORY ${patched_directory}) +set(patched_files + ${patched_directory}/connext_static_serialized_data.cxx + ${patched_directory}/connext_static_serialized_data.h + ${patched_directory}/connext_static_serialized_dataPlugin.cxx + ${patched_directory}/connext_static_serialized_dataPlugin.h + ${patched_directory}/connext_static_serialized_dataSupport.cxx + ${patched_directory}/connext_static_serialized_dataSupport.h +) +add_custom_command( + OUTPUT ${patched_files} + COMMAND ${PYTHON_EXECUTABLE} "${CMAKE_SOURCE_DIR}/bin/apply-patch.py" --input ${generated_files} --patch ${patch_files} --out ${patched_files} + DEPENDS ${generated_files} + COMMENT "Patching serialized type support for RTI Connext" + VERBATIM +) + add_library( rmw_connext_cpp SHARED + ${patched_files} src/get_client.cpp src/get_participant.cpp src/get_publisher.cpp @@ -90,6 +143,7 @@ add_library( src/rmw_publisher.cpp src/rmw_request.cpp src/rmw_response.cpp + src/rmw_serialize.cpp src/rmw_service.cpp src/rmw_service_names_and_types.cpp src/rmw_service_server_is_available.cpp @@ -108,6 +162,7 @@ ament_target_dependencies(rmw_connext_cpp "rosidl_typesupport_connext_c" "rosidl_typesupport_connext_cpp" "Connext") +target_include_directories(rmw_connext_cpp PUBLIC ${patched_directory}) ament_export_libraries(rmw_connext_cpp) # Causes the visibility macros to use dllexport rather than dllimport diff --git a/rmw_connext_cpp/bin/apply-patch.py b/rmw_connext_cpp/bin/apply-patch.py new file mode 100644 index 00000000..8514841e --- /dev/null +++ b/rmw_connext_cpp/bin/apply-patch.py @@ -0,0 +1,80 @@ +# Copyright 2016 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Based on `unifieddiff.py` by @noporpoise which is licensed as `Public domain (CC0)` +# see: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc +# TODO(Karsten1987): Rewrite this function for clarity and tests + +import argparse +import re + +# regular expression / pattern for patch header +_hdr_pat = re.compile('^@@ -(\d+),?(\d+)? \+(\d+),?(\d+)? @@.*$') + + +def apply_patch(s, patch): + """ + Apply unified diff patch to string s to recover newer string. + + If revert is True, treat s as the newer string, recover older string. + """ + s = s.splitlines(True) + p = patch.splitlines(True) + t = '' + i = sl = 0 + (midx, sign) = (1, '+') + while i < len(p) and p[i].startswith(('---', '+++')): + i += 1 # skip header lines + while i < len(p): + # find patch header + m = _hdr_pat.match(p[i]) + if not m: + raise Exception('Cannot process diff in line ' + str(i)) + + i += 1 + ll = int(m.group(midx)) - 1 + (m.group(midx + 1) == '0') + t += ''.join(s[sl:ll]) + sl = ll + while i < len(p) and p[i][0] != '@': + if i + 1 < len(p) and p[i + 1][0] == '\\': + line = p[i][:-1] + i += 2 + else: + line = p[i] + i += 1 + if len(line) > 0: + if line[0] == sign or line[0] == ' ': + t += line[1:] + sl += (line[0] != sign) + t += ''.join(s[sl:]) + return t + + +parser = argparse.ArgumentParser() +parser.add_argument('--input', nargs='+') +parser.add_argument('--patch', nargs='+') +parser.add_argument('--out', nargs='+') +args = parser.parse_args() +for i, p, o in zip(args.input, args.patch, args.out): + with open(i, 'r') as h: + content_in = h.read() + with open(p, 'r') as h: + content_patch = h.read() + try: + content_out = apply_patch(content_in, content_patch) + except Exception: + print(i, p, o) + raise + with open(o, 'w') as h: + h.write(content_out) diff --git a/rmw_connext_cpp/resources/connext_static_serialized_data.idl b/rmw_connext_cpp/resources/connext_static_serialized_data.idl new file mode 100644 index 00000000..90c69dc8 --- /dev/null +++ b/rmw_connext_cpp/resources/connext_static_serialized_data.idl @@ -0,0 +1,7 @@ +const long KEY_HASH_LENGTH_16 = 16; + +struct ConnextStaticSerializedData { + @key octet key_hash[KEY_HASH_LENGTH_16]; + sequence serialized_key; + sequence serialized_data; +}; diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.cxx.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.cxx.patch new file mode 100644 index 00000000..3b9800ae --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.cxx.patch @@ -0,0 +1,11 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_data.cxx ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_data.cxx +@@ -11,7 +11,7 @@ or consult the RTI Connext manual. + + #ifndef NDDS_STANDALONE_TYPE + #ifndef ndds_cpp_h +-#include "ndds/ndds_cpp.h" ++#include "rmw_connext_shared_cpp/ndds_include.hpp" + #endif + #ifndef dds_c_log_impl_h + #include "dds_c/dds_c_log_impl.h" diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.h.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.h.patch new file mode 100644 index 00000000..04ffd023 --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_data.h.patch @@ -0,0 +1,11 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_data.h ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_data.h +@@ -14,7 +14,7 @@ or consult the RTI Connext manual. + + #ifndef NDDS_STANDALONE_TYPE + #ifndef ndds_cpp_h +-#include "ndds/ndds_cpp.h" ++#include "rmw_connext_shared_cpp/ndds_include.hpp" + #endif + #else + #include "ndds_standalone_type.h" diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.cxx.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.cxx.patch new file mode 100644 index 00000000..327c4f69 --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.cxx.patch @@ -0,0 +1,353 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataPlugin.cxx ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataPlugin.cxx +@@ -11,7 +11,7 @@ or consult the RTI Connext manual. + #include + + #ifndef ndds_cpp_h +-#include "ndds/ndds_cpp.h" ++#include "rmw_connext_shared_cpp/ndds_include.hpp" + #endif + + #ifndef osapi_type_h +@@ -379,74 +379,81 @@ ConnextStaticSerializedDataPlugin_serialize( + RTIBool serialize_sample, + void *endpoint_plugin_qos) + { +- char * position = NULL; +- RTIBool retval = RTI_TRUE; +- +- if (endpoint_data) {} /* To avoid warnings */ +- if (endpoint_plugin_qos) {} /* To avoid warnings */ +- +- if(serialize_encapsulation) { +- if (!RTICdrStream_serializeAndSetCdrEncapsulation(stream , encapsulation_id)) { +- return RTI_FALSE; +- } ++ char * position = NULL; ++ RTIBool retval = RTI_TRUE; ++ ++ if (endpoint_data) {} /* To avoid warnings */ ++ if (endpoint_plugin_qos) {} /* To avoid warnings */ ++ ++ /* This plugin can only be used to publish the top-level DDS Topic-Type ++ * in which case serialize_encapsulation==TRUE. If that is not ++ * the case then it is an error. ++ */ ++ if (!serialize_encapsulation) { ++ return RTI_FALSE; ++ } ++ ++ position = RTICdrStream_resetAlignment(stream); ++ ++ if (serialize_sample) { ++ /* The sample->serialized_data contains the serialized encapsulation followed by the serialized ++ * data, so we only need to copy that into ++ * the CDR stream. Not the key_hash, not the length of the data itself ++ * The SerializedType sample->serialized_data is always a contiguous buffer ++ */ ++ DDS_Octet * buffer = DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_data); ++ if (buffer == NULL) { ++ return RTI_FALSE; ++ } + +- position = RTICdrStream_resetAlignment(stream); ++ /* The encapsulation_id appears in the sample->serialized_data as octet[2] using big-endian ++ * byte order ++ */ ++ if (encapsulation_id != (buffer[0] * 256 + buffer[1]) ) { ++ return RTI_FALSE; + } + +- if(serialize_sample) { ++ /* Use RTICdrStream_serializePrimitiveArray so that there is no additional length prepended */ ++ if (!RTICdrStream_serializePrimitiveArray( ++ stream, (void *)buffer, ++ DDS_OctetSeq_get_length(&sample->serialized_data), ++ RTI_CDR_OCTET_TYPE)) ++ { ++ return RTI_FALSE; ++ } ++ } + +- if (!RTICdrStream_serializePrimitiveArray( +- stream, (void*) sample->key_hash, ((KEY_HASH_LENGTH_16)), RTI_CDR_OCTET_TYPE)) { +- return RTI_FALSE; +- } ++ RTICdrStream_restoreAlignment(stream, position); + +- if (DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_key) != NULL) { +- if (!RTICdrStream_serializePrimitiveSequence( +- stream, +- DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_key), +- DDS_OctetSeq_get_length(&sample->serialized_key), +- (RTI_INT32_MAX-1), +- RTI_CDR_OCTET_TYPE)) { +- return RTI_FALSE; +- } +- } else { +- if (!RTICdrStream_serializePrimitivePointerSequence( +- stream, +- (const void **) DDS_OctetSeq_get_discontiguous_bufferI(&sample->serialized_key), +- DDS_OctetSeq_get_length(&sample->serialized_key), +- (RTI_INT32_MAX-1), +- RTI_CDR_OCTET_TYPE)) { +- return RTI_FALSE; +- } +- } +- +- if (DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_data) != NULL) { +- if (!RTICdrStream_serializePrimitiveSequence( +- stream, +- DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_data), +- DDS_OctetSeq_get_length(&sample->serialized_data), +- (RTI_INT32_MAX-1), +- RTI_CDR_OCTET_TYPE)) { +- return RTI_FALSE; +- } +- } else { +- if (!RTICdrStream_serializePrimitivePointerSequence( +- stream, +- (const void **) DDS_OctetSeq_get_discontiguous_bufferI(&sample->serialized_data), +- DDS_OctetSeq_get_length(&sample->serialized_data), +- (RTI_INT32_MAX-1), +- RTI_CDR_OCTET_TYPE)) { +- return RTI_FALSE; +- } +- } ++ return retval; ++} + +- } ++/** ++ TODO. The code-block below does not belong here. ++ It should be pushed to the CDR module, perhaps inside ++ RTICdrStream_deserializeAndSetCdrEncapsulation so that the ++ stream size is alredy correct when SerializedTypePlugin_deserialize_sample ++ is called. + +- if(serialize_encapsulation) { +- RTICdrStream_restoreAlignment(stream,position); +- } ++ Adjust the size of the CDR stream to not include the alignment ++ padding. See http://issues.omg.org/browse/DDSXTY12-10 + +- return retval; ++ @precondition The RTICdrStream *stream has alreadt processed ++ the encapsulation header and therefore has set the ++ encapsulation options returned by ++ RTICdrStream_getEncapsulationOptions() ++*/ ++void ++ConnextStaticSerializedDataPlugin_remove_padding_from_stream(struct RTICdrStream * stream) ++{ ++ /* See http://issues.omg.org/browse/DDSXTY12-10 */ ++ DDS_UnsignedShort padding_size_mask = 0x0003; ++ DDS_UnsignedShort padding_size; ++ int adjustedBufferLength; ++ ++ padding_size = RTICdrStream_getEncapsulationOptions(stream) & padding_size_mask; ++ adjustedBufferLength = RTICdrStream_getBufferLength(stream) - padding_size; ++ RTICdrStream_setBufferLength(stream, adjustedBufferLength); + } + + RTIBool +@@ -458,114 +465,65 @@ ConnextStaticSerializedDataPlugin_deserialize_sample( + RTIBool deserialize_sample, + void *endpoint_plugin_qos) + { ++ char * position = NULL; ++ RTIBool done = RTI_FALSE; ++ ++ if (endpoint_data) {} /* To avoid warnings */ ++ if (endpoint_plugin_qos) {} /* To avoid warnings */ ++ ++ /* This plugin can only be used to publish the top-level DDS Topic-Type ++ * in which case deserialize_encapsulation==TRUE. If that is not ++ * the case then it is an error. ++ */ ++ if (!deserialize_encapsulation) { ++ return RTI_FALSE; ++ } ++ ++ position = RTICdrStream_resetAlignment(stream); ++ ++ /* TODO. The call does not belong here. It should be pushed ++ * inside RTICdrStream_deserializeAndSetCdrEncapsulation ++ */ ++ ConnextStaticSerializedDataPlugin_remove_padding_from_stream(stream); ++ ++ if (deserialize_sample) { ++ /* Note that sample->key_hash was already set by SerializedTypePlugin_deserialize() ++ it is done there because SerializedTypePlugin_deserialize_sample does not ++ have access to the SampleInfo where that information is ++ */ ++ ++ /* We do not set the serialized_key on deserialization */ ++ DDS_OctetSeq_set_length(&sample->serialized_key, 0); ++ ++ /* We copy everything that remains in the CDR stream */ ++ int bytesLeftInStream = RTICdrStream_getRemainder(stream); ++ DDS_Octet * cdrBufferPtr = (DDS_Octet *) RTICdrStream_getCurrentPosition(stream); ++ if (cdrBufferPtr == NULL) { ++ goto fin; ++ } + +- char * position = NULL; +- +- RTIBool done = RTI_FALSE; +- +- try { +- +- if (endpoint_data) {} /* To avoid warnings */ +- if (endpoint_plugin_qos) {} /* To avoid warnings */ +- if(deserialize_encapsulation) { +- +- if (!RTICdrStream_deserializeAndSetCdrEncapsulation(stream)) { +- return RTI_FALSE; +- } +- +- position = RTICdrStream_resetAlignment(stream); +- } +- if(deserialize_sample) { +- +- ConnextStaticSerializedData_initialize_ex(sample, RTI_FALSE, RTI_FALSE); +- +- if (!RTICdrStream_deserializePrimitiveArray( +- stream, (void*) sample->key_hash, ((KEY_HASH_LENGTH_16)), RTI_CDR_OCTET_TYPE)) { +- goto fin; +- } +- +- { +- RTICdrUnsignedLong sequence_length; +- if (!RTICdrStream_lookUnsignedLong(stream,&sequence_length)) { +- goto fin; +- } +- if (!DDS_OctetSeq_set_maximum(&sample->serialized_key,sequence_length)) { +- return RTI_FALSE; +- } +- if (DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_key) != NULL) { +- if (!RTICdrStream_deserializePrimitiveSequence( +- stream, +- DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_key), +- &sequence_length, +- DDS_OctetSeq_get_maximum(&sample->serialized_key), +- RTI_CDR_OCTET_TYPE)){ +- goto fin; +- } +- } else { +- if (!RTICdrStream_deserializePrimitivePointerSequence( +- stream, +- (void **) DDS_OctetSeq_get_discontiguous_bufferI(&sample->serialized_key), +- &sequence_length, +- DDS_OctetSeq_get_maximum(&sample->serialized_key), +- RTI_CDR_OCTET_TYPE)){ +- goto fin; +- } +- } +- if (!DDS_OctetSeq_set_length(&sample->serialized_key, sequence_length)) { +- return RTI_FALSE; +- } +- +- } +- { +- RTICdrUnsignedLong sequence_length; +- if (!RTICdrStream_lookUnsignedLong(stream,&sequence_length)) { +- goto fin; +- } +- if (!DDS_OctetSeq_set_maximum(&sample->serialized_data,sequence_length)) { +- return RTI_FALSE; +- } +- if (DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_data) != NULL) { +- if (!RTICdrStream_deserializePrimitiveSequence( +- stream, +- DDS_OctetSeq_get_contiguous_bufferI(&sample->serialized_data), +- &sequence_length, +- DDS_OctetSeq_get_maximum(&sample->serialized_data), +- RTI_CDR_OCTET_TYPE)){ +- goto fin; +- } +- } else { +- if (!RTICdrStream_deserializePrimitivePointerSequence( +- stream, +- (void **) DDS_OctetSeq_get_discontiguous_bufferI(&sample->serialized_data), +- &sequence_length, +- DDS_OctetSeq_get_maximum(&sample->serialized_data), +- RTI_CDR_OCTET_TYPE)){ +- goto fin; +- } +- } +- if (!DDS_OctetSeq_set_length(&sample->serialized_data, sequence_length)) { +- return RTI_FALSE; +- } ++ /* Do not call SerializedType_initialize_ex initialize here ++ because it would override the key_hash field ++ SerializedType_initialize_ex(sample, RTI_FALSE, RTI_FALSE); ++ */ ++ if (!DDS_OctetSeq_from_array(&sample->serialized_data, cdrBufferPtr, bytesLeftInStream) ) { ++ goto fin; ++ } ++ RTICdrStream_incrementCurrentPosition(stream, bytesLeftInStream); ++ } + +- } +- } ++ done = RTI_TRUE; + +- done = RTI_TRUE; +- fin: +- if (done != RTI_TRUE && +- RTICdrStream_getRemainder(stream) >= +- RTI_CDR_PARAMETER_HEADER_ALIGNMENT) { +- return RTI_FALSE; +- } +- if(deserialize_encapsulation) { +- RTICdrStream_restoreAlignment(stream,position); +- } ++fin: ++ if ( (done != RTI_TRUE) && ++ (RTICdrStream_getRemainder(stream) >= RTI_CDR_PARAMETER_HEADER_ALIGNMENT) ) ++ { ++ return RTI_FALSE; ++ } + +- return RTI_TRUE; ++ RTICdrStream_restoreAlignment(stream, position); + +- } catch (std::bad_alloc&) { +- return RTI_FALSE; +- } ++ return RTI_TRUE; + } + + RTIBool +@@ -971,7 +929,9 @@ Key Management functions: + PRESTypePluginKeyKind + ConnextStaticSerializedDataPlugin_get_key_kind(void) + { +- return PRES_TYPEPLUGIN_USER_KEY; ++ // TODO(karsten1987): Whenever we introduce keys for our data types ++ // this might have to change. ++ return PRES_TYPEPLUGIN_NO_KEY; + } + + RTIBool +@@ -1408,6 +1368,11 @@ ConnextStaticSerializedDataPlugin_serialized_sample_to_keyhash( + * ------------------------------------------------------------------------ */ + struct PRESTypePlugin *ConnextStaticSerializedDataPlugin_new(void) + { ++ return NULL; ++} ++ ++struct PRESTypePlugin *ConnextStaticSerializedDataPlugin_new_external(struct DDS_TypeCode * external_type_code) ++{ + struct PRESTypePlugin *plugin = NULL; + const struct PRESTypePluginVersion PLUGIN_VERSION = + PRES_TYPE_PLUGIN_VERSION_2_0; +@@ -1503,7 +1468,7 @@ struct PRESTypePlugin *ConnextStaticSerializedDataPlugin_new(void) + (PRESTypePluginKeyToInstanceFunction) + ConnextStaticSerializedDataPlugin_key_to_instance; + plugin->serializedKeyToKeyHashFnc = NULL; /* Not supported yet */ +- plugin->typeCode = (struct RTICdrTypeCode *)ConnextStaticSerializedData_get_typecode(); ++ plugin->typeCode = (struct RTICdrTypeCode *)external_type_code; + + plugin->languageKind = PRES_TYPEPLUGIN_CPP_LANG; diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.h.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.h.patch new file mode 100644 index 00000000..c59c6981 --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataPlugin.h.patch @@ -0,0 +1,11 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataPlugin.h ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataPlugin.h +@@ -324,6 +324,9 @@ extern "C" { + NDDSUSERDllExport extern struct PRESTypePlugin* + ConnextStaticSerializedDataPlugin_new(void); + ++ NDDSUSERDllExport extern struct PRESTypePlugin* ++ ConnextStaticSerializedDataPlugin_new_external(struct DDS_TypeCode * external_type_code); ++ + NDDSUSERDllExport extern void + ConnextStaticSerializedDataPlugin_delete(struct PRESTypePlugin *); diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.cxx.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.cxx.patch new file mode 100644 index 00000000..0849bf1d --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.cxx.patch @@ -0,0 +1,66 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataSupport.cxx ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataSupport.cxx +@@ -115,3 +115,62 @@ Defines: TTypeSupport, TData, TDataReader, TDataWriter + #undef TPlugin_new + #undef TPlugin_delete + ++DDS_ReturnCode_t ++ConnextStaticSerializedDataSupport_register_external_type( ++ DDSDomainParticipant * participant, ++ const char * type_name, ++ struct DDS_TypeCode * type_code) ++{ ++ DDSTypeSupport * dds_data_type = NULL; ++ struct PRESTypePlugin * presTypePlugin = NULL; ++ DDS_ReturnCode_t retcode = DDS_RETCODE_ERROR; ++ DDS_Boolean delete_data_type = DDS_BOOLEAN_FALSE; ++ RTIBool already_registered = RTI_FALSE; ++ ++ if (type_code == NULL) { ++ goto finError; ++ } ++ ++ if (participant == NULL) { ++ goto finError; ++ } ++ ++ /* TODO pass the type_code */ ++ presTypePlugin = ConnextStaticSerializedDataPlugin_new_external(type_code); ++ if (presTypePlugin == NULL) { ++ goto finError; ++ } ++ ++ dds_data_type = new ConnextStaticSerializedDataTypeSupport(true); ++ if (dds_data_type == NULL) { ++ fprintf(stderr, "Error while registering external type\n"); ++ goto finError; ++ } ++ delete_data_type = RTI_TRUE; ++ ++ presTypePlugin->_userBuffer = (PRESWord *)dds_data_type; ++ already_registered = participant->is_type_registered(type_name); ++ ++ retcode = participant->register_type(type_name, presTypePlugin, NULL, !already_registered); ++ if (retcode != DDS_RETCODE_OK) { ++ fprintf(stderr, "error while registering external type\n"); ++ goto finError; ++ } ++ ++ if (!already_registered) { ++ delete_data_type = DDS_BOOLEAN_FALSE; ++ } ++ ++ retcode = DDS_RETCODE_OK; ++ ++finError: ++ if (presTypePlugin != NULL) { ++ ConnextStaticSerializedDataPlugin_delete(presTypePlugin); ++ } ++ if (delete_data_type) { ++ delete (ConnextStaticSerializedDataTypeSupport *)dds_data_type; ++ dds_data_type = NULL; ++ } ++ ++ return retcode; ++} + diff --git a/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.h.patch b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.h.patch new file mode 100644 index 00000000..285b19ed --- /dev/null +++ b/rmw_connext_cpp/resources/patch_files/connext_static_serialized_dataSupport.h.patch @@ -0,0 +1,121 @@ +--- a/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataSupport.h ++++ b/rmw_connext_cpp/resources/patch_generated/connext_static_serialized_dataSupport.h +@@ -15,7 +15,7 @@ or consult the RTI Connext manual. + #include "connext_static_serialized_data.h" + + #ifndef ndds_cpp_h +-#include "ndds/ndds_cpp.h" ++#include "rmw_connext_shared_cpp/ndds_include.hpp" + #endif + + #if (defined(RTI_WIN32) || defined (RTI_WINCE)) && defined(NDDS_USER_DLL_EXPORT) +@@ -44,13 +44,106 @@ implementing generics in C and C++. + + #endif + +-DDS_TYPESUPPORT_CPP( +- ConnextStaticSerializedDataTypeSupport, +- ConnextStaticSerializedData); ++class NDDSUSERDllExport DDSCPPDllExport ConnextStaticSerializedDataTypeSupport : public ::DDSTypeSupport ++{ ++public: ++ ConnextStaticSerializedDataTypeSupport(bool osrf) ++ { ++ (void) osrf; ++ } ++ ++ ~ConnextStaticSerializedDataTypeSupport(); ++ ++ static DDS_ReturnCode_t register_type( ++ DDSDomainParticipant * participant, ++ const char * type_name = "ConnextStaticSerializedData"); ++ ++ static DDS_ReturnCode_t unregister_type( ++ DDSDomainParticipant * participant, ++ const char * type_name = "ConnextStaticSerializedData"); ++ ++ static const char * get_type_name(); ++ ++ static ConnextStaticSerializedData * create_data_ex(DDS_Boolean allocatePointers); ++ ++ static ConnextStaticSerializedData * create_data( ++ const DDS_TypeAllocationParams_t & alloc_params = ++ DDS_TYPE_ALLOCATION_PARAMS_DEFAULT); ++ ++ static DDS_ReturnCode_t delete_data_ex( ++ ConnextStaticSerializedData * a_data, ++ DDS_Boolean deletePointers); ++ ++ static DDS_ReturnCode_t delete_data( ++ ConnextStaticSerializedData * a_data, ++ const DDS_TypeDeallocationParams_t & dealloc_params = ++ DDS_TYPE_DEALLOCATION_PARAMS_DEFAULT); ++ ++ static void print_data(const ConnextStaticSerializedData * a_data); ++ ++ static DDS_ReturnCode_t copy_data( ++ ConnextStaticSerializedData * dst_data, const ConnextStaticSerializedData * src_data); ++ ++ static DDS_ReturnCode_t initialize_data_ex( ++ ConnextStaticSerializedData * a_data, ++ DDS_Boolean allocatePointers); ++ ++ static DDS_ReturnCode_t initialize_data( ++ ConnextStaticSerializedData * a_data, ++ const DDS_TypeAllocationParams_t & alloc_params = ++ DDS_TYPE_ALLOCATION_PARAMS_DEFAULT); ++ ++ static DDS_ReturnCode_t finalize_data_ex( ++ ConnextStaticSerializedData * a_data, ++ DDS_Boolean deletePointers); ++ ++ static DDS_ReturnCode_t finalize_data( ++ ConnextStaticSerializedData * a_data, ++ const DDS_TypeDeallocationParams_t & dealloc_params = ++ DDS_TYPE_DEALLOCATION_PARAMS_DEFAULT); ++ ++ DDSDataReader * create_datareaderI(DDSDataReader * dataReader); ++ ++ DDS_ReturnCode_t destroy_datareaderI(DDSDataReader * dataReader); ++ ++ DDSDataWriter * create_datawriterI(DDSDataWriter * dataWriter); ++ ++ DDS_ReturnCode_t destroy_datawriterI(DDSDataWriter * dataWriter); ++ ++ static DDS_TypeCode * get_typecode(); ++ ++ static DDS_ReturnCode_t serialize_data_to_cdr_buffer( ++ char * buffer, ++ unsigned int & length, ++ const ConnextStaticSerializedData * a_data); ++ ++ static DDS_ReturnCode_t deserialize_data_from_cdr_buffer( ++ ConnextStaticSerializedData * a_data, ++ const char * buffer, ++ unsigned int length); ++ ++ static DDS_ReturnCode_t data_to_string( ++ ConnextStaticSerializedData * sample, ++ char * str, ++ DDS_UnsignedLong & str_size, ++ const DDS_PrintFormatProperty & property); ++ ++ static void finalize(); ++ ++private: ++ ConnextStaticSerializedDataTypeSupport(); ++}; + + DDS_DATAWRITER_CPP(ConnextStaticSerializedDataDataWriter, ConnextStaticSerializedData); + DDS_DATAREADER_CPP(ConnextStaticSerializedDataDataReader, ConnextStaticSerializedDataSeq, ConnextStaticSerializedData); + ++NDDSUSERDllExport ++DDS_ReturnCode_t ++ConnextStaticSerializedDataSupport_register_external_type( ++ DDSDomainParticipant * participant, ++ const char * type_name, ++ struct DDS_TypeCode * type_code); ++ + #if (defined(RTI_WIN32) || defined (RTI_WINCE)) && defined(NDDS_USER_DLL_EXPORT) + /* If the code is building on Windows, stop exporting symbols. + */ diff --git a/rmw_connext_cpp/src/rmw_client.cpp b/rmw_connext_cpp/src/rmw_client.cpp index 14d14bec..fdbb4f9c 100644 --- a/rmw_connext_cpp/src/rmw_client.cpp +++ b/rmw_connext_cpp/src/rmw_client.cpp @@ -50,7 +50,7 @@ rmw_create_client( node->implementation_identifier, rti_connext_identifier, return NULL) - RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(type_supports, type_support) + RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(type_supports, type_support, NULL) if (!qos_profile) { RMW_SET_ERROR_MSG("qos_profile is null"); diff --git a/rmw_connext_cpp/src/rmw_publish.cpp b/rmw_connext_cpp/src/rmw_publish.cpp index ddc26b29..e5226409 100644 --- a/rmw_connext_cpp/src/rmw_publish.cpp +++ b/rmw_connext_cpp/src/rmw_publish.cpp @@ -12,12 +12,65 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rmw/error_handling.h" #include "rmw/rmw.h" #include "rmw/types.h" -#include "rmw_connext_cpp/identifier.hpp" #include "rmw_connext_cpp/connext_static_publisher_info.hpp" +#include "rmw_connext_cpp/identifier.hpp" + +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" + +// include patched generated code from the build folder +#include "connext_static_serialized_dataSupport.h" + +bool +publish(DDSDataWriter * dds_data_writer, ConnextStaticCDRStream * cdr_stream) +{ + ConnextStaticSerializedDataDataWriter * data_writer = + ConnextStaticSerializedDataDataWriter::narrow(dds_data_writer); + if (!data_writer) { + RMW_SET_ERROR_MSG("failed to narrow data writer"); + return false; + } + + ConnextStaticSerializedData * instance = ConnextStaticSerializedDataTypeSupport::create_data(); + if (!instance) { + RMW_SET_ERROR_MSG("failed to create dds message instance"); + return false; + } + + DDS_ReturnCode_t status = DDS_RETCODE_ERROR; + + instance->serialized_data.maximum(0); + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + RMW_SET_ERROR_MSG("cdr_stream->buffer_length unexpectedly larger than DDS_Long's max value"); + return false; + } + if (!instance->serialized_data.loan_contiguous( + reinterpret_cast(cdr_stream->buffer), + static_cast(cdr_stream->buffer_length), + static_cast(cdr_stream->buffer_length))) + { + RMW_SET_ERROR_MSG("failed to loan memory for message"); + goto cleanup; + } + + status = data_writer->write(*instance, DDS_HANDLE_NIL); + +cleanup: + if (instance) { + if (!instance->serialized_data.unloan()) { + fprintf(stderr, "failed to return loaned memory\n"); + status = DDS_RETCODE_ERROR; + } + ConnextStaticSerializedDataTypeSupport::delete_data(instance); + } + + return status == DDS_RETCODE_OK; +} extern "C" { @@ -54,7 +107,74 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message) return RMW_RET_ERROR; } - bool published = callbacks->publish(topic_writer, ros_message); + auto ret = RMW_RET_OK; + ConnextStaticCDRStream cdr_stream; + cdr_stream.allocator = rcutils_get_default_allocator(); + if (!callbacks->to_cdr_stream(ros_message, &cdr_stream)) { + RMW_SET_ERROR_MSG("failed to convert ros_message to cdr stream"); + ret = RMW_RET_ERROR; + goto fail; + } + if (cdr_stream.buffer_length == 0) { + RMW_SET_ERROR_MSG("no message length set"); + ret = RMW_RET_ERROR; + goto fail; + } + if (!cdr_stream.buffer) { + RMW_SET_ERROR_MSG("no serialized message attached"); + ret = RMW_RET_ERROR; + goto fail; + } + if (!publish(topic_writer, &cdr_stream)) { + RMW_SET_ERROR_MSG("failed to publish message"); + ret = RMW_RET_ERROR; + goto fail; + } + +fail: + cdr_stream.allocator.deallocate(cdr_stream.buffer, cdr_stream.allocator.state); + return ret; +} + +rmw_ret_t +rmw_publish_serialized_message( + const rmw_publisher_t * publisher, const rmw_serialized_message_t * serialized_message) +{ + if (!publisher) { + RMW_SET_ERROR_MSG("publisher handle is null"); + return RMW_RET_ERROR; + } + if (publisher->implementation_identifier != rti_connext_identifier) { + RMW_SET_ERROR_MSG("publisher handle is not from this rmw implementation"); + return RMW_RET_ERROR; + } + if (!serialized_message) { + RMW_SET_ERROR_MSG("serialized message handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticPublisherInfo * publisher_info = + static_cast(publisher->data); + if (!publisher_info) { + RMW_SET_ERROR_MSG("publisher info handle is null"); + return RMW_RET_ERROR; + } + const message_type_support_callbacks_t * callbacks = publisher_info->callbacks_; + if (!callbacks) { + RMW_SET_ERROR_MSG("callbacks handle is null"); + return RMW_RET_ERROR; + } + DDSDataWriter * topic_writer = publisher_info->topic_writer_; + if (!topic_writer) { + RMW_SET_ERROR_MSG("topic writer handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticCDRStream cdr_stream; + cdr_stream.buffer = serialized_message->buffer; + cdr_stream.buffer_length = serialized_message->buffer_length; + cdr_stream.buffer_capacity = serialized_message->buffer_capacity; + bool published = publish(topic_writer, &cdr_stream); if (!published) { RMW_SET_ERROR_MSG("failed to publish message"); return RMW_RET_ERROR; diff --git a/rmw_connext_cpp/src/rmw_publisher.cpp b/rmw_connext_cpp/src/rmw_publisher.cpp index a10f8fc0..d10d74f8 100644 --- a/rmw_connext_cpp/src/rmw_publisher.cpp +++ b/rmw_connext_cpp/src/rmw_publisher.cpp @@ -24,10 +24,14 @@ #include "rmw_connext_shared_cpp/types.hpp" #include "rmw_connext_cpp/identifier.hpp" + #include "process_topic_and_service_names.hpp" #include "type_support_common.hpp" #include "rmw_connext_cpp/connext_static_publisher_info.hpp" +// include patched generated code from the build folder +#include "connext_static_serialized_dataSupport.h" + // Uncomment this to get extra console output about discovery. // This affects code in this file, but there is a similar variable in: // rmw_connext_shared_cpp/shared_functions.cpp @@ -51,7 +55,7 @@ rmw_create_publisher( node->implementation_identifier, rti_connext_identifier, return NULL) - RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_supports, type_support) + RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_supports, type_support, NULL) if (!topic_name || strlen(topic_name) == 0) { RMW_SET_ERROR_MSG("publisher topic is null or empty string"); @@ -82,7 +86,7 @@ rmw_create_publisher( } std::string type_name = _create_type_name(callbacks, "msg"); // Past this point, a failure results in unrolling code in the goto fail block. - bool registered; + DDS_TypeCode * type_code = nullptr; DDS_DataWriterQos datawriter_qos; DDS_PublisherQos publisher_qos; DDS_ReturnCode_t status; @@ -104,9 +108,21 @@ rmw_create_publisher( goto fail; } - registered = callbacks->register_type(participant, type_name.c_str()); - if (!registered) { - RMW_SET_ERROR_MSG("failed to register type"); + type_code = callbacks->get_type_code(); + if (!type_code) { + RMW_SET_ERROR_MSG("failed to fetch type code\n"); + goto fail; + } + // This is a non-standard RTI Connext function + // It allows to register an external type to a static data writer + // In this case, we register the custom message type to a data writer, + // which only publishes DDS_Octets + // The purpose of this is to send only raw data DDS_Octets over the wire, + // advertise the topic however with a type of the message, e.g. std_msgs::msg::dds_::String + status = ConnextStaticSerializedDataSupport_register_external_type( + participant, type_name.c_str(), type_code); + if (status != DDS_RETCODE_OK) { + RMW_SET_ERROR_MSG("failed to register external type"); goto fail; } diff --git a/rmw_connext_cpp/src/rmw_serialize.cpp b/rmw_connext_cpp/src/rmw_serialize.cpp new file mode 100644 index 00000000..8ffafd38 --- /dev/null +++ b/rmw_connext_cpp/src/rmw_serialize.cpp @@ -0,0 +1,85 @@ +// Copyright 2014-2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rmw/error_handling.h" +#include "rmw/rmw.h" + +#include "./type_support_common.hpp" + +// include patched generated code from the build folder +#include "connext_static_serialized_dataSupport.h" + +extern "C" +{ +rmw_ret_t +rmw_serialize( + const void * ros_message, + const rosidl_message_type_support_t * type_support, + rmw_serialized_message_t * serialized_message) +{ + RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_support, ts, RMW_RET_ERROR) + + const message_type_support_callbacks_t * callbacks = + static_cast(ts->data); + if (!callbacks) { + RMW_SET_ERROR_MSG("callbacks handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticCDRStream cdr_stream; + cdr_stream.buffer = serialized_message->buffer; + cdr_stream.buffer_length = serialized_message->buffer_length; + cdr_stream.buffer_capacity = serialized_message->buffer_capacity; + cdr_stream.allocator = serialized_message->allocator; + + if (!callbacks->to_cdr_stream(ros_message, &cdr_stream)) { + RMW_SET_ERROR_MSG("failed to convert ros_message to cdr stream"); + return RMW_RET_ERROR; + } + // reassgin buffer because it might have been resized + serialized_message->buffer = cdr_stream.buffer; + serialized_message->buffer_length = cdr_stream.buffer_length; + serialized_message->buffer_capacity = cdr_stream.buffer_capacity; + return RMW_RET_OK; +} + +rmw_ret_t +rmw_deserialize( + const rmw_serialized_message_t * serialized_message, + const rosidl_message_type_support_t * type_support, + void * ros_message) +{ + RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_support, ts, RMW_RET_ERROR) + + const message_type_support_callbacks_t * callbacks = + static_cast(ts->data); + if (!callbacks) { + RMW_SET_ERROR_MSG("callbacks handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticCDRStream cdr_stream; + cdr_stream.buffer = serialized_message->buffer; + cdr_stream.buffer_length = serialized_message->buffer_length; + cdr_stream.buffer_capacity = serialized_message->buffer_capacity; + cdr_stream.allocator = serialized_message->allocator; + // convert the cdr stream to the message + if (!callbacks->to_message(&cdr_stream, ros_message)) { + RMW_SET_ERROR_MSG("can't convert cdr stream to ros message"); + return RMW_RET_ERROR; + } + + return RMW_RET_OK; +} +} // extern "C" diff --git a/rmw_connext_cpp/src/rmw_service.cpp b/rmw_connext_cpp/src/rmw_service.cpp index 3d552997..ad02328c 100644 --- a/rmw_connext_cpp/src/rmw_service.cpp +++ b/rmw_connext_cpp/src/rmw_service.cpp @@ -49,7 +49,7 @@ rmw_create_service( node->implementation_identifier, rti_connext_identifier, return NULL) - RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(type_supports, type_support) + RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(type_supports, type_support, NULL) if (!qos_profile) { RMW_SET_ERROR_MSG("qos_profile is null"); diff --git a/rmw_connext_cpp/src/rmw_subscription.cpp b/rmw_connext_cpp/src/rmw_subscription.cpp index affa408e..b548a9f5 100644 --- a/rmw_connext_cpp/src/rmw_subscription.cpp +++ b/rmw_connext_cpp/src/rmw_subscription.cpp @@ -23,10 +23,14 @@ #include "rmw_connext_shared_cpp/types.hpp" #include "rmw_connext_cpp/identifier.hpp" + #include "process_topic_and_service_names.hpp" #include "type_support_common.hpp" #include "rmw_connext_cpp/connext_static_subscriber_info.hpp" +// include patched generated code from the build folder +#include "connext_static_serialized_dataSupport.h" + // Uncomment this to get extra console output about discovery. // This affects code in this file, but there is a similar variable in: // rmw_connext_shared_cpp/shared_functions.cpp @@ -51,7 +55,7 @@ rmw_create_subscription( node->implementation_identifier, rti_connext_identifier, return NULL) - RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_supports, type_support) + RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(type_supports, type_support, NULL) if (!qos_profile) { RMW_SET_ERROR_MSG("qos_profile is null"); @@ -77,7 +81,7 @@ rmw_create_subscription( } std::string type_name = _create_type_name(callbacks, "msg"); // Past this point, a failure results in unrolling code in the goto fail block. - bool registered; + DDS_TypeCode * type_code = nullptr; DDS_DataReaderQos datareader_qos; DDS_SubscriberQos subscriber_qos; DDS_ReturnCode_t status; @@ -100,9 +104,21 @@ rmw_create_subscription( goto fail; } - registered = callbacks->register_type(participant, type_name.c_str()); - if (!registered) { - RMW_SET_ERROR_MSG("failed to register type"); + type_code = callbacks->get_type_code(); + if (!type_code) { + RMW_SET_ERROR_MSG("failed to fetch type code\n"); + goto fail; + } + // This is a non-standard RTI Connext function + // It allows to register an external type to a static data writer + // In this case, we register the custom message type to a data writer, + // which only publishes DDS_Octets + // The purpose of this is to send only raw data DDS_Octets over the wire, + // advertise the topic however with a type of the message, e.g. std_msgs::msg::dds_::String + status = ConnextStaticSerializedDataSupport_register_external_type( + participant, type_name.c_str(), type_code); + if (status != DDS_RETCODE_OK) { + RMW_SET_ERROR_MSG("failed to register external type"); goto fail; } diff --git a/rmw_connext_cpp/src/rmw_take.cpp b/rmw_connext_cpp/src/rmw_take.cpp index fef16a6b..e174e85d 100644 --- a/rmw_connext_cpp/src/rmw_take.cpp +++ b/rmw_connext_cpp/src/rmw_take.cpp @@ -12,14 +12,120 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rmw/error_handling.h" #include "rmw/impl/cpp/macros.hpp" #include "rmw/types.h" #include "rmw_connext_shared_cpp/types.hpp" -#include "rmw_connext_cpp/identifier.hpp" #include "rmw_connext_cpp/connext_static_subscriber_info.hpp" +#include "rmw_connext_cpp/identifier.hpp" + +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" + +// include patched generated code from the build folder +#include "./connext_static_serialized_dataSupport.h" +#include "./connext_static_serialized_data.h" + +static bool +take( + DDSDataReader * dds_data_reader, + bool ignore_local_publications, + ConnextStaticCDRStream * cdr_stream, + bool * taken, + void * sending_publication_handle) +{ + if (!dds_data_reader) { + RMW_SET_ERROR_MSG("dds_data_reader is null"); + return false; + } + if (!cdr_stream) { + RMW_SET_ERROR_MSG("cdr stream handle is null"); + return false; + } + if (!taken) { + RMW_SET_ERROR_MSG("taken handle is null"); + return false; + } + + ConnextStaticSerializedDataDataReader * data_reader = + ConnextStaticSerializedDataDataReader::narrow(dds_data_reader); + if (!data_reader) { + RMW_SET_ERROR_MSG("failed to narrow data reader"); + return false; + } + + ConnextStaticSerializedDataSeq dds_messages; + DDS_SampleInfoSeq sample_infos; + bool ignore_sample = false; + + DDS_ReturnCode_t status = data_reader->take( + dds_messages, + sample_infos, + 1, + DDS_ANY_SAMPLE_STATE, + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE); + if (status == DDS_RETCODE_NO_DATA) { + data_reader->return_loan(dds_messages, sample_infos); + *taken = false; + return true; + } + if (status != DDS_RETCODE_OK) { + RMW_SET_ERROR_MSG("take failed"); + data_reader->return_loan(dds_messages, sample_infos); + } + + DDS_SampleInfo & sample_info = sample_infos[0]; + if (!sample_info.valid_data) { + // skip sample without data + ignore_sample = true; + } else if (ignore_local_publications) { + // compare the lower 12 octets of the guids from the sender and this receiver + // if they are equal the sample has been sent from this process and should be ignored + DDS_GUID_t sender_guid = sample_info.original_publication_virtual_guid; + DDS_InstanceHandle_t receiver_instance_handle = dds_data_reader->get_instance_handle(); + ignore_sample = true; + for (size_t i = 0; i < 12; ++i) { + DDS_Octet * sender_element = &(sender_guid.value[i]); + DDS_Octet * receiver_element = &(reinterpret_cast(&receiver_instance_handle)[i]); + if (*sender_element != *receiver_element) { + ignore_sample = false; + break; + } + } + } + if (sample_info.valid_data && sending_publication_handle) { + *static_cast(sending_publication_handle) = + sample_info.publication_handle; + } + + if (!ignore_sample) { + cdr_stream->buffer_length = dds_messages[0].serialized_data.length(); + // TODO(karsten1987): This malloc has to go! + cdr_stream->buffer = + reinterpret_cast(malloc(cdr_stream->buffer_length * sizeof(char))); + + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + RMW_SET_ERROR_MSG("cdr_stream->buffer_length unexpectedly larger than max unsiged int value"); + data_reader->return_loan(dds_messages, sample_infos); + *taken = false; + return DDS_RETCODE_ERROR; + } + for (unsigned int i = 0; i < static_cast(cdr_stream->buffer_length); ++i) { + cdr_stream->buffer[i] = dds_messages[0].serialized_data[i]; + } + *taken = true; + } else { + *taken = false; + } + + data_reader->return_loan(dds_messages, sample_infos); + + return status == DDS_RETCODE_OK; +} extern "C" { @@ -65,11 +171,26 @@ _take( return RMW_RET_ERROR; } - bool success = callbacks->take( - topic_reader, subscriber_info->ignore_local_publications, ros_message, taken, - sending_publication_handle); + // fetch the incoming message as cdr stream + ConnextStaticCDRStream cdr_stream; + if (!take( + topic_reader, subscriber_info->ignore_local_publications, &cdr_stream, taken, + sending_publication_handle)) + { + RMW_SET_ERROR_MSG("error occured while taking message"); + return RMW_RET_ERROR; + } + // convert the cdr stream to the message + if (!callbacks->to_message(&cdr_stream, ros_message)) { + RMW_SET_ERROR_MSG("can't convert cdr stream to ros message"); + return RMW_RET_ERROR; + } - return success ? RMW_RET_OK : RMW_RET_ERROR; + // the call to take allocates memory for the serialized message + // we have to free this here again + free(cdr_stream.buffer); + + return RMW_RET_OK; } rmw_ret_t @@ -104,4 +225,101 @@ rmw_take_with_info( return RMW_RET_OK; } + +rmw_ret_t +_take_serialized_message( + const rmw_subscription_t * subscription, + rmw_serialized_message_t * serialized_message, + bool * taken, + DDS_InstanceHandle_t * sending_publication_handle) +{ + if (!subscription) { + RMW_SET_ERROR_MSG("subscription handle is null"); + return RMW_RET_ERROR; + } + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rti_connext_identifier, + return RMW_RET_ERROR) + + if (!serialized_message) { + RMW_SET_ERROR_MSG("ros message handle is null"); + return RMW_RET_ERROR; + } + if (!taken) { + RMW_SET_ERROR_MSG("taken handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticSubscriberInfo * subscriber_info = + static_cast(subscription->data); + if (!subscriber_info) { + RMW_SET_ERROR_MSG("subscriber info handle is null"); + return RMW_RET_ERROR; + } + DDSDataReader * topic_reader = subscriber_info->topic_reader_; + if (!topic_reader) { + RMW_SET_ERROR_MSG("topic reader handle is null"); + return RMW_RET_ERROR; + } + const message_type_support_callbacks_t * callbacks = subscriber_info->callbacks_; + if (!callbacks) { + RMW_SET_ERROR_MSG("callbacks handle is null"); + return RMW_RET_ERROR; + } + + // fetch the incoming message as cdr stream + ConnextStaticCDRStream cdr_stream; + if (!take( + topic_reader, subscriber_info->ignore_local_publications, &cdr_stream, taken, + sending_publication_handle)) + { + RMW_SET_ERROR_MSG("error occured while taking message"); + return RMW_RET_ERROR; + } + + serialized_message->buffer_length = cdr_stream.buffer_length; + // we don't free the allocated memory + // and thus can directly set the pointer + serialized_message->buffer = cdr_stream.buffer; + + return RMW_RET_OK; +} + +rmw_ret_t +rmw_take_serialized_message( + const rmw_subscription_t * subscription, + rmw_serialized_message_t * serialized_message, + bool * taken) +{ + return _take_serialized_message(subscription, serialized_message, taken, nullptr); +} + +rmw_ret_t +rmw_take_serialized_message_with_info( + const rmw_subscription_t * subscription, + rmw_serialized_message_t * serialized_message, + bool * taken, + rmw_message_info_t * message_info) +{ + if (!message_info) { + RMW_SET_ERROR_MSG("message info is null"); + return RMW_RET_ERROR; + } + DDS_InstanceHandle_t sending_publication_handle; + auto ret = + _take_serialized_message(subscription, serialized_message, taken, &sending_publication_handle); + if (ret != RMW_RET_OK) { + // Error string is already set. + return RMW_RET_ERROR; + } + + rmw_gid_t * sender_gid = &message_info->publisher_gid; + sender_gid->implementation_identifier = rti_connext_identifier; + memset(sender_gid->data, 0, RMW_GID_STORAGE_SIZE); + auto detail = reinterpret_cast(sender_gid->data); + detail->publication_handle = sending_publication_handle; + + return RMW_RET_OK; +} } // extern "C" diff --git a/rmw_connext_cpp/src/type_support_common.hpp b/rmw_connext_cpp/src/type_support_common.hpp index 06d90229..28dfd875 100644 --- a/rmw_connext_cpp/src/type_support_common.hpp +++ b/rmw_connext_cpp/src/type_support_common.hpp @@ -27,10 +27,10 @@ #include "rosidl_typesupport_connext_cpp/message_type_support.h" #include "rosidl_typesupport_connext_cpp/service_type_support.h" -#define RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(TYPE_SUPPORTS, TYPE_SUPPORT) \ +#define RMW_CONNEXT_EXTRACT_MESSAGE_TYPESUPPORT(TYPE_SUPPORTS, TYPE_SUPPORT, RET_VAL) \ if (!TYPE_SUPPORTS) { \ RMW_SET_ERROR_MSG("type supports handle is null"); \ - return NULL; \ + return RET_VAL; \ } \ const rosidl_message_type_support_t * TYPE_SUPPORT = \ get_message_typesupport_handle( \ @@ -51,14 +51,14 @@ rosidl_typesupport_connext_c__identifier, \ static_cast(rosidl_typesupport_connext_c__identifier)); \ RMW_SET_ERROR_MSG(__msg); \ - return NULL; \ + return RET_VAL; \ } \ } -#define RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(TYPE_SUPPORTS, TYPE_SUPPORT) \ +#define RMW_CONNEXT_EXTRACT_SERVICE_TYPESUPPORT(TYPE_SUPPORTS, TYPE_SUPPORT, RET_VAL) \ if (!TYPE_SUPPORTS) { \ RMW_SET_ERROR_MSG("type supports handle is null"); \ - return NULL; \ + return RET_VAL; \ } \ const rosidl_service_type_support_t * TYPE_SUPPORT = \ get_service_typesupport_handle( \ @@ -79,7 +79,7 @@ rosidl_typesupport_connext_c__identifier, \ static_cast(rosidl_typesupport_connext_c__identifier)); \ RMW_SET_ERROR_MSG(__msg); \ - return NULL; \ + return RET_VAL; \ } \ } diff --git a/rosidl_typesupport_connext_c/CMakeLists.txt b/rosidl_typesupport_connext_c/CMakeLists.txt index bb7b4b92..94914d9d 100644 --- a/rosidl_typesupport_connext_c/CMakeLists.txt +++ b/rosidl_typesupport_connext_c/CMakeLists.txt @@ -51,7 +51,10 @@ if(WIN32) PRIVATE "ROSIDL_TYPESUPPORT_CONNEXT_C_BUILDING_DLL") endif() target_include_directories(${PROJECT_NAME} PUBLIC include) -ament_target_dependencies(${PROJECT_NAME} "rosidl_typesupport_connext_cpp") +ament_target_dependencies( + ${PROJECT_NAME} + "Connext" + "rosidl_typesupport_connext_cpp") ament_export_libraries(${PROJECT_NAME}) ament_index_register_resource("rosidl_typesupport_c") diff --git a/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em b/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em index f3d3fdce..a1ae9ed9 100644 --- a/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em +++ b/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em @@ -28,6 +28,7 @@ #include "rosidl_typesupport_connext_c/identifier.h" // Provides the definition of the message_type_support_callbacks_t struct. #include "rosidl_typesupport_connext_cpp/message_type_support.h" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" #include "@(pkg)/msg/rosidl_typesupport_connext_c__visibility_control.h" @{header_file_name = get_header_filename_from_msg_name(type)}@ @@ -43,6 +44,7 @@ # endif #endif #include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Support.h" +#include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Plugin.h" #ifndef _WIN32 # pragma GCC diagnostic pop #endif @@ -105,44 +107,10 @@ __dds_msg_type_prefix = "{0}::{1}::dds_::{2}_".format( using __dds_msg_type = @(__dds_msg_type_prefix); using __ros_msg_type = @(pkg)__@(subfolder)__@(type); -static bool -register_type(void * untyped_participant, const char * type_name) +static DDS_TypeCode * +get_type_code() { - if (!untyped_participant) { - fprintf(stderr, "untyped participant handle is null\n"); - return false; - } - if (!type_name) { - fprintf(stderr, "type name handle is null\n"); - return false; - } - DDSDomainParticipant * participant = static_cast(untyped_participant); - - DDS_ReturnCode_t status = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type(participant, type_name); - switch (status) { - case DDS_RETCODE_ERROR: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type: " - "an internal error has occurred\n"); - return false; - case DDS_RETCODE_BAD_PARAMETER: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type: " - "bad domain participant or type name parameter\n"); - return false; - case DDS_RETCODE_OUT_OF_RESOURCES: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type: " - "out of resources\n"); - return false; - case DDS_RETCODE_PRECONDITION_NOT_MET: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type: " - "already registered with a different TypeSupport class\n"); - return false; - case DDS_RETCODE_OK: - return true; - default: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type: unknown return code\n"); - } - return false; + return @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::get_typecode(); } static bool @@ -254,72 +222,6 @@ convert_ros_to_dds(const void * untyped_ros_message, void * untyped_dds_message) return true; } -static bool -publish(void * dds_data_writer, const void * untyped_ros_message) -{ - if (!dds_data_writer) { - fprintf(stderr, "data writer handle is null\n"); - return false; - } - const __ros_msg_type * ros_message = static_cast(untyped_ros_message); - if (!ros_message) { - fprintf(stderr, "ros message handle is null\n"); - return false; - } - - DDSDataWriter * topic_writer = static_cast(dds_data_writer); - - __dds_msg_type dds_message; - if (!convert_ros_to_dds(ros_message, &dds_message)) { - return false; - } - @(pkg)::@(subfolder)::dds_::@(type)_DataWriter * data_writer = - @(pkg)::@(subfolder)::dds_::@(type)_DataWriter::narrow(topic_writer); - if (!data_writer) { - fprintf(stderr, "failed to narrow data writer\n"); - return false; - } - DDS_ReturnCode_t status = data_writer->write(dds_message, DDS_HANDLE_NIL); - - switch (status) { - case DDS_RETCODE_ERROR: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "an internal error has occurred\n"); - return false; - case DDS_RETCODE_BAD_PARAMETER: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "bad handle or instance_data parameter\n"); - return false; - case DDS_RETCODE_ALREADY_DELETED: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "this @(__dds_msg_type_prefix)DataWriter has already been deleted\n"); - return false; - case DDS_RETCODE_OUT_OF_RESOURCES: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "out of resources\n"); - return false; - case DDS_RETCODE_NOT_ENABLED: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "this @(__dds_msg_type_prefix)DataWriter is not enabled\n"); - return false; - case DDS_RETCODE_PRECONDITION_NOT_MET: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "the handle has not been registered with this @(__dds_msg_type_prefix)DataWriter\n"); - return false; - case DDS_RETCODE_TIMEOUT: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "writing resulted in blocking and then exceeded the timeout set by the " - "max_blocking_time of the ReliabilityQosPolicy\n"); - return false; - case DDS_RETCODE_OK: - return true; - default: - fprintf(stderr, "@(pkg)::@(subfolder)::dds_::@(type)_DataWriter.write: " - "unknown return code\n"); - } - return false; -} - static bool convert_dds_to_ros(const void * untyped_dds_message, void * untyped_ros_message) { @@ -419,158 +321,98 @@ else: return true; } + static bool -take( - void * dds_data_reader, - bool ignore_local_publications, - void * untyped_ros_message, - bool * taken, - void * sending_publication_handle) +to_cdr_stream( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream) { - if (untyped_ros_message == 0) { - fprintf(stderr, "invalid ros message pointer\n"); + if (!untyped_ros_message) { return false; } - - DDSDataReader * topic_reader = static_cast(dds_data_reader); - - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader * data_reader = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader::narrow(topic_reader); - if (!data_reader) { - fprintf(stderr, "failed to narrow data reader\n"); + if (!cdr_stream) { return false; } - - @(__dds_msg_type_prefix)Seq dds_messages; - DDS_SampleInfoSeq sample_infos; - DDS_ReturnCode_t status = data_reader->take( - dds_messages, - sample_infos, - 1, - DDS_ANY_SAMPLE_STATE, - DDS_ANY_VIEW_STATE, - DDS_ANY_INSTANCE_STATE); - - bool ignore_sample = false; - - switch (status) { - case DDS_RETCODE_ERROR: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "an internal error has occurred\n"); - goto finally; - case DDS_RETCODE_ALREADY_DELETED: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "this @(__dds_msg_type_prefix)DataReader has already been deleted\n"); - goto finally; - case DDS_RETCODE_OUT_OF_RESOURCES: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "out of resources\n"); - goto finally; - case DDS_RETCODE_NOT_ENABLED: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "this @(__dds_msg_type_prefix)DataReader is not enabled\n"); - goto finally; - case DDS_RETCODE_PRECONDITION_NOT_MET: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "a precondition is not met, one of: " - "max_samples > maximum and max_samples != LENGTH_UNLIMITED, or " - "the two sequences do not have matching parameters (length, maximum, release), or " - "maximum > 0 and release is false.\n"); - goto finally; - case DDS_RETCODE_NO_DATA: - *taken = false; - goto finally; - case DDS_RETCODE_OK: - break; - default: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.take: " - "unknown return code\n"); - goto finally; + const __ros_msg_type * ros_message = static_cast(untyped_ros_message); + __dds_msg_type dds_message; + if (!convert_ros_to_dds(ros_message, &dds_message)) { + return false; } + // call the serialize function for the first time to get the expected length of the message + unsigned int expected_length; + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer( + NULL, &expected_length, &dds_message) != RTI_TRUE) { - DDS_SampleInfo & sample_info = sample_infos[0]; - if (!sample_info.valid_data) { - // skip sample without data - ignore_sample = true; - } else if (ignore_local_publications) { - // compare the lower 12 octets of the guids from the sender and this receiver - // if they are equal the sample has been sent from this process and should be ignored - DDS_GUID_t sender_guid = sample_info.original_publication_virtual_guid; - DDS_InstanceHandle_t receiver_instance_handle = topic_reader->get_instance_handle(); - ignore_sample = true; - for (size_t i = 0; i < 12; ++i) { - DDS_Octet * sender_element = &(sender_guid.value[i]); - DDS_Octet * receiver_element = &(reinterpret_cast(&receiver_instance_handle)[i]); - if (*sender_element != *receiver_element) { - ignore_sample = false; - break; - } - } - // This is nullptr when being used with plain rmw_take, so check first. - if (sending_publication_handle) { - *static_cast(sending_publication_handle) = - sample_info.publication_handle; - } - } + fprintf(stderr, "failed to call @(spec.base_type.type)_Plugin_serialize_to_cdr_buffer()\n"); + return false; } - - if (!ignore_sample) { - if (!convert_dds_to_ros(&dds_messages[0], untyped_ros_message)) { - goto finally; - } - *taken = true; - } else { - *taken = false; + cdr_stream->buffer_length = expected_length; + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + fprintf(stderr, "cdr_stream->buffer_length, unexpectedly larger than max unsigned int\n"); + return false; + } + if (cdr_stream->buffer_capacity < cdr_stream->buffer_length) { + cdr_stream->allocator.deallocate(cdr_stream->buffer, cdr_stream->allocator.state); + cdr_stream->buffer = static_cast(cdr_stream->allocator.allocate(cdr_stream->buffer_length, cdr_stream->allocator.state)); + } + // call the function again and fill the buffer this time + unsigned int buffer_length_uint = static_cast(cdr_stream->buffer_length); + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer( + cdr_stream->buffer, + &buffer_length_uint, + &dds_message) != RTI_TRUE) + { + return false; } -finally: - // Ensure the loan is returned. - status = data_reader->return_loan(dds_messages, sample_infos); - switch (status) { - case DDS_RETCODE_ERROR: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "an internal error has occurred\n"); - return false; - case DDS_RETCODE_ALREADY_DELETED: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "this @(__dds_msg_type_prefix)DataReader has already been deleted\n"); - return false; - case DDS_RETCODE_OUT_OF_RESOURCES: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "out of resources\n"); - return false; - case DDS_RETCODE_NOT_ENABLED: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "this @(__dds_msg_type_prefix)DataReader is not enabled\n"); - return false; - case DDS_RETCODE_PRECONDITION_NOT_MET: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "a precondition is not met, one of: " - "the data_values and info_seq do not belong to a single related pair, or " - "the data_values and info_seq were not obtained from this " - "@(__dds_msg_type_prefix)DataReader\n"); - return false; - case DDS_RETCODE_OK: - return true; - default: - fprintf(stderr, "@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader.return_loan: " - "unknown return code\n"); + return true; +} + +static bool +to_message( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message) +{ + if (!cdr_stream) { + return false; + } + if (!untyped_ros_message) { + return false; } - return false; + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + fprintf(stderr, "cdr_stream->buffer_length, unexpectedly larger than max unsigned int\n"); + return false; + } + if (@(spec.base_type.type)_Plugin_deserialize_from_cdr_buffer( + dds_message, + cdr_stream->buffer, + static_cast(cdr_stream->buffer_length)) != RTI_TRUE) + { + fprintf(stderr, "deserialize from cdr buffer failed\n"); + return false; + } + bool success = convert_dds_to_ros(dds_message, untyped_ros_message); + if (@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message) != DDS_RETCODE_OK) { + return false; + } + return success; } + @ @# // Collect the callback functions and provide a function to get the type support struct. static message_type_support_callbacks_t __callbacks = { "@(pkg)", // package_name "@(msg)", // message_name - register_type, // register_type - publish, // publish - take, // take + get_type_code, // get_type_code convert_ros_to_dds, // convert_ros_to_dds convert_dds_to_ros, // convert_dds_to_ros + to_cdr_stream, // to_cdr_stream + to_message // to_message }; static rosidl_message_type_support_t __type_support = { diff --git a/rosidl_typesupport_connext_cpp/RawData.idl b/rosidl_typesupport_connext_cpp/RawData.idl new file mode 100644 index 00000000..da9f7131 --- /dev/null +++ b/rosidl_typesupport_connext_cpp/RawData.idl @@ -0,0 +1,7 @@ +const long KEY_HASH_LENGTH_16 = 16; + +struct RawData { + @key octet key_hash[KEY_HASH_LENGTH_16]; + sequence key_data; + sequence raw_data; +}; diff --git a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp new file mode 100644 index 00000000..da719a40 --- /dev/null +++ b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp @@ -0,0 +1,33 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ +#define ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ + +#include "rcutils/allocator.h" + +extern "C" +{ +// TODO(karsten1987): Join rmw_serialized_message_t and this ConnextStaticCDRStream struct +// inside rcutils to have only one implementation of it. +typedef struct ConnextStaticCDRStream +{ + char * buffer = NULL; + size_t buffer_length = 0; + size_t buffer_capacity = 0; + rcutils_allocator_t allocator; +} ConnextStaticCDRStream; +} // extern "C" + +#endif // ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ diff --git a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h index 10208f96..94fee623 100644 --- a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h +++ b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h @@ -17,24 +17,31 @@ #include "rosidl_generator_c/message_type_support_struct.h" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" + +// forward declare DDS_TypeCode +struct DDS_TypeCode; + typedef struct message_type_support_callbacks_t { const char * package_name; const char * message_name; // Function to register type with given dds_participant - bool (* register_type)(void * dds_participant, const char * type_name); - // Function to publish a ROS message with a given DDS data_writer - bool (* publish)(void * dds_data_writer, const void * ros_message); - // Function to take a ROS message from a dds data reader - bool (* take)( - void * dds_data_reader, bool ignore_local_publications, void * ros_message, bool * taken, - void * sending_publication_handle); + DDS_TypeCode * (*get_type_code)(void); bool (* convert_ros_to_dds)( const void * untyped_ros_message, void * untyped_data_message); bool (* convert_dds_to_ros)( const void * untyped_data_message, void * untyped_ros_message); + // Function to serialize a ROS message to a CDR stream + bool (* to_cdr_stream)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream); + // Function to deserialize a CDR message to a ROS message + bool (* to_message)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message); } message_type_support_callbacks_t; #endif // ROSIDL_TYPESUPPORT_CONNEXT_CPP__MESSAGE_TYPE_SUPPORT_H_ diff --git a/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em b/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em index 6b95e9ed..c5047635 100644 --- a/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em +++ b/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em @@ -39,10 +39,16 @@ header_guard_variable = '__'.join([x.upper() for x in header_guard_parts]) + '_' # endif #endif #include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Support.h" +#include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Plugin.h" + +#include "ndds/ndds_cpp.h" #ifndef _WIN32 # pragma GCC diagnostic pop #endif +// forward declaration of internal CDR Stream +struct ConnextStaticCDRStream; + // forward declaration of DDS types class DDSDomainParticipant; class DDSDataWriter; @@ -57,10 +63,8 @@ namespace @(subfolder) namespace typesupport_connext_cpp { -bool -register_type__@(spec.base_type.type)( - DDSDomainParticipant * participant, - const char * type_name); +DDS_TypeCode * +get_type_code__@(spec.base_type.type)(); bool ROSIDL_TYPESUPPORT_CONNEXT_CPP_PUBLIC_@(spec.base_type.pkg_name) @@ -68,11 +72,6 @@ convert_ros_message_to_dds( const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message, @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ & dds_message); -bool -publish__@(spec.base_type.type)( - DDSDataWriter * topic_writer, - const void * untyped_ros_message); - bool ROSIDL_TYPESUPPORT_CONNEXT_CPP_PUBLIC_@(spec.base_type.pkg_name) convert_dds_message_to_ros( @@ -80,11 +79,14 @@ convert_dds_message_to_ros( @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message); bool -take__@(spec.base_type.type)( - DDSDataReader * topic_reader, - bool ignore_local_publications, - void * untyped_ros_message, - bool * taken); +to_cdr_stream__@(spec.base_type.type)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream); + +bool +to_message__@(spec.base_type.type)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message); } // namespace typesupport_connext_cpp diff --git a/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em b/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em index 85c8fbfa..8c86fa4e 100644 --- a/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em +++ b/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em @@ -20,6 +20,7 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" #include "rosidl_typesupport_connext_cpp/identifier.hpp" #include "rosidl_typesupport_connext_cpp/message_type_support.h" #include "rosidl_typesupport_connext_cpp/message_type_support_decl.hpp" @@ -59,16 +60,10 @@ namespace @(subfolder) namespace typesupport_connext_cpp { -bool -register_type__@(spec.base_type.type)( - void * untyped_participant, - const char * type_name) +DDS_TypeCode * +get_type_code__@(spec.base_type.type)() { - DDSDomainParticipant * participant = static_cast(untyped_participant); - DDS_ReturnCode_t status = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::register_type(participant, type_name); - - return status == DDS_RETCODE_OK; + return @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::get_typecode(); } bool @@ -146,41 +141,6 @@ convert_ros_message_to_dds( return true; } -bool -publish__@(spec.base_type.type)( - void * untyped_topic_writer, - const void * untyped_ros_message) -{ - DDSDataWriter * topic_writer = static_cast(untyped_topic_writer); - - const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = - *(const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); - if (!dds_message) { - return false; - } - - bool success = convert_ros_message_to_dds(ros_message, *dds_message); - if (success) { - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter * data_writer = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter::narrow(topic_writer); - if (!data_writer) { - fprintf(stderr, "failed to narrow data writer\n"); - success = false; - } else { - DDS_ReturnCode_t status = data_writer->write(*dds_message, DDS_HANDLE_NIL); - success = status == DDS_RETCODE_OK; - } - } - - DDS_ReturnCode_t status = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message); - success &= status == DDS_RETCODE_OK; - - return success; -} - bool convert_dds_message_to_ros( const @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ & dds_message, @@ -233,99 +193,113 @@ convert_dds_message_to_ros( } bool -take__@(spec.base_type.type)( - void * untyped_topic_reader, - bool ignore_local_publications, - void * untyped_ros_message, - bool * taken, - void * sending_publication_handle) +to_cdr_stream__@(spec.base_type.type)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream) { - if (!untyped_topic_reader) { - throw std::runtime_error("topic reader handle is null"); + if (!cdr_stream) { + return false; } if (!untyped_ros_message) { - throw std::runtime_error("ros message handle is null"); - } - if (!taken) { - throw std::runtime_error("taken handle is null"); + return false; } - DDSDataReader * topic_reader = static_cast(untyped_topic_reader); + // cast the untyped to the known ros message + const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = + *(const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader * data_reader = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataReader::narrow(topic_reader); - if (!data_reader) { - fprintf(stderr, "failed to narrow data reader\n"); + // create a respective connext dds type + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); + if (!dds_message) { return false; } - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_Seq dds_messages; - DDS_SampleInfoSeq sample_infos; - DDS_ReturnCode_t status = data_reader->take( - dds_messages, - sample_infos, - 1, - DDS_ANY_SAMPLE_STATE, - DDS_ANY_VIEW_STATE, - DDS_ANY_INSTANCE_STATE); - if (status == DDS_RETCODE_NO_DATA) { - *taken = false; - return true; + // convert ros to dds + if (!convert_ros_message_to_dds(ros_message, *dds_message)) { + return false; } - if (status != DDS_RETCODE_OK) { - fprintf(stderr, "take failed with status = %d\n", status); + + // call the serialize function for the first time to get the expected length of the message + unsigned int expected_length; + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer( + NULL, + &expected_length, + dds_message) != RTI_TRUE) + { + fprintf(stderr, "failed to call @(spec.base_type.type)_Plugin_serialize_to_cdr_buffer()\n"); return false; } + cdr_stream->buffer_length = expected_length; + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + fprintf(stderr, "cdr_stream->buffer_length, unexpectedly larger than max unsigned int\n"); + return false; + } + if (cdr_stream->buffer_capacity < cdr_stream->buffer_length) { + cdr_stream->allocator.deallocate(cdr_stream->buffer, cdr_stream->allocator.state); + cdr_stream->buffer = static_cast(cdr_stream->allocator.allocate(cdr_stream->buffer_length, cdr_stream->allocator.state)); + } + // call the function again and fill the buffer this time + unsigned int buffer_length_uint = static_cast(cdr_stream->buffer_length); + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer( + cdr_stream->buffer, + &buffer_length_uint, + dds_message) != RTI_TRUE) + { + return false; + } + if (@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message) != DDS_RETCODE_OK) { + return false; + } + return true; +} - bool ignore_sample = false; - DDS_SampleInfo & sample_info = sample_infos[0]; - if (!sample_info.valid_data) { - // skip sample without data - ignore_sample = true; - } else if (ignore_local_publications) { - // compare the lower 12 octets of the guids from the sender and this receiver - // if they are equal the sample has been sent from this process and should be ignored - DDS_GUID_t sender_guid = sample_info.original_publication_virtual_guid; - DDS_InstanceHandle_t receiver_instance_handle = topic_reader->get_instance_handle(); - ignore_sample = true; - for (size_t i = 0; i < 12; ++i) { - DDS_Octet * sender_element = &(sender_guid.value[i]); - DDS_Octet * receiver_element = &(reinterpret_cast(&receiver_instance_handle)[i]); - if (*sender_element != *receiver_element) { - ignore_sample = false; - break; - } - } +bool +to_message__@(spec.base_type.type)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message) +{ + if (!cdr_stream) { + return false; + } + if (!cdr_stream->buffer) { + fprintf(stderr, "cdr stream doesn't contain data\n"); } - if (sample_info.valid_data && sending_publication_handle) { - *static_cast(sending_publication_handle) = - sample_info.publication_handle; + if (!untyped_ros_message) { + return false; } - bool success = true; - if (!ignore_sample) { - @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = - *(@(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; - success = convert_dds_message_to_ros(dds_messages[0], ros_message); - if (success) { - *taken = true; - } - } else { - *taken = false; + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); + if (cdr_stream->buffer_length > (std::numeric_limits::max)()) { + fprintf(stderr, "cdr_stream->buffer_length, unexpectedly larger than max unsigned int\n"); + return false; + } + if (@(spec.base_type.type)_Plugin_deserialize_from_cdr_buffer( + dds_message, + cdr_stream->buffer, + static_cast(cdr_stream->buffer_length)) != RTI_TRUE) + { + fprintf(stderr, "deserialize from cdr buffer failed\n"); + return false; } - data_reader->return_loan(dds_messages, sample_infos); + @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = + *(@(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; + bool success = convert_dds_message_to_ros(*dds_message, ros_message); + if (@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message) != DDS_RETCODE_OK) { + return false; + } return success; } static message_type_support_callbacks_t callbacks = { "@(spec.base_type.pkg_name)", "@(spec.base_type.type)", - ®ister_type__@(spec.base_type.type), - &publish__@(spec.base_type.type), - &take__@(spec.base_type.type), + &get_type_code__@(spec.base_type.type), + nullptr, nullptr, - nullptr + &to_cdr_stream__@(spec.base_type.type), + &to_message__@(spec.base_type.type) }; static rosidl_message_type_support_t handle = {