From e83132ca2e402ac6643e13853b5a4806d634db3d Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Tue, 14 Nov 2017 15:29:48 -0800 Subject: [PATCH] WIP publish and subscribe raw --- .../connext_static_client_info.hpp | 1 + .../connext_static_publisher_info.hpp | 1 + .../connext_static_service_info.hpp | 1 + .../connext_static_subscriber_info.hpp | 1 + rmw_connext_cpp/src/rmw_client.cpp | 8 + rmw_connext_cpp/src/rmw_publish.cpp | 33 ++- rmw_connext_cpp/src/rmw_publisher.cpp | 26 ++- rmw_connext_cpp/src/rmw_request.cpp | 2 +- rmw_connext_cpp/src/rmw_service.cpp | 27 ++- rmw_connext_cpp/src/rmw_subscription.cpp | 27 ++- rmw_connext_cpp/src/rmw_take.cpp | 116 +++++++++- .../resource/msg__type_support_c.cpp.em | 38 +++- .../connext_static_cdr_stream.hpp | 27 +++ .../message_type_support.h | 14 +- ...msg__rosidl_typesupport_connext_cpp.hpp.em | 16 +- .../resource/msg__type_support.cpp.em | 146 +++++++----- .../__init__.py | 214 +++++++++++++----- 17 files changed, 549 insertions(+), 149 deletions(-) create mode 100644 rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp diff --git a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_client_info.hpp b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_client_info.hpp index 3fc8728d..e282d0c3 100644 --- a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_client_info.hpp +++ b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_client_info.hpp @@ -27,6 +27,7 @@ struct ConnextStaticClientInfo DDSDataReader * response_datareader_; DDSReadCondition * read_condition_; const service_type_support_callbacks_t * callbacks_; + bool raw_stream_subscriber = true; }; } // extern "C" diff --git a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_publisher_info.hpp b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_publisher_info.hpp index 11ce127c..b6f6185c 100644 --- a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_publisher_info.hpp +++ b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_publisher_info.hpp @@ -27,6 +27,7 @@ struct ConnextStaticPublisherInfo DDSDataWriter * topic_writer_; const message_type_support_callbacks_t * callbacks_; rmw_gid_t publisher_gid; + bool raw_stream_publisher = true; }; } // extern "C" diff --git a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_service_info.hpp b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_service_info.hpp index b57e28bd..8621532f 100644 --- a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_service_info.hpp +++ b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_service_info.hpp @@ -27,6 +27,7 @@ struct ConnextStaticServiceInfo DDSDataReader * request_datareader_; DDSReadCondition * read_condition_; const service_type_support_callbacks_t * callbacks_; + bool raw_stream_subscriber = true; }; } // extern "C" diff --git a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_subscriber_info.hpp b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_subscriber_info.hpp index c513f384..2b2702a6 100644 --- a/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_subscriber_info.hpp +++ b/rmw_connext_cpp/include/rmw_connext_cpp/connext_static_subscriber_info.hpp @@ -28,6 +28,7 @@ struct ConnextStaticSubscriberInfo DDSReadCondition * read_condition_; bool ignore_local_publications; const message_type_support_callbacks_t * callbacks_; + bool raw_stream_subscriber = true; }; } // extern "C" diff --git a/rmw_connext_cpp/src/rmw_client.cpp b/rmw_connext_cpp/src/rmw_client.cpp index b3eda177..bb35c9f5 100644 --- a/rmw_connext_cpp/src/rmw_client.cpp +++ b/rmw_connext_cpp/src/rmw_client.cpp @@ -78,6 +78,7 @@ rmw_create_client( DDS_SubscriberQos subscriber_qos; DDS_ReturnCode_t status; DDS_PublisherQos publisher_qos; + DDS_TypeSupportQosPolicy ts; DDS_DataReaderQos datareader_qos; DDS_DataWriterQos datawriter_qos; DDS::Publisher * dds_publisher = nullptr; @@ -123,6 +124,13 @@ rmw_create_client( { goto fail; } + // Set the plugin typesupport to the connext info + // If this is true, a raw CDR Stream is enabled + //ts.plugin_data = &client_info->raw_stream_subscriber; + //ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING; + //datareader_qos.type_support = ts; + //datawriter_qos.type_support = ts; + (void) ts; // TODO(karsten1987): For now, I'll expose the datawriter // to access the respective DDSPublisher object. diff --git a/rmw_connext_cpp/src/rmw_publish.cpp b/rmw_connext_cpp/src/rmw_publish.cpp index 304991c0..8f0b8fa3 100644 --- a/rmw_connext_cpp/src/rmw_publish.cpp +++ b/rmw_connext_cpp/src/rmw_publish.cpp @@ -56,13 +56,27 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message) return RMW_RET_ERROR; } - ConnextStaticMessageHandle message_handle; - message_handle.untyped_ros_message = ros_message; - bool published = callbacks->publish(topic_writer, &message_handle); - if (!published) { + + ConnextStaticCDRStream * my_cdr = new ConnextStaticCDRStream(); + ConnextStaticCDRStream & cdr_stream = *my_cdr; + if (!callbacks->to_cdr_stream(ros_message, &cdr_stream)) { + RMW_SET_ERROR_MSG("failed to convert ros_message to cdr stream"); + return RMW_RET_ERROR; + } + fprintf(stderr, "successfully converted to cdr stream\n"); + if (cdr_stream.message_length == 0) { + RMW_SET_ERROR_MSG("no message length set"); + return RMW_RET_ERROR; + } + if (!cdr_stream.raw_message) { + RMW_SET_ERROR_MSG("no raw message attached"); + return RMW_RET_ERROR; + } + if (!callbacks->publish(topic_writer, &cdr_stream)) { RMW_SET_ERROR_MSG("failed to publish message"); return RMW_RET_ERROR; } + free(cdr_stream.raw_message); return RMW_RET_OK; } @@ -99,10 +113,13 @@ rmw_publish_raw(const rmw_publisher_t * publisher, const rmw_message_raw_t * raw return RMW_RET_ERROR; } - ConnextStaticMessageHandle message_handle; - message_handle.raw_message = raw_message->buffer; - message_handle.raw_message_length = &raw_message->buffer_length; - bool published = callbacks->publish(topic_writer, &message_handle); + ConnextStaticCDRStream cdr_stream; + cdr_stream.raw_message = DDS_String_dup(raw_message->buffer); + cdr_stream.message_length = static_cast(raw_message->buffer_length); + //ConnextStaticMessageHandle message_handle; + //message_handle.raw_message = raw_message->buffer; + //message_handle.raw_message_length = &raw_message->buffer_length; + bool published = callbacks->publish(topic_writer, &cdr_stream); if (!published) { RMW_SET_ERROR_MSG("failed to publish message"); return RMW_RET_ERROR; diff --git a/rmw_connext_cpp/src/rmw_publisher.cpp b/rmw_connext_cpp/src/rmw_publisher.cpp index 2530e77d..da43cd40 100644 --- a/rmw_connext_cpp/src/rmw_publisher.cpp +++ b/rmw_connext_cpp/src/rmw_publisher.cpp @@ -84,6 +84,7 @@ rmw_create_publisher( // Past this point, a failure results in unrolling code in the goto fail block. bool registered; DDS_DataWriterQos datawriter_qos; + DDS_TypeSupportQosPolicy ts; DDS_PublisherQos publisher_qos; DDS_ReturnCode_t status; DDSPublisher * dds_publisher = nullptr; @@ -174,27 +175,34 @@ rmw_create_publisher( DDS_String_free(topic_str); topic_str = nullptr; + // Allocate memory for the ConnextStaticPublisherInfo object. + buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo)); + if (!buf) { + RMW_SET_ERROR_MSG("failed to allocate memory"); + goto fail; + } + // Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer. + RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, ) + buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore. + if (!get_datawriter_qos(participant, *qos_profile, datawriter_qos)) { // error string was set within the function goto fail; } + // Set the plugin typesupport to the connext info + // If this is true, a raw CDR Stream is enabled + ts.plugin_data = &publisher_info->raw_stream_publisher; + ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING; + datawriter_qos.type_support = ts; topic_writer = dds_publisher->create_datawriter( topic, datawriter_qos, NULL, DDS_STATUS_MASK_NONE); if (!topic_writer) { RMW_SET_ERROR_MSG("failed to create datawriter"); goto fail; } + fprintf(stderr, "created datawriter\n"); - // Allocate memory for the ConnextStaticPublisherInfo object. - buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo)); - if (!buf) { - RMW_SET_ERROR_MSG("failed to allocate memory"); - goto fail; - } - // Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer. - RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, ) - buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore. publisher_info->dds_publisher_ = dds_publisher; publisher_info->topic_writer_ = topic_writer; publisher_info->callbacks_ = callbacks; diff --git a/rmw_connext_cpp/src/rmw_request.cpp b/rmw_connext_cpp/src/rmw_request.cpp index 772d958e..d6e0de7b 100644 --- a/rmw_connext_cpp/src/rmw_request.cpp +++ b/rmw_connext_cpp/src/rmw_request.cpp @@ -111,7 +111,7 @@ rmw_take_request( } *taken = callbacks->take_request(replier, request_header, ros_request); - + fprintf(stderr, "Correctly taken the request\n"); return RMW_RET_OK; } } // extern "C" diff --git a/rmw_connext_cpp/src/rmw_service.cpp b/rmw_connext_cpp/src/rmw_service.cpp index 4fd3363b..3568d93c 100644 --- a/rmw_connext_cpp/src/rmw_service.cpp +++ b/rmw_connext_cpp/src/rmw_service.cpp @@ -75,6 +75,7 @@ rmw_create_service( } // Past this point, a failure results in unrolling code in the goto fail block. + DDS_TypeSupportQosPolicy ts; DDS_DataReaderQos datareader_qos; DDS_DataWriterQos datawriter_qos; DDS_SubscriberQos subscriber_qos; @@ -96,6 +97,15 @@ rmw_create_service( char * response_partition_str = nullptr; char * service_str = nullptr; + buf = rmw_allocate(sizeof(ConnextStaticServiceInfo)); + if (!buf) { + RMW_SET_ERROR_MSG("failed to allocate memory"); + goto fail; + } + // Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer. + RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, ) + buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore. + // Begin initializing elements. service = rmw_service_allocate(); if (!service) { @@ -123,6 +133,13 @@ rmw_create_service( { goto fail; } + // Set the plugin typesupport to the connext info + // If this is true, a raw CDR Stream is enabled + //ts.plugin_data = &service_info->raw_stream_subscriber; + //ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING; + //datareader_qos.type_support = ts; + //datawriter_qos.type_support = ts; + (void) ts; replier = callbacks->create_replier( participant, service_str, &datareader_qos, &datawriter_qos, @@ -131,6 +148,8 @@ rmw_create_service( &rmw_allocate); DDS_String_free(service_str); service_str = nullptr; + + fprintf(stderr, "created replier\n"); if (!replier) { RMW_SET_ERROR_MSG("failed to create replier"); goto fail; @@ -191,14 +210,6 @@ rmw_create_service( // update attached publisher dds_publisher->set_qos(publisher_qos); - buf = rmw_allocate(sizeof(ConnextStaticServiceInfo)); - if (!buf) { - RMW_SET_ERROR_MSG("failed to allocate memory"); - goto fail; - } - // Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer. - RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, ) - buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore. service_info->replier_ = replier; service_info->callbacks_ = callbacks; service_info->request_datareader_ = request_datareader; diff --git a/rmw_connext_cpp/src/rmw_subscription.cpp b/rmw_connext_cpp/src/rmw_subscription.cpp index f3969e9b..7c5a44c2 100644 --- a/rmw_connext_cpp/src/rmw_subscription.cpp +++ b/rmw_connext_cpp/src/rmw_subscription.cpp @@ -79,6 +79,7 @@ rmw_create_subscription( // Past this point, a failure results in unrolling code in the goto fail block. bool registered; DDS_DataReaderQos datareader_qos; + DDS_TypeSupportQosPolicy ts; DDS_SubscriberQos subscriber_qos; DDS_ReturnCode_t status; DDSSubscriber * dds_subscriber = nullptr; @@ -170,11 +171,27 @@ rmw_create_subscription( DDS_String_free(topic_str); topic_str = nullptr; + // Allocate memory for the ConnextStaticSubscriberInfo object. + buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo)); + if (!buf) { + RMW_SET_ERROR_MSG("failed to allocate memory"); + goto fail; + } + // Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer. + RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, ) + buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore. + if (!get_datareader_qos(participant, *qos_profile, datareader_qos)) { // error string was set within the function goto fail; } + // Set the plugin typesupport to the connext info + // If this is true, a raw CDR Stream is enabled + ts.plugin_data = &subscriber_info->raw_stream_subscriber; + ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING; + datareader_qos.type_support = ts; + topic_reader = dds_subscriber->create_datareader( topic, datareader_qos, NULL, DDS_STATUS_MASK_NONE); @@ -182,6 +199,7 @@ rmw_create_subscription( RMW_SET_ERROR_MSG("failed to create datareader"); goto fail; } + fprintf(stderr, "data reader created\n"); read_condition = topic_reader->create_readcondition( DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); @@ -190,15 +208,6 @@ rmw_create_subscription( goto fail; } - // Allocate memory for the ConnextStaticSubscriberInfo object. - buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo)); - if (!buf) { - RMW_SET_ERROR_MSG("failed to allocate memory"); - goto fail; - } - // Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer. - RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, ) - buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore. subscriber_info->dds_subscriber_ = dds_subscriber; subscriber_info->topic_reader_ = topic_reader; subscriber_info->read_condition_ = read_condition; diff --git a/rmw_connext_cpp/src/rmw_take.cpp b/rmw_connext_cpp/src/rmw_take.cpp index fef16a6b..a3ffcb2b 100644 --- a/rmw_connext_cpp/src/rmw_take.cpp +++ b/rmw_connext_cpp/src/rmw_take.cpp @@ -21,8 +21,11 @@ #include "rmw_connext_cpp/identifier.hpp" #include "rmw_connext_cpp/connext_static_subscriber_info.hpp" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" + extern "C" { + rmw_ret_t _take( const rmw_subscription_t * subscription, @@ -65,11 +68,22 @@ _take( return RMW_RET_ERROR; } - bool success = callbacks->take( - topic_reader, subscriber_info->ignore_local_publications, ros_message, taken, - sending_publication_handle); + // fetch the incoming message as cdr stream + ConnextStaticCDRStream cdr_stream; + if (!callbacks->take( + topic_reader, subscriber_info->ignore_local_publications, &cdr_stream, taken, + sending_publication_handle)) + { + RMW_SET_ERROR_MSG("error occured while taking message"); + return RMW_RET_ERROR; + } + // convert the cdr stream to the message + if (!callbacks->to_message(&cdr_stream, ros_message)) { + RMW_SET_ERROR_MSG("can't convert cdr stream to ros message"); + return RMW_RET_ERROR; + } - return success ? RMW_RET_OK : RMW_RET_ERROR; + return RMW_RET_OK; } rmw_ret_t @@ -104,4 +118,98 @@ rmw_take_with_info( return RMW_RET_OK; } + +rmw_ret_t +_take_raw( + const rmw_subscription_t * subscription, + rmw_message_raw_t * raw_message, + bool * taken, + DDS_InstanceHandle_t * sending_publication_handle) +{ + if (!subscription) { + RMW_SET_ERROR_MSG("subscription handle is null"); + return RMW_RET_ERROR; + } + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + subscription handle, + subscription->implementation_identifier, rti_connext_identifier, + return RMW_RET_ERROR) + + if (!raw_message) { + RMW_SET_ERROR_MSG("ros message handle is null"); + return RMW_RET_ERROR; + } + if (!taken) { + RMW_SET_ERROR_MSG("taken handle is null"); + return RMW_RET_ERROR; + } + + ConnextStaticSubscriberInfo * subscriber_info = + static_cast(subscription->data); + if (!subscriber_info) { + RMW_SET_ERROR_MSG("subscriber info handle is null"); + return RMW_RET_ERROR; + } + DDSDataReader * topic_reader = subscriber_info->topic_reader_; + if (!topic_reader) { + RMW_SET_ERROR_MSG("topic reader handle is null"); + return RMW_RET_ERROR; + } + const message_type_support_callbacks_t * callbacks = subscriber_info->callbacks_; + if (!callbacks) { + RMW_SET_ERROR_MSG("callbacks handle is null"); + return RMW_RET_ERROR; + } + + // fetch the incoming message as cdr stream + ConnextStaticCDRStream cdr_stream; + if (!callbacks->take( + topic_reader, subscriber_info->ignore_local_publications, &cdr_stream, taken, + sending_publication_handle)) + { + RMW_SET_ERROR_MSG("error occured while taking message"); + return RMW_RET_ERROR; + } + + raw_message->buffer_length = cdr_stream.message_length; + raw_message->buffer = cdr_stream.raw_message; + + return RMW_RET_OK; +} + +rmw_ret_t +rmw_take_raw( + const rmw_subscription_t * subscription, + rmw_message_raw_t * raw_message, + bool * taken) +{ + return _take_raw(subscription, raw_message, taken, nullptr); +} + +rmw_ret_t +rmw_take_raw_with_info( + const rmw_subscription_t * subscription, + rmw_message_raw_t * raw_message, + bool * taken, + rmw_message_info_t * message_info) +{ + if (!message_info) { + RMW_SET_ERROR_MSG("message info is null"); + return RMW_RET_ERROR; + } + DDS_InstanceHandle_t sending_publication_handle; + auto ret = _take_raw(subscription, raw_message, taken, &sending_publication_handle); + if (ret != RMW_RET_OK) { + // Error string is already set. + return RMW_RET_ERROR; + } + + rmw_gid_t * sender_gid = &message_info->publisher_gid; + sender_gid->implementation_identifier = rti_connext_identifier; + memset(sender_gid->data, 0, RMW_GID_STORAGE_SIZE); + auto detail = reinterpret_cast(sender_gid->data); + detail->publication_handle = sending_publication_handle; + + return RMW_RET_OK; +} } // extern "C" diff --git a/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em b/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em index 7e45b312..5dcc22d8 100644 --- a/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em +++ b/rosidl_typesupport_connext_c/resource/msg__type_support_c.cpp.em @@ -28,7 +28,7 @@ #include "rosidl_typesupport_connext_c/identifier.h" // Provides the definition of the message_type_support_callbacks_t struct. #include "rosidl_typesupport_connext_cpp/message_type_support.h" -#include "rosidl_typesupport_connext_cpp/connext_static_message_handle.hpp" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" #include "@(pkg)/msg/rosidl_typesupport_connext_c__visibility_control.h" @{header_file_name = get_header_filename_from_msg_name(type)}@ @@ -44,6 +44,7 @@ # endif #endif #include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Support.h" +#include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Plugin.h" #ifndef _WIN32 # pragma GCC diagnostic pop #endif @@ -256,7 +257,7 @@ convert_ros_to_dds(const void * untyped_ros_message, void * untyped_dds_message) } static bool -publish(void * dds_data_writer, ConnextStaticMessageHandle* untyped_ros_messageFIX) +publish(void * dds_data_writer, ConnextStaticCDRStream * untyped_ros_messageFIX) { void *untyped_ros_message = (void *) untyped_ros_messageFIX; if (!dds_data_writer) { @@ -425,10 +426,11 @@ static bool take( void * dds_data_reader, bool ignore_local_publications, - void * untyped_ros_message, + ConnextStaticCDRStream * cdr_streamFIX, bool * taken, void * sending_publication_handle) { + void * untyped_ros_message = reinterpret_cast(cdr_streamFIX); if (untyped_ros_message == 0) { fprintf(stderr, "invalid ros message pointer\n"); return false; @@ -562,6 +564,34 @@ finally: return false; } + +static bool +to_cdr_stream( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream) +{ + if (!untyped_ros_message) { + return false; + } + if (!cdr_stream) { + return false; + } + return true; +} + +static bool +to_message( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message) +{ + if (!cdr_stream) { + return false; + } + if (!untyped_ros_message) { + return false; + } + return true; +} @ @# // Collect the callback functions and provide a function to get the type support struct. @@ -573,6 +603,8 @@ static message_type_support_callbacks_t __callbacks = { take, // take convert_ros_to_dds, // convert_ros_to_dds convert_dds_to_ros, // convert_dds_to_ros + to_cdr_stream, // to_cdr_stream + to_message // to_message }; static rosidl_message_type_support_t __type_support = { diff --git a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp new file mode 100644 index 00000000..422e6e8f --- /dev/null +++ b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp @@ -0,0 +1,27 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ +#define ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ + +extern "C" +{ +struct ConnextStaticCDRStream +{ + char * raw_message = nullptr; + unsigned int message_length = 0; +}; +} // extern "C" + +#endif // ROSIDL_TYPESUPPORT_CONNEXT_CPP__CONNEXT_STATIC_CDR_STREAM_HPP_ diff --git a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h index a693b859..a52b400c 100644 --- a/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h +++ b/rosidl_typesupport_connext_cpp/include/rosidl_typesupport_connext_cpp/message_type_support.h @@ -17,7 +17,7 @@ #include "rosidl_generator_c/message_type_support_struct.h" -#include "rosidl_typesupport_connext_cpp/connext_static_message_handle.hpp" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" typedef struct message_type_support_callbacks_t { @@ -26,10 +26,10 @@ typedef struct message_type_support_callbacks_t // Function to register type with given dds_participant bool (* register_type)(void * dds_participant, const char * type_name); // Function to publish a ROS message with a given DDS data_writer - bool (* publish)(void * dds_data_writer, ConnextStaticMessageHandle * ros_message); + bool (* publish)(void * dds_data_writer, ConnextStaticCDRStream * cdr_stream); // Function to take a ROS message from a dds data reader bool (* take)( - void * dds_data_reader, bool ignore_local_publications, void * ros_message, bool * taken, + void * dds_data_reader, bool ignore_local_publications, ConnextStaticCDRStream * cdr_stream, bool * taken, void * sending_publication_handle); bool (* convert_ros_to_dds)( const void * untyped_ros_message, @@ -37,6 +37,14 @@ typedef struct message_type_support_callbacks_t bool (* convert_dds_to_ros)( const void * untyped_data_message, void * untyped_ros_message); + // Function to serialize a ROS message to a CDR stream + bool (* to_cdr_stream)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream); + // Function to deserialize a CDR message to a ROS message + bool (* to_message)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message); } message_type_support_callbacks_t; #endif // ROSIDL_TYPESUPPORT_CONNEXT_CPP__MESSAGE_TYPE_SUPPORT_H_ diff --git a/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em b/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em index 6b95e9ed..cd927fb3 100644 --- a/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em +++ b/rosidl_typesupport_connext_cpp/resource/msg__rosidl_typesupport_connext_cpp.hpp.em @@ -39,10 +39,14 @@ header_guard_variable = '__'.join([x.upper() for x in header_guard_parts]) + '_' # endif #endif #include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Support.h" +#include "@(spec.base_type.pkg_name)/@(subfolder)/dds_connext/@(spec.base_type.type)_Plugin.h" #ifndef _WIN32 # pragma GCC diagnostic pop #endif +// forward declaration of internal CDR Stream +struct ConnextStaticCDRStream; + // forward declaration of DDS types class DDSDomainParticipant; class DDSDataWriter; @@ -83,9 +87,19 @@ bool take__@(spec.base_type.type)( DDSDataReader * topic_reader, bool ignore_local_publications, - void * untyped_ros_message, + ConnextStaticCDRStream * untyped_ros_message, bool * taken); +bool +to_cdr_stream__@(spec.base_type.type)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream); + +bool +to_message__@(spec.base_type.type)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message); + } // namespace typesupport_connext_cpp } // namespace @(subfolder) diff --git a/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em b/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em index 11f0596d..e8306eb9 100644 --- a/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em +++ b/rosidl_typesupport_connext_cpp/resource/msg__type_support.cpp.em @@ -20,6 +20,7 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" +#include "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp" #include "rosidl_typesupport_connext_cpp/identifier.hpp" #include "rosidl_typesupport_connext_cpp/message_type_support.h" #include "rosidl_typesupport_connext_cpp/message_type_support_decl.hpp" @@ -149,52 +150,21 @@ convert_ros_message_to_dds( bool publish__@(spec.base_type.type)( void * untyped_topic_writer, - ConnextStaticMessageHandle * message_handle) + ConnextStaticCDRStream * cdr_stream) { + bool success = true; DDSDataWriter * topic_writer = static_cast(untyped_topic_writer); - bool success = false; - //@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * sample = nullptr; - - // in case we have an untyped ros message (classic publish) - if (message_handle->untyped_ros_message) { - const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = - *(const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)message_handle->untyped_ros_message; - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); - message_handle->untyped_dds_message = (void *)dds_message; - if (!dds_message) { - return false; - } - success = convert_ros_message_to_dds(ros_message, *dds_message); - // in case we have a raw CDR message - // HACK only print data for now, this has no action to be done - } else if (message_handle->raw_message) { - fprintf(stderr, "connext publish callback\n"); - for (size_t i = 0; i < *(message_handle->raw_message_length); ++i) { - fprintf(stderr, "%02x ", message_handle->raw_message[i]); - } - fprintf(stderr, "\n"); - success = true; + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter * data_writer = @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter::narrow(topic_writer); + if (!data_writer) { + fprintf(stderr, "failed to narrow data writer\n"); + success = false; + } else { + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * sample = reinterpret_cast<@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ *>(cdr_stream); + DDS_ReturnCode_t status = data_writer->write(*sample, DDS_HANDLE_NIL); + success = status == DDS_RETCODE_OK; } - if (success) { - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter * data_writer = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_DataWriter::narrow(topic_writer); - if (!data_writer) { - fprintf(stderr, "failed to narrow data writer\n"); - success = false; - } else { - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * sample = reinterpret_cast<@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ *>(message_handle); - DDS_ReturnCode_t status = data_writer->write(*sample, DDS_HANDLE_NIL); - success = status == DDS_RETCODE_OK; - } - } - - if (message_handle->untyped_dds_message) { - DDS_ReturnCode_t status = - @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(reinterpret_cast<@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ *>(message_handle->untyped_dds_message)); - success &= status == DDS_RETCODE_OK; - } return success; } @@ -253,15 +223,15 @@ bool take__@(spec.base_type.type)( void * untyped_topic_reader, bool ignore_local_publications, - void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream, bool * taken, void * sending_publication_handle) { if (!untyped_topic_reader) { throw std::runtime_error("topic reader handle is null"); } - if (!untyped_ros_message) { - throw std::runtime_error("ros message handle is null"); + if (!cdr_stream) { + throw std::runtime_error("cdr stream handle is null"); } if (!taken) { throw std::runtime_error("taken handle is null"); @@ -319,19 +289,89 @@ take__@(spec.base_type.type)( sample_info.publication_handle; } - bool success = true; if (!ignore_sample) { - @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = - *(@(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; - success = convert_dds_message_to_ros(dds_messages[0], ros_message); - if (success) { - *taken = true; - } + ConnextStaticCDRStream * input_stream = reinterpret_cast(&dds_messages[0]); + cdr_stream->raw_message = input_stream->raw_message; + cdr_stream->message_length = input_stream->message_length; + fprintf(stderr, "Received stream of length %u\n", cdr_stream->message_length); + *taken = true; } else { *taken = false; } data_reader->return_loan(dds_messages, sample_infos); + return true; +} + +bool +to_cdr_stream__@(spec.base_type.type)( + const void * untyped_ros_message, + ConnextStaticCDRStream * cdr_stream) +{ + if (!cdr_stream) { + return false; + } + if (!untyped_ros_message) { + return false; + } + + // cast the untyped to the known ros message + const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = + *(const @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; + // create a respective connext dds type + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); + if (!dds_message) { + return false; + } + // convert ros to dds + if (!convert_ros_message_to_dds(ros_message, *dds_message)) { + return false; + } + + // call the serialize function for the first time to get the expected length of the message + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer(NULL, &cdr_stream->message_length, dds_message) != RTI_TRUE) { + return false; + } + fprintf(stderr, "message length: %d\n", cdr_stream->message_length); + // allocate enough memory for the CDR stream + cdr_stream->raw_message = (char *)malloc(sizeof(char) * cdr_stream->message_length); + // call the function again and fill the buffer this time + if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer(cdr_stream->raw_message, &cdr_stream->message_length, dds_message) != RTI_TRUE) { + return false; + } + if (@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message) != DDS_RETCODE_OK){ + return false; + } + return true; +} + +bool +to_message__@(spec.base_type.type)( + const ConnextStaticCDRStream * cdr_stream, + void * untyped_ros_message) +{ + if (!cdr_stream) { + return false; + } + if (!cdr_stream->raw_message) { + fprintf(stderr, "cdr stream doesn't contain data\n"); + } + if (!untyped_ros_message) { + return false; + } + + @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_ * dds_message = @(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::create_data(); + if (@(spec.base_type.type)_Plugin_deserialize_from_cdr_buffer(dds_message, cdr_stream->raw_message, cdr_stream->message_length) != RTI_TRUE) { + fprintf(stderr, "deserialize from cdr buffer failed\n"); + return false; + } + + @(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) & ros_message = + *(@(spec.base_type.pkg_name)::@(subfolder)::@(spec.base_type.type) *)untyped_ros_message; + bool success = convert_dds_message_to_ros(*dds_message, ros_message); + if (@(spec.base_type.pkg_name)::@(subfolder)::dds_::@(spec.base_type.type)_TypeSupport::delete_data(dds_message) != DDS_RETCODE_OK){ + return false; + } return success; } @@ -342,7 +382,9 @@ static message_type_support_callbacks_t callbacks = { &publish__@(spec.base_type.type), &take__@(spec.base_type.type), nullptr, - nullptr + nullptr, + &to_cdr_stream__@(spec.base_type.type), + &to_message__@(spec.base_type.type) }; static rosidl_message_type_support_t handle = { diff --git a/rosidl_typesupport_connext_cpp/rosidl_typesupport_connext_cpp/__init__.py b/rosidl_typesupport_connext_cpp/rosidl_typesupport_connext_cpp/__init__.py index 6869f52f..5d490055 100644 --- a/rosidl_typesupport_connext_cpp/rosidl_typesupport_connext_cpp/__init__.py +++ b/rosidl_typesupport_connext_cpp/rosidl_typesupport_connext_cpp/__init__.py @@ -86,25 +86,25 @@ def generate_dds_connext_cpp( count = 1 max_count = 5 while True: - subprocess.check_call(cmd) - - # fail safe if the generator does not work as expected - any_missing = False - for suffix in ['.h', '.cxx', 'Plugin.h', 'Plugin.cxx', 'Support.h', 'Support.cxx']: - filename = os.path.join(output_path, msg_name + suffix) - if not os.path.exists(filename): - any_missing = True - break - if not any_missing: - break - print("'%s' failed to generate the expected files for '%s/%s'" % - (idl_pp, pkg_name, msg_name), file=sys.stderr) - if count < max_count: - count += 1 - print('Running code generator again (retry %d of %d)...' % - (count, max_count), file=sys.stderr) - continue - raise RuntimeError('failed to generate the expected files') +# subprocess.check_call(cmd) + + # fail safe if the generator does not work as expected + any_missing = False + for suffix in ['.h', '.cxx', 'Plugin.h', 'Plugin.cxx', 'Support.h', 'Support.cxx']: + filename = os.path.join(output_path, msg_name + suffix) + if not os.path.exists(filename): + any_missing = True + break + if not any_missing: + break + print("'%s' failed to generate the expected files for '%s/%s'" % + (idl_pp, pkg_name, msg_name), file=sys.stderr) + if count < max_count: + count += 1 + print('Running code generator again (retry %d of %d)...' % + (count, max_count), file=sys.stderr) + continue + raise RuntimeError('failed to generate the expected files') if os.name != 'nt': # modify generated code to avoid unsed global variable warning @@ -114,43 +114,145 @@ def generate_dds_connext_cpp( # to avoid multiple file readings _modify(msg_filename, pkg_name, msg_name, _inject_unused_attribute) - plugin_filename = os.path.join(output_path, msg_name + 'Plugin.cxx') - _modify(plugin_filename, pkg_name, msg_name, _modify_plugin_serialize_function) +# plugin_filename = os.path.join(output_path, msg_name + 'Plugin.cxx') +# if not "Request" in msg_name and not "Response" in msg_name: +# _modify(plugin_filename, pkg_name, msg_name, _modify_include_headers) +# _modify(plugin_filename, pkg_name, msg_name, _modify_plugin_create_data_function) +# _modify(plugin_filename, pkg_name, msg_name, _modify_plugin_destroy_data_function) +# _modify(plugin_filename, pkg_name, msg_name, _modify_plugin_serialize_function) +# _modify(plugin_filename, pkg_name, msg_name, _modify_plugin_deserialize_function) return 0 +def _get_create_data_code(msg_name, indentation): + val = (\ + "{{\n" + "{indentation}// MODIFIED FOR ROS2 PURPOSES\n" + "{indentation}ConnextStaticCDRStream * cdr_stream = NULL;\n" + "{indentation}RTIOsapiHeap_allocateStructure(&cdr_stream, ConnextStaticCDRStream);\n" + "{indentation}return reinterpret_cast<{msg_name} *>(cdr_stream);\n" + "\n".format(msg_name=msg_name, indentation=indentation)) + return val + + +def _modify_plugin_create_data_function(pkg_name, msg_name, lines): + create_data_fcn_signature = msg_name + 'PluginSupport_create_data_ex(' + print("looking for '%s' create_data function" % create_data_fcn_signature) + signature_found = False + injection_start = None + for index, line in enumerate(lines): + if line.lstrip().startswith(create_data_fcn_signature): + signature_found = True + injection_start = index + break + if not signature_found: + raise RuntimeError('failed to locate %sPlugin_create_data function' % msg_name) + + indentation = ' ' * 16 + lines[injection_start] = line.replace('{', _get_create_data_code(msg_name, indentation)) + return True + + +def _get_destroy_data_code(msg_name, indentation): + val = (\ + "{{\n" + "{indentation}// MODIFIED FOR ROS2 PURPOSES\n" + "{indentation}RTIOsapiHeap_freeStructure(sample);\n" + "{indentation}return;\n".format(indentation=indentation)) + return val + + +def _modify_plugin_destroy_data_function(pkg_name, msg_name, lines): + destroy_fcn_signature = msg_name + 'PluginSupport_destroy_data_ex(' + print("looking for '%s' destroy function" % destroy_fcn_signature) + signature_found = False + injection_start = None + for index, line in enumerate(lines): + if not signature_found: + if line.lstrip().startswith(destroy_fcn_signature): + signature_found = True + else: + if '{' in line.lstrip(): + print("found %s destroy function in line: %d" % (msg_name, index)) + injection_start = index + break + if not signature_found: + raise RuntimeError('failed to locate %sPlugin_destroy function' % msg_name) + + indentation = ' ' * 16 + lines[injection_start] = line.replace('{', _get_destroy_data_code(msg_name, indentation)) + return True + + +def _get_deserialization_code(msg_name, indentation): + val = (\ + "{{\n" + "{indentation}// MODIFIED FOR ROS2 PURPOSES\n" + "{indentation}if (endpoint_plugin_qos) {{\n" + "{indentation} if (!reinterpret_cast(endpoint_plugin_qos)) {{\n" + "{indentation} return RTI_FALSE;\n" + "{indentation} }}\n" + "{indentation} ConnextStaticCDRStream * cdr_stream =\n" + "{indentation} reinterpret_cast(sample);\n" + "{indentation} cdr_stream->raw_message = stream->_buffer;\n" + "{indentation} cdr_stream->message_length = stream->_bufferLength;\n" + "{indentation} return RTI_TRUE;\n" + "{indentation}}}\n" + .format(indentation=indentation)) + return val + + +def _modify_plugin_deserialize_function(pkg_name, msg_name, lines): + deserialize_fcn_signature = msg_name + 'Plugin_deserialize_sample(' + print("looking for '%s' deserialize function" % deserialize_fcn_signature) + signature_found = False + injection_start = None + for index, line in enumerate(lines): + if not signature_found: + if line.lstrip().startswith(deserialize_fcn_signature): + signature_found = True + else: + if '{' in line.lstrip(): + print("found %s deserialize function in line: %d" % (msg_name, index)) + injection_start = index + break + if not signature_found: + raise RuntimeError('failed to locate %sPlugin_deserialize function' % msg_name) + + indentation = ' ' * 16 + lines[injection_start] = line.replace('{', _get_deserialization_code(msg_name, indentation)) + return True + + def _get_serialization_code(msg_name, indentation): - val = ("{{\n" - "{indentation}// MODIFIED FOR ROS2 PURPOSES\n" - "{indentation}const ConnextStaticMessageHandle * message_handle =\n" - "{indentation} reinterpret_cast(fake_sample);\n" - "{indentation}if (message_handle->raw_message) {{\n" - "{indentation} memcpy(stream->_buffer, message_handle->raw_message, " - "*(message_handle->raw_message_length));\n" - "{indentation} stream->_relativeBuffer = stream->_buffer;\n" - "{indentation} stream->_tmpRelativeBuffer = stream->_buffer;\n" - "{indentation} stream->_buffer = stream->_buffer;\n" - "{indentation} //stream->_endian = \'\\x01\';\n" - "{indentation} //stream->_nativeEndian = \'\\x01\';\n" - "{indentation} //stream->_encapsulationKind = 1;\n" - "{indentation} //stream->_zeroOnAlign = 0;\n" - "{indentation} stream->_currentPosition = " - "stream->_buffer + *(message_handle->raw_message_length);\n" - "{indentation} return RTI_TRUE;\n" - "{indentation}}}\n" - "{indentation}const {msg_name} * sample = reinterpret_cast " - "(message_handle->untyped_dds_message);\n" - .format(indentation=indentation, msg_name=msg_name)) + val = (\ + "{{\n" + "{indentation}// MODIFIED FOR ROS2 PURPOSES\n" + "{indentation}if (endpoint_plugin_qos) {{\n" + "{indentation} if (!reinterpret_cast(endpoint_plugin_qos)) {{\n" + "{indentation} return RTI_FALSE;\n" + "{indentation} }}\n" + "{indentation} const ConnextStaticCDRStream * cdr_stream =\n" + "{indentation} reinterpret_cast(sample);\n" + "{indentation} memcpy(stream->_buffer, cdr_stream->raw_message, " + "cdr_stream->message_length);\n" + "{indentation} stream->_relativeBuffer = stream->_buffer;\n" + "{indentation} stream->_tmpRelativeBuffer = stream->_buffer;\n" + "{indentation} stream->_buffer = stream->_buffer;\n" + "{indentation} //stream->_endian = \'\\x01\';\n" + "{indentation} //stream->_nativeEndian = \'\\x01\';\n" + "{indentation} //stream->_encapsulationKind = 1;\n" + "{indentation} //stream->_zeroOnAlign = 0;\n" + "{indentation} stream->_currentPosition = " + "stream->_buffer + cdr_stream->message_length;\n" + "{indentation} return RTI_TRUE;\n" + "{indentation}}}\n" + .format(indentation=indentation, msg_name=msg_name)) return val def _modify_plugin_serialize_function(pkg_name, msg_name, lines): - # set include correctly - line 49 is the last generated include - if lines[49] == '': - lines[49] = ('\n// MODIFIED FOR ROS2 PURPOSES\n#include \"' - 'rosidl_typesupport_connext_cpp/connext_static_message_handle.hpp\"\n') - serialize_fcn_signature = msg_name + 'Plugin_serialize(' print("looking for '%s' serialize function" % serialize_fcn_signature) signature_found = False @@ -167,15 +269,25 @@ def _modify_plugin_serialize_function(pkg_name, msg_name, lines): if not signature_found: raise RuntimeError('failed to locate %sPlugin_serialize function' % msg_name) - # rename message argument from sample to fake_sample - # this eases the modification within the serialize function - print(lines[injection_start - 6]) - lines[injection_start - 6] = lines[injection_start - 6].replace('sample', 'fake_sample') indentation = ' ' * 16 lines[injection_start] = line.replace('{', _get_serialization_code(msg_name, indentation)) return True +def _get_include_headers(): + val = (\ + "\n// MODIFIED FOR ROS2 PURPOSES\n#include \"" + "rosidl_typesupport_connext_cpp/connext_static_cdr_stream.hpp\"\n") + return val + + +def _modify_include_headers(pkg_name, msg_name, lines): + # set include correctly - line 49 is the last generated include + if lines[49] == '': + lines[49] = _get_include_headers() + return True + + def _inject_unused_attribute(pkg_name, msg_name, lines): # prepend attribute before constants of string type prefix = 'static const DDS_Char * Constants__'