Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
WIP publish and subscribe raw
Browse files Browse the repository at this point in the history
  • Loading branch information
Karsten1987 committed Nov 14, 2017
1 parent 7fb49ba commit e83132c
Show file tree
Hide file tree
Showing 17 changed files with 549 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct ConnextStaticClientInfo
DDSDataReader * response_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct ConnextStaticPublisherInfo
DDSDataWriter * topic_writer_;
const message_type_support_callbacks_t * callbacks_;
rmw_gid_t publisher_gid;
bool raw_stream_publisher = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct ConnextStaticServiceInfo
DDSDataReader * request_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct ConnextStaticSubscriberInfo
DDSReadCondition * read_condition_;
bool ignore_local_publications;
const message_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
8 changes: 8 additions & 0 deletions rmw_connext_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ rmw_create_client(
DDS_SubscriberQos subscriber_qos;
DDS_ReturnCode_t status;
DDS_PublisherQos publisher_qos;
DDS_TypeSupportQosPolicy ts;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDS::Publisher * dds_publisher = nullptr;
Expand Down Expand Up @@ -123,6 +124,13 @@ rmw_create_client(
{
goto fail;
}
// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
//ts.plugin_data = &client_info->raw_stream_subscriber;
//ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
//datareader_qos.type_support = ts;
//datawriter_qos.type_support = ts;
(void) ts;

// TODO(karsten1987): For now, I'll expose the datawriter
// to access the respective DDSPublisher object.
Expand Down
33 changes: 25 additions & 8 deletions rmw_connext_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,27 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
return RMW_RET_ERROR;
}

ConnextStaticMessageHandle message_handle;
message_handle.untyped_ros_message = ros_message;
bool published = callbacks->publish(topic_writer, &message_handle);
if (!published) {

ConnextStaticCDRStream * my_cdr = new ConnextStaticCDRStream();
ConnextStaticCDRStream & cdr_stream = *my_cdr;
if (!callbacks->to_cdr_stream(ros_message, &cdr_stream)) {
RMW_SET_ERROR_MSG("failed to convert ros_message to cdr stream");
return RMW_RET_ERROR;
}
fprintf(stderr, "successfully converted to cdr stream\n");
if (cdr_stream.message_length == 0) {
RMW_SET_ERROR_MSG("no message length set");
return RMW_RET_ERROR;
}
if (!cdr_stream.raw_message) {
RMW_SET_ERROR_MSG("no raw message attached");
return RMW_RET_ERROR;
}
if (!callbacks->publish(topic_writer, &cdr_stream)) {
RMW_SET_ERROR_MSG("failed to publish message");
return RMW_RET_ERROR;
}
free(cdr_stream.raw_message);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -99,10 +113,13 @@ rmw_publish_raw(const rmw_publisher_t * publisher, const rmw_message_raw_t * raw
return RMW_RET_ERROR;
}

ConnextStaticMessageHandle message_handle;
message_handle.raw_message = raw_message->buffer;
message_handle.raw_message_length = &raw_message->buffer_length;
bool published = callbacks->publish(topic_writer, &message_handle);
ConnextStaticCDRStream cdr_stream;
cdr_stream.raw_message = DDS_String_dup(raw_message->buffer);
cdr_stream.message_length = static_cast<unsigned int>(raw_message->buffer_length);
//ConnextStaticMessageHandle message_handle;
//message_handle.raw_message = raw_message->buffer;
//message_handle.raw_message_length = &raw_message->buffer_length;
bool published = callbacks->publish(topic_writer, &cdr_stream);
if (!published) {
RMW_SET_ERROR_MSG("failed to publish message");
return RMW_RET_ERROR;
Expand Down
26 changes: 17 additions & 9 deletions rmw_connext_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ rmw_create_publisher(
// Past this point, a failure results in unrolling code in the goto fail block.
bool registered;
DDS_DataWriterQos datawriter_qos;
DDS_TypeSupportQosPolicy ts;
DDS_PublisherQos publisher_qos;
DDS_ReturnCode_t status;
DDSPublisher * dds_publisher = nullptr;
Expand Down Expand Up @@ -174,27 +175,34 @@ rmw_create_publisher(
DDS_String_free(topic_str);
topic_str = nullptr;

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.

if (!get_datawriter_qos(participant, *qos_profile, datawriter_qos)) {
// error string was set within the function
goto fail;
}

// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
ts.plugin_data = &publisher_info->raw_stream_publisher;
ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
datawriter_qos.type_support = ts;
topic_writer = dds_publisher->create_datawriter(
topic, datawriter_qos, NULL, DDS_STATUS_MASK_NONE);
if (!topic_writer) {
RMW_SET_ERROR_MSG("failed to create datawriter");
goto fail;
}
fprintf(stderr, "created datawriter\n");

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
publisher_info->dds_publisher_ = dds_publisher;
publisher_info->topic_writer_ = topic_writer;
publisher_info->callbacks_ = callbacks;
Expand Down
2 changes: 1 addition & 1 deletion rmw_connext_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ rmw_take_request(
}

*taken = callbacks->take_request(replier, request_header, ros_request);

fprintf(stderr, "Correctly taken the request\n");
return RMW_RET_OK;
}
} // extern "C"
27 changes: 19 additions & 8 deletions rmw_connext_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ rmw_create_service(
}

// Past this point, a failure results in unrolling code in the goto fail block.
DDS_TypeSupportQosPolicy ts;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDS_SubscriberQos subscriber_qos;
Expand All @@ -96,6 +97,15 @@ rmw_create_service(
char * response_partition_str = nullptr;
char * service_str = nullptr;

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, )
buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore.

// Begin initializing elements.
service = rmw_service_allocate();
if (!service) {
Expand Down Expand Up @@ -123,6 +133,13 @@ rmw_create_service(
{
goto fail;
}
// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
//ts.plugin_data = &service_info->raw_stream_subscriber;
//ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
//datareader_qos.type_support = ts;
//datawriter_qos.type_support = ts;
(void) ts;

replier = callbacks->create_replier(
participant, service_str, &datareader_qos, &datawriter_qos,
Expand All @@ -131,6 +148,8 @@ rmw_create_service(
&rmw_allocate);
DDS_String_free(service_str);
service_str = nullptr;

fprintf(stderr, "created replier\n");
if (!replier) {
RMW_SET_ERROR_MSG("failed to create replier");
goto fail;
Expand Down Expand Up @@ -191,14 +210,6 @@ rmw_create_service(
// update attached publisher
dds_publisher->set_qos(publisher_qos);

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, )
buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore.
service_info->replier_ = replier;
service_info->callbacks_ = callbacks;
service_info->request_datareader_ = request_datareader;
Expand Down
27 changes: 18 additions & 9 deletions rmw_connext_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ rmw_create_subscription(
// Past this point, a failure results in unrolling code in the goto fail block.
bool registered;
DDS_DataReaderQos datareader_qos;
DDS_TypeSupportQosPolicy ts;
DDS_SubscriberQos subscriber_qos;
DDS_ReturnCode_t status;
DDSSubscriber * dds_subscriber = nullptr;
Expand Down Expand Up @@ -170,18 +171,35 @@ rmw_create_subscription(
DDS_String_free(topic_str);
topic_str = nullptr;

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.

if (!get_datareader_qos(participant, *qos_profile, datareader_qos)) {
// error string was set within the function
goto fail;
}

// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
ts.plugin_data = &subscriber_info->raw_stream_subscriber;
ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
datareader_qos.type_support = ts;

topic_reader = dds_subscriber->create_datareader(
topic, datareader_qos,
NULL, DDS_STATUS_MASK_NONE);
if (!topic_reader) {
RMW_SET_ERROR_MSG("failed to create datareader");
goto fail;
}
fprintf(stderr, "data reader created\n");

read_condition = topic_reader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
Expand All @@ -190,15 +208,6 @@ rmw_create_subscription(
goto fail;
}

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
subscriber_info->dds_subscriber_ = dds_subscriber;
subscriber_info->topic_reader_ = topic_reader;
subscriber_info->read_condition_ = read_condition;
Expand Down
Loading

0 comments on commit e83132c

Please sign in to comment.