Skip to content

Commit

Permalink
PR #13330 from Eran: DDS netword-adapter watcher monitoring IP changes
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Sep 11, 2024
2 parents 047f6f0 + eb0673e commit e6a438a
Show file tree
Hide file tree
Showing 11 changed files with 667 additions and 56 deletions.
40 changes: 40 additions & 0 deletions third-party/realdds/include/realdds/dds-network-adapter-watcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2024 Intel Corporation. All Rights Reserved.
#pragma once

#include <rsutils/os/network-adapter-watcher.h>
#include <set>
#include <string>


namespace realdds {


namespace detail {
class network_adapter_watcher_singleton;
}


// Watch for changes to network adapter IPs
//
// Unlike rsutils::os::network_adapter_watcher, we only call the callbacks when actual changes to IPs are made, which
// can sometimes be seconds after adapter-change notifications are sent
//
// All you have to do is create a watcher and keep a pointer to it to get notifications
//
class dds_network_adapter_watcher
{
std::shared_ptr< detail::network_adapter_watcher_singleton > _singleton;
rsutils::subscription _subscription;

public:
using callback = std::function< void() >;

dds_network_adapter_watcher( callback && );

using ip_set = std::set< std::string >;
static ip_set current_ips();
};


} // namespace realdds
10 changes: 10 additions & 0 deletions third-party/realdds/include/realdds/dds-participant.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class slice;
namespace realdds {


class dds_network_adapter_watcher;


// The starting point for any DDS interaction, a participant has a name and is the focal point for creating, destroying,
// and managing other DDS objects. It defines the DDS domain (ID) in which every other object lives.
//
Expand All @@ -53,6 +56,7 @@ class dds_participant
struct listener_impl;

rsutils::json _settings;
std::shared_ptr< dds_network_adapter_watcher > _adapter_watcher;

public:
dds_participant() = default;
Expand All @@ -71,6 +75,12 @@ class dds_participant
qos( std::string const & participant_name );
};

// Return the QoS (so user won't have to actually know about the DomainParticipant)
eprosima::fastdds::dds::DomainParticipantQos const & get_qos() const;

// Refresh the QoS, so it will pick up any changes in the system (e.g., if adapters have changed)
void refresh_qos();

// Creates the underlying DDS participant and sets the QoS.
// If callbacks are needed, set them before calling init. Note they may be called before init returns!
//
Expand Down
2 changes: 2 additions & 0 deletions third-party/realdds/include/realdds/dds-serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ std::ostream & operator<<( std::ostream &, LivelinessQosPolicy const & );
std::ostream & operator<<( std::ostream &, DataSharingQosPolicy const & );
std::ostream & operator<<( std::ostream &, RTPSEndpointQos const & );
std::ostream & operator<<( std::ostream &, PublishModeQosPolicy const & );
std::ostream & operator<<( std::ostream &, ParticipantResourceLimitsQos const & );

class DomainParticipantQos;
std::ostream & operator<<( std::ostream &, DomainParticipantQos const & );
Expand Down Expand Up @@ -49,6 +50,7 @@ namespace rtps {
std::ostream & operator<<( std::ostream &, class WriterProxyData const & );
std::ostream & operator<<( std::ostream &, class ReaderProxyData const & );
std::ostream & operator<<( std::ostream &, BuiltinAttributes const & );
std::ostream & operator<<( std::ostream &, RemoteLocatorsAllocationAttributes const & );
} // namespace rtps

} // namespace fastrtps
Expand Down
44 changes: 32 additions & 12 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <realdds/dds-metadata-syncer.h>
#include <realdds/dds-serialization.h>
#include <realdds/dds-sample.h>
#include <realdds/dds-network-adapter-watcher.h>

#include <rsutils/os/special-folder.h>
#include <rsutils/os/executable-name.h>
Expand Down Expand Up @@ -120,6 +121,17 @@ json load_rs_settings( json const & local_settings )
}


