-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dds stream threading & participant send/receive buffer size #11560
Changes from all commits
41b81d2
63a840a
09608e8
7a127da
c550d34
be229ec
5e3d0a0
f05d7c9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
// License: Apache 2.0. See LICENSE file in root directory. | ||
// Copyright(c) 2023 Intel Corporation. All Rights Reserved. | ||
|
||
#pragma once | ||
|
||
#include "dds-topic-reader.h" | ||
|
||
#include <rsutils/concurrency/concurrency.h> | ||
|
||
|
||
namespace realdds { | ||
|
||
|
||
// A topic-reader that calls its callback from a separate thread. | ||
// | ||
// This is the recommended way, according to eProsima: | ||
// "the following threads are spawned per participant: | ||
// - 1 for reception of UDP multicast discovery traffic of participants | ||
// - 1 for reception of UDP unicast discovery traffic of readers and writers | ||
// - 1 for reception of UDP unicast user traffic (common for all topics on the same participant) | ||
// - 1 for reception of SHM unicast user traffic (common for all topics on the same participant)" | ||
// I.e., "on_data_available is called from the last two threads ... performing any lengthy process inside the callback | ||
// is discouraged" and "calls to on_data_available are serialized". | ||
// Experience also shows that one topic can starve out the others (e.g., metadata can get a lot more smaller messages | ||
// than image topics), and handling the data in separate threads is the way to go. | ||
// See also: | ||
// https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/subscriber/dataReader/readingData.html#accessing-data-with-a-waiting-thread | ||
// | ||
class dds_topic_reader_thread : public dds_topic_reader | ||
{ | ||
active_object<> _th; | ||
|
||
typedef dds_topic_reader super; | ||
|
||
public: | ||
dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, | ||
std::shared_ptr< dds_subscriber > const & subscriber ); | ||
|
||
void run( qos const & ) override; | ||
}; | ||
|
||
|
||
} // namespace realdds |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ | |
namespace realdds { | ||
|
||
|
||
class dds_topic_reader; | ||
class dds_topic_reader_thread; | ||
class dds_topic_writer; | ||
class dds_subscriber; | ||
|
||
|
@@ -51,7 +51,7 @@ class dds_device::impl | |
std::queue< nlohmann::json > _option_response_queue; | ||
|
||
std::shared_ptr< dds_topic_reader > _notifications_reader; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are you not handling notifications in a thread? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can change, though it may have side-effects -- I'd rather do separately. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, let me try There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The notifications reader has mixed usage, split amongst I think we should push this to another PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. We should monitor CPU usage and option setting delay with high resolution streaming and fix if needed. |
||
std::shared_ptr< dds_topic_reader > _metadata_reader; | ||
std::shared_ptr< dds_topic_reader_thread > _metadata_reader; | ||
std::shared_ptr< dds_topic_writer > _control_writer; | ||
|
||
dds_options _options; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
// License: Apache 2.0. See LICENSE file in root directory. | ||
// Copyright(c) 2023 Intel Corporation. All Rights Reserved. | ||
|
||
#include <realdds/dds-topic-reader-thread.h> | ||
#include <realdds/dds-topic.h> | ||
#include <realdds/dds-subscriber.h> | ||
#include <realdds/dds-utilities.h> | ||
|
||
#include <fastdds/dds/subscriber/Subscriber.hpp> | ||
#include <fastdds/dds/subscriber/DataReader.hpp> | ||
#include <fastdds/dds/topic/Topic.hpp> | ||
|
||
|
||
namespace realdds { | ||
|
||
|
||
dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, | ||
std::shared_ptr< dds_subscriber > const & subscriber ) | ||
: super( topic, subscriber ) | ||
, _th( | ||
[this]( dispatcher::cancellable_timer ) | ||
{ | ||
if( ! _reader ) | ||
return; | ||
eprosima::fastrtps::Duration_t const one_second = { 1, 0 }; | ||
if( _reader->wait_for_unread_message( one_second ) ) | ||
_on_data_available(); | ||
} ) | ||
{ | ||
} | ||
|
||
|
||
void dds_topic_reader_thread::run( qos const & rqos ) | ||
{ | ||
if( ! _on_data_available ) | ||
DDS_THROW( runtime_error, "on-data-available must be provided" ); | ||
|
||
eprosima::fastdds::dds::StatusMask status_mask; | ||
status_mask << eprosima::fastdds::dds::StatusMask::subscription_matched(); | ||
//status_mask << eprosima::fastdds::dds::StatusMask::data_available(); | ||
_reader = DDS_API_CALL( _subscriber->get()->create_datareader( _topic->get(), rqos, this, status_mask ) ); | ||
_th.start(); | ||
} | ||
|
||
|
||
} // namespace realdds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Destructor should be virtual
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed