Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
publish raw with connext cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
Karsten1987 committed Feb 16, 2018
1 parent 5e72c19 commit 52399cf
Show file tree
Hide file tree
Showing 25 changed files with 3,315 additions and 305 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct ConnextStaticClientInfo
DDSDataReader * response_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct ConnextStaticPublisherInfo
DDSDataWriter * topic_writer_;
const message_type_support_callbacks_t * callbacks_;
rmw_gid_t publisher_gid;
bool raw_stream_publisher = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct ConnextStaticServiceInfo
DDSDataReader * request_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct ConnextStaticSubscriberInfo
DDSReadCondition * read_condition_;
bool ignore_local_publications;
const message_type_support_callbacks_t * callbacks_;
bool raw_stream_subscriber = true;
};
} // extern "C"

Expand Down
8 changes: 0 additions & 8 deletions rmw_connext_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ rmw_create_client(
DDS_SubscriberQos subscriber_qos;
DDS_ReturnCode_t status;
DDS_PublisherQos publisher_qos;
DDS_TypeSupportQosPolicy ts;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDS::Publisher * dds_publisher = nullptr;
Expand Down Expand Up @@ -124,13 +123,6 @@ rmw_create_client(
{
goto fail;
}
// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
//ts.plugin_data = &client_info->raw_stream_subscriber;
//ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
//datareader_qos.type_support = ts;
//datawriter_qos.type_support = ts;
(void) ts;

// TODO(karsten1987): For now, I'll expose the datawriter
// to access the respective DDSPublisher object.
Expand Down
8 changes: 2 additions & 6 deletions rmw_connext_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
RMW_SET_ERROR_MSG("failed to convert ros_message to cdr stream");
return RMW_RET_ERROR;
}
fprintf(stderr, "successfully converted to cdr stream\n");
if (cdr_stream.message_length == 0) {
RMW_SET_ERROR_MSG("no message length set");
return RMW_RET_ERROR;
Expand Down Expand Up @@ -112,11 +111,8 @@ rmw_publish_raw(const rmw_publisher_t * publisher, const rmw_message_raw_t * raw
}

ConnextStaticCDRStream cdr_stream;
cdr_stream.raw_message = DDS_String_dup(raw_message->buffer);
cdr_stream.message_length = static_cast<unsigned int>(raw_message->buffer_length);
//ConnextStaticMessageHandle message_handle;
//message_handle.raw_message = raw_message->buffer;
//message_handle.raw_message_length = &raw_message->buffer_length;
cdr_stream.raw_message = raw_message->buffer;
cdr_stream.message_length = raw_message->buffer_length;
bool published = callbacks->publish(topic_writer, &cdr_stream);
if (!published) {
RMW_SET_ERROR_MSG("failed to publish message");
Expand Down
26 changes: 9 additions & 17 deletions rmw_connext_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ rmw_create_publisher(
// Past this point, a failure results in unrolling code in the goto fail block.
bool registered;
DDS_DataWriterQos datawriter_qos;
DDS_TypeSupportQosPolicy ts;
DDS_PublisherQos publisher_qos;
DDS_ReturnCode_t status;
DDSPublisher * dds_publisher = nullptr;
Expand Down Expand Up @@ -175,34 +174,27 @@ rmw_create_publisher(
DDS_String_free(topic_str);
topic_str = nullptr;

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.

if (!get_datawriter_qos(participant, *qos_profile, datawriter_qos)) {
// error string was set within the function
goto fail;
}

// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
ts.plugin_data = &publisher_info->raw_stream_publisher;
ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
datawriter_qos.type_support = ts;
topic_writer = dds_publisher->create_datawriter(
topic, datawriter_qos, NULL, DDS_STATUS_MASK_NONE);
if (!topic_writer) {
RMW_SET_ERROR_MSG("failed to create datawriter");
goto fail;
}
fprintf(stderr, "created datawriter\n");

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
publisher_info->dds_publisher_ = dds_publisher;
publisher_info->topic_writer_ = topic_writer;
publisher_info->callbacks_ = callbacks;
Expand Down
2 changes: 1 addition & 1 deletion rmw_connext_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ rmw_take_request(
}

*taken = callbacks->take_request(replier, request_header, ros_request);
fprintf(stderr, "Correctly taken the request\n");

return RMW_RET_OK;
}
} // extern "C"
27 changes: 8 additions & 19 deletions rmw_connext_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ rmw_create_service(
}

// Past this point, a failure results in unrolling code in the goto fail block.
DDS_TypeSupportQosPolicy ts;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDS_SubscriberQos subscriber_qos;
Expand All @@ -97,15 +96,6 @@ rmw_create_service(
char * response_partition_str = nullptr;
char * service_str = nullptr;

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, )
buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore.

// Begin initializing elements.
service = rmw_service_allocate();
if (!service) {
Expand Down Expand Up @@ -133,13 +123,6 @@ rmw_create_service(
{
goto fail;
}
// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
//ts.plugin_data = &service_info->raw_stream_subscriber;
//ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
//datareader_qos.type_support = ts;
//datawriter_qos.type_support = ts;
(void) ts;

replier = callbacks->create_replier(
participant, service_str, &datareader_qos, &datawriter_qos,
Expand All @@ -148,8 +131,6 @@ rmw_create_service(
&rmw_allocate);
DDS_String_free(service_str);
service_str = nullptr;

fprintf(stderr, "created replier\n");
if (!replier) {
RMW_SET_ERROR_MSG("failed to create replier");
goto fail;
Expand Down Expand Up @@ -210,6 +191,14 @@ rmw_create_service(
// update attached publisher
dds_publisher->set_qos(publisher_qos);

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticServiceInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(service_info, buf, goto fail, ConnextStaticServiceInfo, )
buf = nullptr; // Only free the service_info pointer; don't need the buf pointer anymore.
service_info->replier_ = replier;
service_info->callbacks_ = callbacks;
service_info->request_datareader_ = request_datareader;
Expand Down
27 changes: 9 additions & 18 deletions rmw_connext_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ rmw_create_subscription(
// Past this point, a failure results in unrolling code in the goto fail block.
bool registered;
DDS_DataReaderQos datareader_qos;
DDS_TypeSupportQosPolicy ts;
DDS_SubscriberQos subscriber_qos;
DDS_ReturnCode_t status;
DDSSubscriber * dds_subscriber = nullptr;
Expand Down Expand Up @@ -171,35 +170,18 @@ rmw_create_subscription(
DDS_String_free(topic_str);
topic_str = nullptr;

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.

if (!get_datareader_qos(participant, *qos_profile, datareader_qos)) {
// error string was set within the function
goto fail;
}

// Set the plugin typesupport to the connext info
// If this is true, a raw CDR Stream is enabled
ts.plugin_data = &subscriber_info->raw_stream_subscriber;
ts.cdr_padding_kind = DDS_AUTO_CDR_PADDING;
datareader_qos.type_support = ts;

topic_reader = dds_subscriber->create_datareader(
topic, datareader_qos,
NULL, DDS_STATUS_MASK_NONE);
if (!topic_reader) {
RMW_SET_ERROR_MSG("failed to create datareader");
goto fail;
}
fprintf(stderr, "data reader created\n");

read_condition = topic_reader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
Expand All @@ -208,6 +190,15 @@ rmw_create_subscription(
goto fail;
}

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
subscriber_info->dds_subscriber_ = dds_subscriber;
subscriber_info->topic_reader_ = topic_reader;
subscriber_info->read_condition_ = read_condition;
Expand Down
18 changes: 9 additions & 9 deletions rmw_connext_shared_cpp/src/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ get_datawriter_qos(
return false;
}

// status = DDSPropertyQosPolicyHelper::add_property(
// datawriter_qos.property,
// "dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size",
// "4096",
// DDS_BOOLEAN_FALSE);
// if (status != DDS_RETCODE_OK) {
// RMW_SET_ERROR_MSG("failed to add qos property");
// return false;
// }
status = DDSPropertyQosPolicyHelper::add_property(
datawriter_qos.property,
"dds.data_writer.history.memory_manager.fast_pool.pool_buffer_max_size",
"4096",
DDS_BOOLEAN_FALSE);
if (status != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to add qos property");
return false;
}

if (!set_entity_qos_from_profile(qos_profile, datawriter_qos)) {
return false;
Expand Down
12 changes: 11 additions & 1 deletion rosidl_typesupport_connext_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ ament_export_include_directories(include)

ament_python_install_package(${PROJECT_NAME})

add_library(${PROJECT_NAME} SHARED src/identifier.cpp)
add_library(
${PROJECT_NAME}
SHARED
src/connext_static_raw_data.cxx
src/connext_static_raw_data_plugin.cxx
src/connext_static_raw_data_support.cxx
src/identifier.cpp)
if(WIN32)
target_compile_definitions(${PROJECT_NAME}
PRIVATE "ROSIDL_TYPESUPPORT_CONNEXT_CPP_BUILDING_DLL")
Expand All @@ -50,6 +56,10 @@ target_include_directories(${PROJECT_NAME}
PUBLIC
include
)
ament_target_dependencies(
${PROJECT_NAME}
"Connext"
)
ament_export_libraries(${PROJECT_NAME})

ament_index_register_resource("rosidl_typesupport_cpp")
Expand Down
7 changes: 7 additions & 0 deletions rosidl_typesupport_connext_cpp/RawData.idl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const long KEY_HASH_LENGTH_16 = 16;

struct RawData {
@key octet key_hash[KEY_HASH_LENGTH_16];
sequence<octet> key_data;
sequence<octet> raw_data;
};
Loading

0 comments on commit 52399cf

Please sign in to comment.