-
Notifications
You must be signed in to change notification settings - Fork 33
Expose raw CDR stream for publish and subscribe #259
Conversation
const void * untyped_ros_message = nullptr; | ||
void * untyped_dds_message = nullptr; | ||
const char * raw_message = nullptr; // making this void for not including rmw dep | ||
const size_t * raw_message_length; // making this void for not including rmw dep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is weird. I don't seen anything void
in these two lines?
@@ -149,35 +149,52 @@ convert_ros_message_to_dds( | |||
bool | |||
publish__@(spec.base_type.type)( | |||
void * untyped_topic_writer, | |||
const void * untyped_ros_message) | |||
ConnextStaticMessageHandle * message_handle) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this not be const anymore? Why should the untyped_dds_message
be stored in this handle? Couldn't it be just a local variable?
return 0 | ||
|
||
|
||
def _get_serialization_code(msg_name, indentation): | ||
val = ("{{\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to avoid an indentation which is dependent on the variable name val
add a newline after the open parenthesis.
|
||
|
||
def _modify_plugin_serialize_function(pkg_name, msg_name, lines): | ||
# set include correctly - line 49 is the last generated include |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this seems pretty fragile can you add asserts to ensure that the line actually contains an include and that the next line doesn't.
e83132c
to
c79816a
Compare
c8145a6
to
5e72c19
Compare
70f5a77
to
52399cf
Compare
cc1ce49
to
255f5c3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you should go through this and find any use of a symmetric function, e.g. create_data/delete_data or malloc/free (should be rmw_allocate/rmw_free), and audit their use to ensure you call the clean up function in all cases.
rmw_connext_cpp/CMakeLists.txt
Outdated
"connext_static_raw_data_support.cpp" | ||
) | ||
|
||
# ament_lint_auto_find_test_dependencies() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be worked into a comment about why it's not being used, or should be removed imo.
// TODO(karsten1987): This allocation has to be preallocated | ||
// or at least bring in a custom allocator | ||
cdr_stream->raw_message = | ||
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use rmw_allocate
for now, at the very least.
rmw_connext_cpp/src/rmw_publish.cpp
Outdated
RMW_SET_ERROR_MSG("failed to publish message"); | ||
return RMW_RET_ERROR; | ||
} | ||
free(cdr_stream.raw_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, rmw_free
.
rmw_connext_cpp/src/rmw_publish.cpp
Outdated
} | ||
if (!publish(topic_writer, &cdr_stream)) { | ||
RMW_SET_ERROR_MSG("failed to publish message"); | ||
return RMW_RET_ERROR; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cdr_stream.raw_message
would be leaked here.
return RMW_RET_ERROR; | ||
} | ||
|
||
ConnextStaticCDRStream cdr_stream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this or could this be const
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean with static initialization ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I think I was just confused.
if (@(spec.base_type.type)_Plugin_deserialize_from_cdr_buffer( | ||
dds_message, cdr_stream->raw_message, cdr_stream->message_length) != RTI_TRUE) | ||
{ | ||
fprintf(stderr, "deserialize from cdr buffer failed\n"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should use the rcutils logging API at the very least, otherwise perhaps it should be the error state message? Not sure which as I'm not sure what is expected of this function by the caller.
dds_message, cdr_stream->raw_message, cdr_stream->message_length) != RTI_TRUE) | ||
{ | ||
fprintf(stderr, "deserialize from cdr buffer failed\n"); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is dds_message
leaked here? I think delete_data
needs to be called on it right?
} | ||
|
||
// convert ros to dds | ||
if (!convert_ros_message_to_dds(ros_message, *dds_message)) { | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think dds_message
is leaked here too.
cdr_stream->raw_message = | ||
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length)); | ||
// call the function again and fill the buffer this time | ||
if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer(cdr_stream->raw_message, &cdr_stream->message_length, dds_message) != RTI_TRUE) { | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cdr_stream->raw_message
is leaked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe also dds_message
.
// TODO(karsten1987): This allocation has to be preallocated | ||
// or at least bring in a custom allocator | ||
cdr_stream->raw_message = | ||
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to use rmw_allocate
for now.
793fb6a
to
07aa7b9
Compare
rmw_connext_cpp/bin/apply-patch.py
Outdated
# limitations under the License. | ||
|
||
# originated from the stackoverflow post: | ||
# https://stackoverflow.com/a/40967337 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dirk-thomas do you think this note is going to be ok as is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, you can't just put OSRF copyright and Apachae license on the code copied from Stackoverflow. I guess you need to "reimplement the function if we can't find any API which does it for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently the code is public domain: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc#file-unifieddiff-py-L3
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which means that IMO it's fine to license it as Apache 2.0.
It may be worth putting the 2016 copyright to the original author that's listed in the file ("Isaac Turner") to give credit even if it's public domain
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The copyright must be put here, and better attribution (maybe the GitHub username), also the public domain license should be mentioned here.
I'd recommend:
# Based on `unifieddiff.py` by @noporpoise which is licensed as `Public domain (CC0)`
# see: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc
rmw_connext_cpp/bin/apply-patch.py
Outdated
# limitations under the License. | ||
|
||
# originated from the stackoverflow post: | ||
# https://stackoverflow.com/a/40967337 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The copyright must be put here, and better attribution (maybe the GitHub username), also the public domain license should be mentioned here.
I'd recommend:
# Based on `unifieddiff.py` by @noporpoise which is licensed as `Public domain (CC0)`
# see: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc
return RMW_RET_ERROR; | ||
} | ||
|
||
ConnextStaticCDRStream cdr_stream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I think I was just confused.
* use size_t * fix generated code’s support for size_t
} | ||
if (cdr_stream->buffer_capacity < cdr_stream->buffer_length) { | ||
cdr_stream->allocator.deallocate(cdr_stream->buffer, cdr_stream->allocator.state); | ||
cdr_stream->buffer = static_cast<char *>(cdr_stream->allocator.allocate(cdr_stream->buffer_length, cdr_stream->allocator.state)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After allocating a bigger buffer the capacity
of cdr_stream
is never updated. See http://build.ros2.org/view/Rci/job/Rci__nightly-connext_ubuntu_focal_amd64/29/testReport/rclcpp/TestSerializedMessage/serialization/ for a test failing for a while.
See ros2/rosidl_typesupport_connext#58 for the proposed fix.
Connects to ros2/demos#185
Pretty complicated for what it is, but it seems to be the only doable way for now.
The idea here is to cast a custom raw data stream
connext_static_message_handle
object to a known dds message type. This fake sample is then passed the serialize function where we can depending on which pointer is set cast it back to the underlying type (either the original dds message or the custom raw data).Unfortunately, we also have to modify the generated serialize function to deal with this fake sample.