py::list network_adapter_list()
{
auto const ips = realdds::dds_network_adapter_watcher::current_ips();
py::list obj( ips.size() );
int i = 0;
for( auto & ip : ips )
obj[i++] = ip;
return std::move( obj );
}


} // namespace


Expand Down Expand Up @@ -225,6 +237,9 @@ PYBIND11_MODULE(NAME, m) {
m.def( "timestr", []( dds_time t ) { return timestr( t ).to_string(); } );


m.def( "network_adapter_list", &network_adapter_list );


py::class_< dds_participant::listener,
std::shared_ptr< dds_participant::listener > // handled with a shared_ptr
>
Expand Down Expand Up @@ -408,15 +423,15 @@ PYBIND11_MODULE(NAME, m) {
using realdds::dds_topic_reader;
py::class_< dds_topic_reader, std::shared_ptr< dds_topic_reader > >( m, "topic_reader" )
.def( py::init< std::shared_ptr< dds_topic > const & >() )
.def( FN_FWD( dds_topic_reader, on_data_available, (dds_topic_reader &), (), callback( self ); ) )
.def( FN_FWD( dds_topic_reader, on_data_available, (dds_topic_reader *), (), callback( self ); ) )
.def( FN_FWD( dds_topic_reader,
on_subscription_matched,
(dds_topic_reader &, int),
(dds_topic_reader *, int),
( eprosima::fastdds::dds::SubscriptionMatchedStatus const & status ),
callback( self, status.current_count_change ); ) )
.def( FN_FWD( dds_topic_reader,
on_sample_lost,
(dds_topic_reader &, int, int),
(dds_topic_reader *, int, int),
(eprosima::fastdds::dds::SampleLostStatus const & status),
callback( self, status.total_count, status.total_count_change ); ) )
.def( "topic", &dds_topic_reader::topic )
Expand Down Expand Up @@ -455,7 +470,7 @@ PYBIND11_MODULE(NAME, m) {
.def( "guid", &dds_topic_writer::guid )
.def( FN_FWD( dds_topic_writer,
on_publication_matched,
(dds_topic_writer &, int),
(dds_topic_writer *, int),
( eprosima::fastdds::dds::PublicationMatchedStatus const & status ),
callback( self, status.current_count_change ); ) )
.def( "topic", &dds_topic_writer::topic )
Expand Down Expand Up @@ -803,7 +818,7 @@ PYBIND11_MODULE(NAME, m) {
.def( "stream", &dds_stream_profile::stream )
.def( "to_string", &dds_stream_profile::to_string )
.def( "details_to_string", &dds_stream_profile::details_to_string )
.def( "to_json", []( dds_stream_profile const & self ) { return self.to_json().dump(); } )
.def( "to_json", []( dds_stream_profile const & self ) { return self.to_json(); } )
.def( "__repr__", []( dds_stream_profile const & self ) {
std::ostringstream os;
std::string self_as_string = self.to_string(); // <video 0xUID ...>
Expand Down Expand Up @@ -937,12 +952,12 @@ PYBIND11_MODULE(NAME, m) {
.def( "broadcast", &dds_device_server::broadcast )
.def( "broadcast_disconnect", &dds_device_server::broadcast_disconnect, py::arg( "ack-timeout" ) = dds_time() )
.def( FN_FWD( dds_device_server, on_set_option,
(dds_device_server &, std::shared_ptr< realdds::dds_option > const &, json_ref &&),
(dds_device_server *, std::shared_ptr< realdds::dds_option > const &, json_ref &&),
( std::shared_ptr< realdds::dds_option > const & option, json & value ),
callback( self, option, json_ref{ value } ); ) )
.def( FN_FWD_R( dds_device_server, on_control,
false,
(dds_device_server &, std::string const &, py::object &&, json_ref &&),
(dds_device_server *, std::string const &, py::object &&, json_ref &&),
( std::string const & id, json const & control, json & reply ),
return callback( self, id, json_to_py( control ), json_ref{ reply } ); ) );

Expand Down Expand Up @@ -970,7 +985,7 @@ PYBIND11_MODULE(NAME, m) {
.def( "get_intrinsics", &dds_video_stream::get_intrinsics )
.def( FN_FWD( dds_video_stream,
on_data_available,
( dds_video_stream &, image_msg &&, dds_sample && ),
( dds_video_stream *, image_msg &&, dds_sample && ),
( image_msg && i, dds_sample && sample ),
callback( self, std::move( i ), std::move( sample ) ); ) );

Expand All @@ -995,7 +1010,12 @@ PYBIND11_MODULE(NAME, m) {
motion_stream_client_base( m, "motion_stream", stream_client_base );
motion_stream_client_base
.def( "set_gyro_intrinsics", &dds_motion_stream::set_gyro_intrinsics )
.def( "set_accel_intrinsics", &dds_motion_stream::set_accel_intrinsics );
.def( "set_accel_intrinsics", &dds_motion_stream::set_accel_intrinsics )
.def( FN_FWD( dds_motion_stream,
on_data_available,
( dds_motion_stream *, imu_msg &&, dds_sample && ),
( imu_msg && i, dds_sample && sample ),
callback( self, std::move( i ), std::move( sample ) ); ) );


using subscription = rsutils::subscription;
Expand Down Expand Up @@ -1107,12 +1127,12 @@ PYBIND11_MODULE(NAME, m) {
.def( "is_stopped", &dds_device_watcher::is_stopped )
.def( FN_FWD( dds_device_watcher,
on_device_added,
( dds_device_watcher const &, std::shared_ptr< dds_device > const & ),
( dds_device_watcher const *, std::shared_ptr< dds_device > const & ),
( std::shared_ptr< dds_device > const & dev ),
callback( self, dev ); ) )
.def( FN_FWD( dds_device_watcher,
on_device_removed,
( dds_device_watcher const &, std::shared_ptr< dds_device > const & ),
( dds_device_watcher const *, std::shared_ptr< dds_device > const & ),
( std::shared_ptr< dds_device > const & dev ),
callback( self, dev ); ) )
.def( "devices",
Expand Down Expand Up @@ -1200,7 +1220,7 @@ PYBIND11_MODULE(NAME, m) {
on_frame_ready,
( dds_metadata_syncer::frame_type, json const & ),
( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const json > const & metadata ),
callback( self.get_frame( fh ), metadata ? *metadata : json() ); ) )
callback( self->get_frame( fh ), metadata ? *metadata : json() ); ) )
.def( FN_FWD( dds_metadata_syncer,
on_metadata_dropped,
( dds_metadata_syncer::key_type, json const & ),
Expand Down
110 changes: 74 additions & 36 deletions third-party/realdds/scripts/fps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ def domain_arg(x):
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args.add_argument( '--with-metadata', action='store_true', help='stream with metadata, if available (default off)' )
args.add_argument( '--depth', action='store_true', help='stream Depth' )
args.add_argument( '--color', action='store_true', help='stream Color' )
args.add_argument( '--ir1', action='store_true', help='stream IR1' )
args.add_argument( '--ir2', action='store_true', help='stream IR2' )
args.add_argument( '--fps', metavar='5,15,30,60,90', type=int, default=30, help='Frames per second' )
def res_arg(x):
import re
m = re.fullmatch( r'(\d+)x(\d+)', x )
if not m:
raise ValueError( f'--res should be WIDTHxHEIGHT' )
return [int(m.group(1)), int(m.group(2))]
args.add_argument( '--res', metavar='WxH', type=res_arg, default=[1280,720], help='Resolution as WIDTHxHEIGHT' )
args = args.parse_args()


Expand Down Expand Up @@ -61,66 +73,92 @@ def e( *a, **kw ):
e( 'Cannot find device' )
sys.exit(1)

n_depth = 0
def on_depth_image( stream, image, sample ):
#d( f'----> depth {image}')
global n_depth
n_depth += 1

n_color = 0
def on_color_image( stream, image, sample ):
#d( f'----> color {image}')
global n_color
n_color += 1
streams = []
if args.depth:
streams.append( 'Depth' )
if args.color:
streams.append( 'Color' )
if args.ir1:
streams.append( 'Infrared_1' )
if args.ir2:
streams.append( 'Infrared_2' )
if not streams:
streams = ['Depth', 'Color']

n_stream_frames = {}
for s in streams:
n_stream_frames[s] = 0
capturing = True
def on_image( stream, image, sample ):
#d( f'----> {image}')
global n_stream_frames, capturing
if capturing:
n_stream_frames[stream.name()] += 1

fps = args.fps
width = args.res[0]
height = args.res[1]
type_format = { 'depth': str(dds.video_encoding.z16), 'color': str(dds.video_encoding.yuyv), 'ir': str(dds.video_encoding.y8) }

i( device.n_streams(), 'streams available' )
depth_stream = None
color_stream = None
stream_profiles = {}
for stream in device.streams():
profiles = stream.profiles()
i( ' ', stream.sensor_name(), '/', stream.default_profile().to_string()[1:-1] )
if stream.type_string() == 'depth':
depth_stream = stream
stream.on_data_available( on_depth_image )
elif stream.type_string() == 'color':
color_stream = stream
stream.on_data_available( on_color_image )
else:
#profiles = stream.profiles()
if stream.name() not in streams:
continue

#stream.default_profile().to_string()[1:-1]
profile = [fps,type_format[stream.type_string()],width,height]
i( f' {stream.sensor_name()} / {stream.name()} {profile}' )
stream_profiles[stream.name()] = profile

device.send_control( { 'id': 'open-streams',
'stream-profiles': stream_profiles }, True )

for stream in device.streams():
if stream.name() not in streams:
continue

stream.on_data_available( on_image )
stream_topic = 'rt/' + info.topic_root + '_' + stream.name()
stream.open( stream_topic, dds.subscriber( participant ) )
stream.start_streaming()

# Wait until we have at least one frame from each
tries = 5
while tries > 0:
if n_depth > 0 and n_color > 0:
for n in n_stream_frames.values():
if n <= 0:
break
else:
break
time.sleep( 1 )
tries -= 1
else:
raise RuntimeError( 'timed out waiting for frames to arrive' )
n_depth = n_color = 0
raise RuntimeError( f'timed out waiting for all frames to arrive {n_stream_frames}' )
time_started = time.time()
i( 'Starting :', time_started )
i( f'Starting @ {time_started}' )
n_frames_at_start = dict( n_stream_frames )
capturing = True

# Measure number of frames in a period of time
time.sleep( args.time )

result_depth = n_depth
result_color = n_color
capturing = False
time_stopped = time.time()
i( 'Stopping :', time_stopped )
depth_stream.stop_streaming()
color_stream.stop_streaming()
depth_stream.close()
color_stream.close()
i( f'Stopping @ {time_stopped}' )
for stream in device.streams():
if stream.name() in streams:
stream.stop_streaming()
for stream in device.streams():
if stream.name() in streams:
stream.close()

time_delta = time_stopped - time_started
i( 'Elapsed :', time_delta )
i( 'Number of Depth frames received:', result_depth, '=', result_depth / time_delta, 'fps' )
i( 'Number of Color frames received:', result_color, '=', result_color / time_delta, 'fps' )
i( 'Elapsed :', time_delta )
for s in streams:
n_frames = n_stream_frames[s] - n_frames_at_start[s]
i( f' {s:16}: {n_frames:4} frames @ FPS {n_frames / time_delta}' )

if args.quiet:
print( min( result_color, result_depth ) / time_delta )
Expand Down
Loading

0 comments on commit e6a438a

Please sign in to comment.