diff --git a/third-party/realdds/include/realdds/dds-stream.h b/third-party/realdds/include/realdds/dds-stream.h index f7b0096281..10fdd9b42b 100644 --- a/third-party/realdds/include/realdds/dds-stream.h +++ b/third-party/realdds/include/realdds/dds-stream.h @@ -14,13 +14,13 @@ namespace realdds { namespace topics { namespace device { - class image; -} // namespace device +class image; +} // namespace device class flexible_msg; -} // namespace topics +} // namespace topics class dds_subscriber; -class dds_topic_reader; +class dds_topic_reader_thread; // Represents a stream of information (images, motion data, etc..) from a single source received via the DDS system. // A stream can have several profiles, i.e different data frequency, image resolution, etc.. @@ -52,7 +52,7 @@ class dds_stream : public dds_stream_base virtual void handle_data() = 0; virtual bool can_start_streaming() const = 0; - std::shared_ptr< dds_topic_reader > _reader; + std::shared_ptr< dds_topic_reader_thread > _reader; bool _streaming = false; }; diff --git a/third-party/realdds/include/realdds/dds-topic-reader-thread.h b/third-party/realdds/include/realdds/dds-topic-reader-thread.h new file mode 100644 index 0000000000..064e9f4e13 --- /dev/null +++ b/third-party/realdds/include/realdds/dds-topic-reader-thread.h @@ -0,0 +1,43 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2023 Intel Corporation. All Rights Reserved. + +#pragma once + +#include "dds-topic-reader.h" + +#include + + +namespace realdds { + + +// A topic-reader that calls its callback from a separate thread. +// +// This is the recommended way, according to eProsima: +// "the following threads are spawned per participant: +// - 1 for reception of UDP multicast discovery traffic of participants +// - 1 for reception of UDP unicast discovery traffic of readers and writers +// - 1 for reception of UDP unicast user traffic (common for all topics on the same participant) +// - 1 for reception of SHM unicast user traffic (common for all topics on the same participant)" +// I.e., "on_data_available is called from the last two threads ... performing any lengthy process inside the callback +// is discouraged" and "calls to on_data_available are serialized". +// Experience also shows that one topic can starve out the others (e.g., metadata can get a lot more smaller messages +// than image topics), and handling the data in separate threads is the way to go. +// See also: +// https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/subscriber/dataReader/readingData.html#accessing-data-with-a-waiting-thread +// +class dds_topic_reader_thread : public dds_topic_reader +{ + active_object<> _th; + + typedef dds_topic_reader super; + +public: + dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, + std::shared_ptr< dds_subscriber > const & subscriber ); + + void run( qos const & ) override; +}; + + +} // namespace realdds diff --git a/third-party/realdds/include/realdds/dds-topic-reader.h b/third-party/realdds/include/realdds/dds-topic-reader.h index f60d7eeaa6..d2e6a61291 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader.h +++ b/third-party/realdds/include/realdds/dds-topic-reader.h @@ -33,6 +33,7 @@ class dds_subscriber; // class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener { +protected: std::shared_ptr< dds_topic > const _topic; std::shared_ptr < dds_subscriber > const _subscriber; @@ -41,7 +42,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener public: dds_topic_reader( std::shared_ptr< dds_topic > const & topic ); dds_topic_reader( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ); - ~dds_topic_reader(); + virtual ~dds_topic_reader(); eprosima::fastdds::dds::DataReader * get() const { return _reader; } eprosima::fastdds::dds::DataReader * operator->() const { return get(); } @@ -72,7 +73,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener }; // The callbacks should be set before we actually create the underlying DDS objects, so the reader does not - void run( qos const & = qos() ); + virtual void run( qos const & ); // DataReaderListener protected: @@ -81,7 +82,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener void on_data_available( eprosima::fastdds::dds::DataReader * ) override; -private: +protected: on_data_available_callback _on_data_available; on_subscription_matched_callback _on_subscription_matched; }; diff --git a/third-party/realdds/src/dds-device-impl.cpp b/third-party/realdds/src/dds-device-impl.cpp index c3896d9715..06be1915e0 100644 --- a/third-party/realdds/src/dds-device-impl.cpp +++ b/third-party/realdds/src/dds-device-impl.cpp @@ -4,7 +4,7 @@ #include "dds-device-impl.h" #include -#include +#include #include #include #include @@ -110,22 +110,6 @@ void dds_device::impl::run( size_t message_timeout_ms ) } } } ); - - if( _metadata_reader ) // Might not be present - { - _metadata_reader->on_data_available( [&]() { - topics::flexible_msg notification; - eprosima::fastdds::dds::SampleInfo info; - while( topics::flexible_msg::take_next( *_metadata_reader, ¬ification, &info ) ) - { - if( ! notification.is_valid() ) - continue; - - if( _on_metadata_available ) - _on_metadata_available( std::move( notification ) ); - } - } ); - } } void dds_device::impl::open( const dds_stream_profiles & profiles ) @@ -234,9 +218,6 @@ void dds_device::impl::create_notifications_reader() //(even if reliable). Setting depth to cover known use-cases plus some spare rqos.history().depth = 24; - if( ! _notifications_reader ) - DDS_THROW( runtime_error, "failed to set notifications reader for '" + _info.topic_root + "'" ); - _notifications_reader->run( rqos ); } @@ -247,13 +228,21 @@ void dds_device::impl::create_metadata_reader() auto topic = topics::flexible_msg::create_topic( _participant, _info.topic_root + topics::METADATA_TOPIC_NAME ); - _metadata_reader = std::make_shared< dds_topic_reader >( topic, _subscriber ); + _metadata_reader = std::make_shared< dds_topic_reader_thread >( topic, _subscriber ); dds_topic_reader::qos rqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ); rqos.history().depth = 10; // Support receive metadata from multiple streams - if( ! _metadata_reader ) - DDS_THROW( runtime_error, "failed to set metadata reader for '" + _info.topic_root + "'" ); + _metadata_reader->on_data_available( + [this]() + { + topics::flexible_msg message; + while( topics::flexible_msg::take_next( *_metadata_reader, &message ) ) + { + if( message.is_valid() && _on_metadata_available ) + _on_metadata_available( std::move( message ) ); + } + } ); _metadata_reader->run( rqos ); } diff --git a/third-party/realdds/src/dds-device-impl.h b/third-party/realdds/src/dds-device-impl.h index 0c931729ee..feac31b372 100644 --- a/third-party/realdds/src/dds-device-impl.h +++ b/third-party/realdds/src/dds-device-impl.h @@ -24,7 +24,7 @@ namespace realdds { -class dds_topic_reader; +class dds_topic_reader_thread; class dds_topic_writer; class dds_subscriber; @@ -51,7 +51,7 @@ class dds_device::impl std::queue< nlohmann::json > _option_response_queue; std::shared_ptr< dds_topic_reader > _notifications_reader; - std::shared_ptr< dds_topic_reader > _metadata_reader; + std::shared_ptr< dds_topic_reader_thread > _metadata_reader; std::shared_ptr< dds_topic_writer > _control_writer; dds_options _options; diff --git a/third-party/realdds/src/dds-device-watcher.cpp b/third-party/realdds/src/dds-device-watcher.cpp index 5ece3b145b..f6135ae229 100644 --- a/third-party/realdds/src/dds-device-watcher.cpp +++ b/third-party/realdds/src/dds-device-watcher.cpp @@ -125,7 +125,7 @@ void dds_device_watcher::init() } ); if( ! _device_info_topic->is_running() ) - _device_info_topic->run(); + _device_info_topic->run( dds_topic_reader::qos() ); LOG_DEBUG( "DDS device watcher initialized successfully" ); } diff --git a/third-party/realdds/src/dds-participant.cpp b/third-party/realdds/src/dds-participant.cpp index 2c81c97a2a..e6cd51583c 100644 --- a/third-party/realdds/src/dds-participant.cpp +++ b/third-party/realdds/src/dds-participant.cpp @@ -193,11 +193,15 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici // Indicates for how much time should a remote DomainParticipant consider the local DomainParticipant to be alive. pqos.wire_protocol().builtin.discovery_config.leaseDuration = { 10, 0 }; // [sec,nsec] - //Disable shared memory, use only UDP - //Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened - //correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files, - //C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux - auto udp_transport = std::make_shared(); + // Disable shared memory, use only UDP + // Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened + // correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files, + // C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux + auto udp_transport = std::make_shared< eprosima::fastdds::rtps::UDPv4TransportDescriptor >(); + // Also change the send/receive buffers: we deal with lots of information and, without this, we'll get dropped + // frames and unusual behavior... + udp_transport->sendBufferSize = 16 * 1024 * 1024; + udp_transport->receiveBufferSize = 16 * 1024 * 1024; pqos.transport().use_builtin_transports = false; pqos.transport().user_transports.push_back( udp_transport ); diff --git a/third-party/realdds/src/dds-stream.cpp b/third-party/realdds/src/dds-stream.cpp index 013a45c51a..6549ab5d8a 100644 --- a/third-party/realdds/src/dds-stream.cpp +++ b/third-party/realdds/src/dds-stream.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include #include #include #include @@ -36,8 +36,8 @@ void dds_video_stream::open( std::string const & topic_name, std::shared_ptr< dd // To support automatic streaming (without the need to handle start/stop-streaming commands) the reader is created // here and destroyed on close() - _reader = std::make_shared< dds_topic_reader >( topic, subscriber ); - _reader->on_data_available( [&]() { handle_data(); } ); + _reader = std::make_shared< dds_topic_reader_thread >( topic, subscriber ); + _reader->on_data_available( [this]() { handle_data(); } ); _reader->run( dds_topic_reader::qos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ) ); // no retries } @@ -54,15 +54,14 @@ void dds_motion_stream::open( std::string const & topic_name, std::shared_ptr< d // To support automatic streaming (without the need to handle start/stop-streaming commands) the reader is created // here and destroyed on close() - _reader = std::make_shared< dds_topic_reader >( topic, subscriber ); - _reader->on_data_available( [&]() { handle_data(); } ); + _reader = std::make_shared< dds_topic_reader_thread >( topic, subscriber ); + _reader->on_data_available( [this]() { handle_data(); } ); _reader->run( dds_topic_reader::qos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ) ); // no retries } void dds_stream::close() { - _reader->on_data_available( [](){} ); _reader.reset(); } diff --git a/third-party/realdds/src/dds-topic-reader-thread.cpp b/third-party/realdds/src/dds-topic-reader-thread.cpp new file mode 100644 index 0000000000..6ea4f6b1f2 --- /dev/null +++ b/third-party/realdds/src/dds-topic-reader-thread.cpp @@ -0,0 +1,46 @@ +// License: Apache 2.0. See LICENSE file in root directory. +// Copyright(c) 2023 Intel Corporation. All Rights Reserved. + +#include +#include +#include +#include + +#include +#include +#include + + +namespace realdds { + + +dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, + std::shared_ptr< dds_subscriber > const & subscriber ) + : super( topic, subscriber ) + , _th( + [this]( dispatcher::cancellable_timer ) + { + if( ! _reader ) + return; + eprosima::fastrtps::Duration_t const one_second = { 1, 0 }; + if( _reader->wait_for_unread_message( one_second ) ) + _on_data_available(); + } ) +{ +} + + +void dds_topic_reader_thread::run( qos const & rqos ) +{ + if( ! _on_data_available ) + DDS_THROW( runtime_error, "on-data-available must be provided" ); + + eprosima::fastdds::dds::StatusMask status_mask; + status_mask << eprosima::fastdds::dds::StatusMask::subscription_matched(); + //status_mask << eprosima::fastdds::dds::StatusMask::data_available(); + _reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos, this, status_mask ) ); + _th.start(); +} + + +} // namespace realdds diff --git a/third-party/realdds/src/topics/flexible-reader.cpp b/third-party/realdds/src/topics/flexible-reader.cpp index 19251ffd48..2c313d0b84 100644 --- a/third-party/realdds/src/topics/flexible-reader.cpp +++ b/third-party/realdds/src/topics/flexible-reader.cpp @@ -24,7 +24,7 @@ flexible_reader::flexible_reader( std::shared_ptr< dds_topic > const & topic ) this->on_subscription_matched( status ); } ); _reader->on_data_available( [this]() { this->on_data_available(); } ); - _reader->run(); + _reader->run( dds_topic_reader::qos() ); }