Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Expose raw CDR stream for publish and subscribe #259

Merged
merged 34 commits into from
Jun 16, 2018
Merged

Conversation

Karsten1987
Copy link
Contributor

@Karsten1987 Karsten1987 commented Oct 23, 2017

Connects to ros2/demos#185

Pretty complicated for what it is, but it seems to be the only doable way for now.

The idea here is to cast a custom raw data stream connext_static_message_handle object to a known dds message type. This fake sample is then passed the serialize function where we can depending on which pointer is set cast it back to the underlying type (either the original dds message or the custom raw data).
Unfortunately, we also have to modify the generated serialize function to deal with this fake sample.

@Karsten1987 Karsten1987 added the in progress Actively being worked on (Kanban column) label Oct 23, 2017
@Karsten1987 Karsten1987 self-assigned this Oct 23, 2017
const void * untyped_ros_message = nullptr;
void * untyped_dds_message = nullptr;
const char * raw_message = nullptr; // making this void for not including rmw dep
const size_t * raw_message_length; // making this void for not including rmw dep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is weird. I don't seen anything void in these two lines?

@@ -149,35 +149,52 @@ convert_ros_message_to_dds(
bool
publish__@(spec.base_type.type)(
void * untyped_topic_writer,
const void * untyped_ros_message)
ConnextStaticMessageHandle * message_handle)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should this not be const anymore? Why should the untyped_dds_message be stored in this handle? Couldn't it be just a local variable?

return 0


def _get_serialization_code(msg_name, indentation):
val = ("{{\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to avoid an indentation which is dependent on the variable name val add a newline after the open parenthesis.



def _modify_plugin_serialize_function(pkg_name, msg_name, lines):
# set include correctly - line 49 is the last generated include
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this seems pretty fragile can you add asserts to ensure that the line actually contains an include and that the next line doesn't.

Copy link
Member

@wjwwood wjwwood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you should go through this and find any use of a symmetric function, e.g. create_data/delete_data or malloc/free (should be rmw_allocate/rmw_free), and audit their use to ensure you call the clean up function in all cases.

"connext_static_raw_data_support.cpp"
)

# ament_lint_auto_find_test_dependencies()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be worked into a comment about why it's not being used, or should be removed imo.

// TODO(karsten1987): This allocation has to be preallocated
// or at least bring in a custom allocator
cdr_stream->raw_message =
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use rmw_allocate for now, at the very least.

RMW_SET_ERROR_MSG("failed to publish message");
return RMW_RET_ERROR;
}
free(cdr_stream.raw_message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, rmw_free.

}
if (!publish(topic_writer, &cdr_stream)) {
RMW_SET_ERROR_MSG("failed to publish message");
return RMW_RET_ERROR;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cdr_stream.raw_message would be leaked here.

return RMW_RET_ERROR;
}

ConnextStaticCDRStream cdr_stream;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this or could this be const?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean with static initialization ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think I was just confused.

if (@(spec.base_type.type)_Plugin_deserialize_from_cdr_buffer(
dds_message, cdr_stream->raw_message, cdr_stream->message_length) != RTI_TRUE)
{
fprintf(stderr, "deserialize from cdr buffer failed\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the rcutils logging API at the very least, otherwise perhaps it should be the error state message? Not sure which as I'm not sure what is expected of this function by the caller.

dds_message, cdr_stream->raw_message, cdr_stream->message_length) != RTI_TRUE)
{
fprintf(stderr, "deserialize from cdr buffer failed\n");
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is dds_message leaked here? I think delete_data needs to be called on it right?

}

// convert ros to dds
if (!convert_ros_message_to_dds(ros_message, *dds_message)) {
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think dds_message is leaked here too.

cdr_stream->raw_message =
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length));
// call the function again and fill the buffer this time
if (@(spec.base_type.type)_Plugin_serialize_to_cdr_buffer(cdr_stream->raw_message, &cdr_stream->message_length, dds_message) != RTI_TRUE) {
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cdr_stream->raw_message is leaked?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also dds_message.

// TODO(karsten1987): This allocation has to be preallocated
// or at least bring in a custom allocator
cdr_stream->raw_message =
reinterpret_cast<char *>(malloc(sizeof(char) * cdr_stream->message_length));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to use rmw_allocate for now.

# limitations under the License.

# originated from the stackoverflow post:
# https://stackoverflow.com/a/40967337
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dirk-thomas do you think this note is going to be ok as is?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you can't just put OSRF copyright and Apachae license on the code copied from Stackoverflow. I guess you need to "reimplement the function if we can't find any API which does it for us.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which means that IMO it's fine to license it as Apache 2.0.
It may be worth putting the 2016 copyright to the original author that's listed in the file ("Isaac Turner") to give credit even if it's public domain

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copyright must be put here, and better attribution (maybe the GitHub username), also the public domain license should be mentioned here.

I'd recommend:

# Based on `unifieddiff.py` by @noporpoise which is licensed as `Public domain (CC0)`
# see: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc

# limitations under the License.

# originated from the stackoverflow post:
# https://stackoverflow.com/a/40967337
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copyright must be put here, and better attribution (maybe the GitHub username), also the public domain license should be mentioned here.

I'd recommend:

# Based on `unifieddiff.py` by @noporpoise which is licensed as `Public domain (CC0)`
# see: https://gist.github.com/noporpoise/16e731849eb1231e86d78f9dfeca3abc

return RMW_RET_ERROR;
}

ConnextStaticCDRStream cdr_stream;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think I was just confused.

wjwwood and others added 2 commits June 14, 2018 15:03
* use size_t

* fix generated code’s support for size_t
@Karsten1987 Karsten1987 merged commit 861b8f3 into master Jun 16, 2018
@dirk-thomas dirk-thomas removed the in progress Actively being worked on (Kanban column) label Jun 16, 2018
@Karsten1987 Karsten1987 deleted the expose_cdr branch June 16, 2018 08:35
}
if (cdr_stream->buffer_capacity < cdr_stream->buffer_length) {
cdr_stream->allocator.deallocate(cdr_stream->buffer, cdr_stream->allocator.state);
cdr_stream->buffer = static_cast<char *>(cdr_stream->allocator.allocate(cdr_stream->buffer_length, cdr_stream->allocator.state));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After allocating a bigger buffer the capacity of cdr_stream is never updated. See http://build.ros2.org/view/Rci/job/Rci__nightly-connext_ubuntu_focal_amd64/29/testReport/rclcpp/TestSerializedMessage/serialization/ for a test failing for a while.

See ros2/rosidl_typesupport_connext#58 for the proposed fix.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants