Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dds stream threading & participant send/receive buffer size #11560

Merged
merged 8 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ namespace realdds {

namespace topics {
namespace device {
class image;
} // namespace device
class image;
} // namespace device
class flexible_msg;
} // namespace topics
} // namespace topics

class dds_subscriber;
class dds_topic_reader;
class dds_topic_reader_thread;

// Represents a stream of information (images, motion data, etc..) from a single source received via the DDS system.
// A stream can have several profiles, i.e different data frequency, image resolution, etc..
Expand Down Expand Up @@ -52,7 +52,7 @@ class dds_stream : public dds_stream_base
virtual void handle_data() = 0;
virtual bool can_start_streaming() const = 0;

std::shared_ptr< dds_topic_reader > _reader;
std::shared_ptr< dds_topic_reader_thread > _reader;
bool _streaming = false;
};

Expand Down
43 changes: 43 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-reader-thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.

#pragma once

#include "dds-topic-reader.h"

#include <rsutils/concurrency/concurrency.h>


namespace realdds {


// A topic-reader that calls its callback from a separate thread.
//
// This is the recommended way, according to eProsima:
// "the following threads are spawned per participant:
// - 1 for reception of UDP multicast discovery traffic of participants
// - 1 for reception of UDP unicast discovery traffic of readers and writers
// - 1 for reception of UDP unicast user traffic (common for all topics on the same participant)
// - 1 for reception of SHM unicast user traffic (common for all topics on the same participant)"
// I.e., "on_data_available is called from the last two threads ... performing any lengthy process inside the callback
// is discouraged" and "calls to on_data_available are serialized".
// Experience also shows that one topic can starve out the others (e.g., metadata can get a lot more smaller messages
// than image topics), and handling the data in separate threads is the way to go.
// See also:
// https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/subscriber/dataReader/readingData.html#accessing-data-with-a-waiting-thread
//
class dds_topic_reader_thread : public dds_topic_reader
{
active_object<> _th;

typedef dds_topic_reader super;

public:
dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber );

void run( qos const & ) override;
};


} // namespace realdds
7 changes: 4 additions & 3 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class dds_subscriber;
//
class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Destructor should be virtual

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

{
protected:
std::shared_ptr< dds_topic > const _topic;
std::shared_ptr < dds_subscriber > const _subscriber;

Expand All @@ -41,7 +42,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
public:
dds_topic_reader( std::shared_ptr< dds_topic > const & topic );
dds_topic_reader( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber );
~dds_topic_reader();
virtual ~dds_topic_reader();

eprosima::fastdds::dds::DataReader * get() const { return _reader; }
eprosima::fastdds::dds::DataReader * operator->() const { return get(); }
Expand Down Expand Up @@ -72,7 +73,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
};

// The callbacks should be set before we actually create the underlying DDS objects, so the reader does not
void run( qos const & = qos() );
virtual void run( qos const & );

// DataReaderListener
protected:
Expand All @@ -81,7 +82,7 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener

void on_data_available( eprosima::fastdds::dds::DataReader * ) override;

private:
protected:
on_data_available_callback _on_data_available;
on_subscription_matched_callback _on_subscription_matched;
};
Expand Down
35 changes: 12 additions & 23 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "dds-device-impl.h"

#include <realdds/dds-participant.h>
#include <realdds/dds-topic-reader.h>
#include <realdds/dds-topic-reader-thread.h>
#include <realdds/dds-topic-writer.h>
#include <realdds/dds-subscriber.h>
#include <realdds/dds-option.h>
Expand Down Expand Up @@ -110,22 +110,6 @@ void dds_device::impl::run( size_t message_timeout_ms )
}
}
} );

if( _metadata_reader ) // Might not be present
{
_metadata_reader->on_data_available( [&]() {
topics::flexible_msg notification;
eprosima::fastdds::dds::SampleInfo info;
while( topics::flexible_msg::take_next( *_metadata_reader, &notification, &info ) )
{
if( ! notification.is_valid() )
continue;

if( _on_metadata_available )
_on_metadata_available( std::move( notification ) );
}
} );
}
}

void dds_device::impl::open( const dds_stream_profiles & profiles )
Expand Down Expand Up @@ -234,9 +218,6 @@ void dds_device::impl::create_notifications_reader()
//(even if reliable). Setting depth to cover known use-cases plus some spare
rqos.history().depth = 24;

if( ! _notifications_reader )
DDS_THROW( runtime_error, "failed to set notifications reader for '" + _info.topic_root + "'" );

_notifications_reader->run( rqos );
}

Expand All @@ -247,13 +228,21 @@ void dds_device::impl::create_metadata_reader()

auto topic = topics::flexible_msg::create_topic( _participant, _info.topic_root + topics::METADATA_TOPIC_NAME );

_metadata_reader = std::make_shared< dds_topic_reader >( topic, _subscriber );
_metadata_reader = std::make_shared< dds_topic_reader_thread >( topic, _subscriber );

dds_topic_reader::qos rqos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS );
rqos.history().depth = 10; // Support receive metadata from multiple streams

if( ! _metadata_reader )
DDS_THROW( runtime_error, "failed to set metadata reader for '" + _info.topic_root + "'" );
_metadata_reader->on_data_available(
[this]()
{
topics::flexible_msg message;
while( topics::flexible_msg::take_next( *_metadata_reader, &message ) )
{
if( message.is_valid() && _on_metadata_available )
_on_metadata_available( std::move( message ) );
}
} );

_metadata_reader->run( rqos );
}
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/src/dds-device-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
namespace realdds {


class dds_topic_reader;
class dds_topic_reader_thread;
class dds_topic_writer;
class dds_subscriber;

Expand All @@ -51,7 +51,7 @@ class dds_device::impl
std::queue< nlohmann::json > _option_response_queue;

std::shared_ptr< dds_topic_reader > _notifications_reader;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you not handling notifications in a thread?
There are not a lot of traffic but we parse json and do some other jobs that can take time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change, though it may have side-effects -- I'd rather do separately.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, let me try

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notifications reader has mixed usage, split amongst init() and regular notifications, with the former actually calling wait_for_unread_message. I.e., this complicates things. For now, getting notifications on the eProsima thread while all the rest are using custom threads should be OK.

I think we should push this to another PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. We should monitor CPU usage and option setting delay with high resolution streaming and fix if needed.

std::shared_ptr< dds_topic_reader > _metadata_reader;
std::shared_ptr< dds_topic_reader_thread > _metadata_reader;
std::shared_ptr< dds_topic_writer > _control_writer;

dds_options _options;
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/src/dds-device-watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void dds_device_watcher::init()
} );

if( ! _device_info_topic->is_running() )
_device_info_topic->run();
_device_info_topic->run( dds_topic_reader::qos() );

LOG_DEBUG( "DDS device watcher initialized successfully" );
}
Expand Down
14 changes: 9 additions & 5 deletions third-party/realdds/src/dds-participant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,15 @@ void dds_participant::init( dds_domain_id domain_id, std::string const & partici
// Indicates for how much time should a remote DomainParticipant consider the local DomainParticipant to be alive.
pqos.wire_protocol().builtin.discovery_config.leaseDuration = { 10, 0 }; // [sec,nsec]

//Disable shared memory, use only UDP
//Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened
//correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files,
//C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux
auto udp_transport = std::make_shared<eprosima::fastdds::rtps::UDPv4TransportDescriptor>();
// Disable shared memory, use only UDP
// Disabling because sometimes, after improper destruction (e.g. stopping debug) the shared memory is not opened
// correctly and the application is stuck. eProsima is working on it. Manual solution delete shared memory files,
// C:\ProgramData\eprosima\fastrtps_interprocess on Windows, /dev/shm on Linux
auto udp_transport = std::make_shared< eprosima::fastdds::rtps::UDPv4TransportDescriptor >();
// Also change the send/receive buffers: we deal with lots of information and, without this, we'll get dropped
// frames and unusual behavior...
udp_transport->sendBufferSize = 16 * 1024 * 1024;
udp_transport->receiveBufferSize = 16 * 1024 * 1024;
pqos.transport().use_builtin_transports = false;
pqos.transport().user_transports.push_back( udp_transport );

Expand Down
11 changes: 5 additions & 6 deletions third-party/realdds/src/dds-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <realdds/dds-stream.h>

#include <realdds/dds-topic.h>
#include <realdds/dds-topic-reader.h>
#include <realdds/dds-topic-reader-thread.h>
#include <realdds/dds-subscriber.h>
#include <realdds/topics/image/image-msg.h>
#include <realdds/topics/flexible/flexible-msg.h>
Expand Down Expand Up @@ -36,8 +36,8 @@ void dds_video_stream::open( std::string const & topic_name, std::shared_ptr< dd

// To support automatic streaming (without the need to handle start/stop-streaming commands) the reader is created
// here and destroyed on close()
_reader = std::make_shared< dds_topic_reader >( topic, subscriber );
_reader->on_data_available( [&]() { handle_data(); } );
_reader = std::make_shared< dds_topic_reader_thread >( topic, subscriber );
_reader->on_data_available( [this]() { handle_data(); } );
_reader->run( dds_topic_reader::qos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ) ); // no retries
}

Expand All @@ -54,15 +54,14 @@ void dds_motion_stream::open( std::string const & topic_name, std::shared_ptr< d

// To support automatic streaming (without the need to handle start/stop-streaming commands) the reader is created
// here and destroyed on close()
_reader = std::make_shared< dds_topic_reader >( topic, subscriber );
_reader->on_data_available( [&]() { handle_data(); } );
_reader = std::make_shared< dds_topic_reader_thread >( topic, subscriber );
_reader->on_data_available( [this]() { handle_data(); } );
_reader->run( dds_topic_reader::qos( eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS ) ); // no retries
}


void dds_stream::close()
{
_reader->on_data_available( [](){} );
_reader.reset();
}

Expand Down
46 changes: 46 additions & 0 deletions third-party/realdds/src/dds-topic-reader-thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2023 Intel Corporation. All Rights Reserved.

#include <realdds/dds-topic-reader-thread.h>
#include <realdds/dds-topic.h>
#include <realdds/dds-subscriber.h>
#include <realdds/dds-utilities.h>

#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/topic/Topic.hpp>


namespace realdds {


dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber )
: super( topic, subscriber )
, _th(
[this]( dispatcher::cancellable_timer )
{
if( ! _reader )
return;
eprosima::fastrtps::Duration_t const one_second = { 1, 0 };
if( _reader->wait_for_unread_message( one_second ) )
_on_data_available();
} )
{
}


void dds_topic_reader_thread::run( qos const & rqos )
{
if( ! _on_data_available )
DDS_THROW( runtime_error, "on-data-available must be provided" );

eprosima::fastdds::dds::StatusMask status_mask;
status_mask << eprosima::fastdds::dds::StatusMask::subscription_matched();
//status_mask << eprosima::fastdds::dds::StatusMask::data_available();
_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos, this, status_mask ) );
_th.start();
}


} // namespace realdds
2 changes: 1 addition & 1 deletion third-party/realdds/src/topics/flexible-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ flexible_reader::flexible_reader( std::shared_ptr< dds_topic > const & topic )
this->on_subscription_matched( status );
} );
_reader->on_data_available( [this]() { this->on_data_available(); } );
_reader->run();
_reader->run( dds_topic_reader::qos() );
}


Expand Down