From 3e3817c9aa5dc91bcdaacd1e4256e4032b18fc15 Mon Sep 17 00:00:00 2001 From: Eran Date: Fri, 24 Mar 2023 17:49:35 +0300 Subject: [PATCH 1/8] add metadata_array, with frame_additional_data ctor for it --- src/frame.h | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/frame.h b/src/frame.h index 525386c280..55b3e1a75c 100644 --- a/src/frame.h +++ b/src/frame.h @@ -33,6 +33,8 @@ struct metadata_array_value }; #pragma pack( pop ) +typedef std::array< metadata_array_value, RS2_FRAME_METADATA_ACTUAL_COUNT > metadata_array; + static_assert( sizeof( metadata_array_value ) == sizeof( rs2_metadata_type ) + 1, "unexpected size for metadata array members" ); @@ -71,7 +73,7 @@ struct frame_additional_data : frame_header { uint32_t metadata_size = 0; bool fisheye_ae_mode = false; // TODO: remove in future release - std::array< uint8_t, RS2_FRAME_METADATA_ACTUAL_COUNT * sizeof( metadata_array_value ) > metadata_blob; + std::array< uint8_t, sizeof( metadata_array ) > metadata_blob = {}; rs2_time_t last_timestamp = 0; unsigned long long last_frame_number = 0; bool is_blocking = false; // when running from recording, this bit indicates @@ -85,6 +87,12 @@ struct frame_additional_data : frame_header frame_additional_data() {} + frame_additional_data( metadata_array const & metadata ) + { + metadata_size = (uint32_t) sizeof( metadata ); + memcpy( metadata_blob.data(), metadata.data(), metadata_size ); + } + frame_additional_data( rs2_time_t in_timestamp, unsigned long long in_frame_number, rs2_time_t in_system_time, From 87e6d0ee4c3c0e9e66a4f69980cff4bd6ebf243f Mon Sep 17 00:00:00 2001 From: Eran Date: Fri, 24 Mar 2023 17:50:20 +0300 Subject: [PATCH 2/8] metadata_array usage in software-device --- src/software-device.cpp | 16 +++------------- src/software-device.h | 2 +- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/software-device.cpp b/src/software-device.cpp index 335f7eaab5..d1029638b2 100644 --- a/src/software-device.cpp +++ b/src/software-device.cpp @@ -315,15 +315,12 @@ namespace librealsense if( ! _is_streaming ) return; - frame_additional_data data; + frame_additional_data data( _metadata_map ); data.timestamp = software_frame.timestamp; data.timestamp_domain = software_frame.domain; data.frame_number = software_frame.frame_number; data.depth_units = software_frame.depth_units; - data.metadata_size = (uint32_t)( _metadata_map.size() * sizeof( metadata_array_value ) ); - memcpy( data.metadata_blob.data(), _metadata_map.data(), data.metadata_size ); - auto frame = allocate_new_video_frame( vid_profile, software_frame.stride, software_frame.bpp, std::move( data ) ); if( frame ) @@ -336,14 +333,11 @@ namespace librealsense if( ! _is_streaming ) return; - frame_additional_data data; + frame_additional_data data( _metadata_map ); data.timestamp = software_frame.timestamp; data.timestamp_domain = software_frame.domain; data.frame_number = software_frame.frame_number; - data.metadata_size = (uint32_t) (_metadata_map.size() * sizeof( metadata_array_value )); - memcpy( data.metadata_blob.data(), _metadata_map.data(), data.metadata_size ); - auto frame = allocate_new_frame( RS2_EXTENSION_MOTION_FRAME, software_frame.profile->profile, std::move( data ) ); if( frame ) @@ -356,14 +350,11 @@ namespace librealsense if( ! _is_streaming ) return; - frame_additional_data data; + frame_additional_data data( _metadata_map ); data.timestamp = software_frame.timestamp; data.timestamp_domain = software_frame.domain; data.frame_number = software_frame.frame_number; - data.metadata_size = (uint32_t) (_metadata_map.size() * sizeof( metadata_array_value )); - memcpy( data.metadata_blob.data(), _metadata_map.data(), data.metadata_size ); - auto frame = allocate_new_frame( RS2_EXTENSION_POSE_FRAME, software_frame.profile->profile, std::move( data ) ); if( frame ) invoke_new_frame( frame, software_frame.data, on_release.detach() ); @@ -374,7 +365,6 @@ namespace librealsense notification n{ notif.category, notif.type, notif.severity, notif.description }; n.serialized_data = notif.serialized_data; _notifications_processor->raise_notification(n); - } void software_sensor::add_read_only_option(rs2_option option, float val) diff --git a/src/software-device.h b/src/software-device.h index de05cf8edf..864fe19464 100644 --- a/src/software-device.h +++ b/src/software-device.h @@ -125,7 +125,7 @@ namespace librealsense frame_interface * allocate_new_video_frame( video_stream_profile_interface *, int stride, int bpp, frame_additional_data && ); void invoke_new_frame( frame_interface * frame, void const * pixels, std::function< void() > on_release ); - std::array< metadata_array_value, RS2_FRAME_METADATA_ACTUAL_COUNT > _metadata_map; + metadata_array _metadata_map; processing_blocks get_recommended_processing_blocks() const override { From 4a228962857a2948f66b552dcc839cac85cce16a Mon Sep 17 00:00:00 2001 From: Eran Date: Fri, 24 Mar 2023 17:51:48 +0300 Subject: [PATCH 3/8] added move semantics in device-server, per last CR --- third-party/realdds/src/dds-device-server.cpp | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/third-party/realdds/src/dds-device-server.cpp b/third-party/realdds/src/dds-device-server.cpp index c15783e7b4..b8bf655e78 100644 --- a/third-party/realdds/src/dds-device-server.cpp +++ b/third-party/realdds/src/dds-device-server.cpp @@ -58,11 +58,11 @@ static void on_discovery_device_header( size_t const n_streams, const dds_option topics::flexible_msg device_header( json{ { "id", "device-header" }, { "n-streams", n_streams }, - { "extrinsics", extrinsics_json } + { "extrinsics", std::move( extrinsics_json ) } } ); - LOG_DEBUG( "-----> JSON = " << device_header.json_data().dump() ); - LOG_DEBUG( "-----> JSON size = " << device_header.json_data().dump().length() ); - LOG_DEBUG( "-----> CBOR size = " << json::to_cbor( device_header.json_data() ).size() ); + auto json_string = slice( device_header.custom_data< char const >(), device_header._data.size() ); + LOG_DEBUG( "-----> JSON = " << shorten_json_string( json_string, 300 ) << " size " << json_string.length() ); + //LOG_DEBUG( "-----> CBOR size = " << json::to_cbor( device_header.json_data() ).size() ); notifications.add_discovery_notification( std::move( device_header ) ); auto device_options = nlohmann::json::array(); @@ -70,11 +70,11 @@ static void on_discovery_device_header( size_t const n_streams, const dds_option device_options.push_back( std::move( opt->to_json() ) ); topics::flexible_msg device_options_message( json { { "id", "device-options" }, - { "options" , device_options } + { "options", std::move( device_options ) } } ); - LOG_DEBUG( "-----> JSON = " << device_options_message.json_data().dump() ); - LOG_DEBUG( "-----> JSON size = " << device_options_message.json_data().dump().length() ); - LOG_DEBUG( "-----> CBOR size = " << json::to_cbor( device_options_message.json_data() ).size() ); + json_string = slice( device_options_message.custom_data< char const >(), device_options_message._data.size() ); + LOG_DEBUG( "-----> JSON = " << shorten_json_string( json_string, 300 ) << " size " << json_string.length() ); + //LOG_DEBUG( "-----> CBOR size = " << json::to_cbor( device_options_message.json_data() ).size() ); notifications.add_discovery_notification( std::move( device_options_message ) ); } @@ -90,7 +90,7 @@ static void on_discovery_stream_header( std::shared_ptr< dds_stream_server > con { "type", stream->type_string() }, { "name", stream->name() }, { "sensor-name", stream->sensor_name() }, - { "profiles", profiles }, + { "profiles", std::move( profiles ) }, { "default-profile-index", stream->default_profile_index() }, { "metadata-enabled", stream->metadata_enabled() }, } ); @@ -118,7 +118,7 @@ static void on_discovery_stream_header( std::shared_ptr< dds_stream_server > con topics::flexible_msg stream_options_message( json { { "id", "stream-options" }, { "stream-name", stream->name() }, - { "options" , stream_options }, + { "options" , std::move( stream_options ) }, { "intrinsics" , intrinsics }, { "recommended-filters", std::move( stream_filters ) }, } ); From 45467acbd1a9135f652033c9a8310db2772f1856 Mon Sep 17 00:00:00 2001 From: Eran Date: Fri, 24 Mar 2023 17:53:20 +0300 Subject: [PATCH 4/8] refactor & optimize frame creation, invocation, metadata syncing --- src/context.cpp | 304 ++++++++++++++++++++---------------------------- 1 file changed, 127 insertions(+), 177 deletions(-) diff --git a/src/context.cpp b/src/context.cpp index 666558b7bb..505837f8bb 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -481,7 +481,6 @@ namespace librealsense // Frame data and metadata are sent as two seperate streams. // They are synchronized using frame_id for matching data+metadata sets. - template< class T > class frame_metadata_syncer { public: @@ -491,40 +490,58 @@ namespace librealsense // the frame late and without metadata over losing it. static constexpr size_t max_frame_queue_size = 2; - // frame_metadata_syncer is instanciated with rs2_software_video_frame and rs2_software_motion_frame. - // Both structures point to a rs2_stream_profile that was allocated especially for the life time of frame. - // We need to pass std::shared_ptr< rs2_stream_profile > as a parameter to keep it alive as long as frame is. - void enqueue_frame( T && frame, std::shared_ptr< rs2_stream_profile > prof ) + // We synchronize using some abstract "id" used to identify each frame and its metadata. We don't need to know + // the nature of the id; only that it is increasing in value over time so that, given id1 > id2, then id1 + // happened after id2. + typedef uint64_t id_type; + + private: + struct synced_frame + { + id_type _sync_id; + frame * _frame; + }; + + struct synced_metadata + { + id_type _sync_id; + nlohmann::json _md; + }; + + public: + void enqueue_frame( id_type id, frame * f ) { std::lock_guard< std::mutex > lock( _queues_lock ); - _frame_queue.push_back( std::move( frame ) ); - _prof_queue.push_back( prof ); - search_for_match(); // Call under lock + _frame_queue.push_back( synced_frame{ id, f } ); + search_for_match(); // Call under lock } - void enqueue_metadata( json && md ) + void enqueue_metadata( id_type id, nlohmann::json && md ) { std::lock_guard< std::mutex > lock( _queues_lock ); while( _metadata_queue.size() >= max_md_queue_size ) + { + LOG_DEBUG( "throwing away metadata: " << _metadata_queue.front()._md.dump() ); _metadata_queue.pop_front(); // Throw oldest + } - _metadata_queue.push_back( std::move( md ) ); - search_for_match(); // Call under lock + //LOG_DEBUG( "enqueueing metadata: " << md.dump() ); + _metadata_queue.push_back( synced_metadata{ id, std::move( md ) } ); + search_for_match(); // Call under lock } - typedef std::function< void( T && frame, nlohmann::json && metadata ) > on_frame_ready; + typedef std::function< void( frame *, nlohmann::json && metadata ) > on_frame_ready; void reset( on_frame_ready cb = nullptr ) { std::lock_guard< std::mutex > lock( _queues_lock ); _frame_queue.clear(); - _prof_queue.clear(); _metadata_queue.clear(); _on_frame_ready = cb; } private: - //Call under lock + // Call under lock void search_for_match() { // Wait for frame + metadata set, but if metadata is lost pass frame to the user anyway @@ -532,17 +549,16 @@ namespace librealsense return; // Sync using frame ID. Frame IDs are assumed to be increasing with time - int frame_id = _frame_queue.front().frame_number; - int md_id - = _metadata_queue.empty() - ? frame_id + 1 // If no metadata force call to handle_frame_no_metadata - : std::stoi( rsutils::json::get< std::string >( _metadata_queue.front()["header"], "frame-id" ) ); + auto frame_id = _frame_queue.front()._sync_id; + auto md_id = _metadata_queue.empty() + ? frame_id + 1 // If no metadata force call to handle_frame_no_metadata + : _metadata_queue.front()._sync_id; while( frame_id > md_id && _metadata_queue.size() > 1 ) { // Metadata without frame, remove it from queue and check next _metadata_queue.pop_front(); - md_id = std::stoi( rsutils::json::get< std::string >( _metadata_queue.front()["header"], "frame-id" ) ); + md_id = _metadata_queue.front()._sync_id; } if( frame_id == md_id ) @@ -551,49 +567,48 @@ namespace librealsense handle_frame_without_metadata(); } - //Call under lock + // Call under lock void handle_match() { - T & frame = _frame_queue.front(); + auto const frame = _frame_queue.front()._frame; json md; if( ! _metadata_queue.empty() ) { - md = std::move( _metadata_queue.front() ); + md = std::move( _metadata_queue.front()._md ); _metadata_queue.pop_front(); } if( _on_frame_ready ) - _on_frame_ready( std::move( frame ), std::move( md ) ); + _on_frame_ready( frame, std::move( md ) ); _frame_queue.pop_front(); - _prof_queue.pop_front(); } - //Call under lock + // Call under lock void handle_frame_without_metadata() { - T & frame = _frame_queue.front(); + auto frame = _frame_queue.front()._frame; json md; if( _on_frame_ready ) - _on_frame_ready( std::move( frame ), std::move( md ) ); + _on_frame_ready( frame, std::move( md ) ); _frame_queue.pop_front(); - _prof_queue.pop_front(); } on_frame_ready _on_frame_ready = nullptr; - std::deque< T > _frame_queue; - std::deque< std::shared_ptr< rs2_stream_profile > > _prof_queue; // Holding shared_ptr till corresponding frame is used - std::deque< nlohmann::json > _metadata_queue; + std::deque< synced_frame > _frame_queue; + std::deque< synced_metadata > _metadata_queue; std::mutex _queues_lock; }; class dds_sensor_proxy : public software_sensor { + std::map< std::string, frame_metadata_syncer > _stream_name_to_syncer; + public: dds_sensor_proxy( std::string const & sensor_name, software_device * owner, @@ -711,158 +726,103 @@ namespace librealsense software_sensor::open( profiles ); } - void custom_on_video_frame( rs2_software_video_frame const & software_frame ) + void handle_video_data( realdds::topics::device::image && dds_frame, + const std::shared_ptr< stream_profile_interface > & profile, + frame_metadata_syncer & syncer ) { - // We do exactly the same as the base on_video_frame(), but we have a custom releaser that know that the - // pixels are actually pointing to a vector: - auto p_pixels_vector = static_cast< std::vector< byte > * >( software_frame.pixels ); - rsutils::deferred on_release( [p_pixels_vector]() { delete p_pixels_vector; } ); + frame_additional_data data; // with NO metadata by default! + data.timestamp; // from metadata + data.timestamp_domain; // from metadata + data.depth_units; // from metadata + data.frame_number = ! dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0; - // This will all go away -- moving to storing frame_additional_data directly! - stream_profile_interface * profile = software_frame.profile->profile; - auto vid_profile = dynamic_cast(profile); + auto vid_profile = dynamic_cast< video_stream_profile_interface * >( profile.get() ); if( ! vid_profile ) - throw invalid_value_exception( "Non-video profile provided to on_video_frame" ); - - if( ! _is_streaming ) + throw invalid_value_exception( "non-video profile provided to on_video_frame" ); + + auto stride = static_cast< int >( dds_frame.height > 0 ? dds_frame.raw_data.size() / dds_frame.height + : dds_frame.raw_data.size() ); + auto bpp = dds_frame.width > 0 ? stride / dds_frame.width : stride; + auto new_frame_interface + = allocate_new_video_frame( vid_profile, stride, bpp, std::move( data ) ); + if( ! new_frame_interface ) return; - frame_additional_data data; - data.timestamp = software_frame.timestamp; - data.timestamp_domain = software_frame.domain; - data.frame_number = software_frame.frame_number; - data.depth_units = software_frame.depth_units; + auto new_frame = static_cast< frame * >( new_frame_interface ); + new_frame->data = std::move( dds_frame.raw_data ); - data.metadata_size = (uint32_t)( _metadata_map.size() * sizeof( rs2_metadata_type ) ); - memcpy( data.metadata_blob.data(), _metadata_map.data(), data.metadata_size ); - - auto frame_i - = allocate_new_video_frame( vid_profile, software_frame.stride, software_frame.bpp, std::move( data ) ); - if( frame_i ) + if( _md_enabled ) { - static_cast< frame * >( frame_i )->data = std::move( *p_pixels_vector ); - invoke_new_frame( frame_i, nullptr, nullptr ); + syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID + new_frame ); } - } - - void handle_video_data( realdds::topics::device::image && dds_frame, - const std::shared_ptr< stream_profile_interface > & prof, - frame_metadata_syncer< rs2_software_video_frame > & syncer ) - { - rs2_software_video_frame rs2_frame = {}; - - // prof parameter holds the real data, rs2_software_video_frame forces us to hold a pointer to it. - // Because we use syncer, not calling on_video_frame in the lifetime of this function, we need a shared_ptr - // that the syncer will hold till using the frame. prof_holder->profile holds the actual pointer, - // prof_holder->clone holds the shared_ptr to make sure the pointer won't be released before we use it. - std::shared_ptr< rs2_stream_profile > prof_holder = std::make_shared< rs2_stream_profile >(); - prof_holder->profile = prof.get(); - prof_holder->clone = prof; - rs2_frame.profile = prof_holder.get(); - - // Info sent with image topic - rs2_frame.stride = static_cast(dds_frame.height > 0 ? dds_frame.raw_data.size() / dds_frame.height - : dds_frame.raw_data.size()); - rs2_frame.bpp = dds_frame.width > 0 ? rs2_frame.stride / dds_frame.width : rs2_frame.stride; - rs2_frame.frame_number = !dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0; - - // Copying from dds into LibRS space, same as copy from USB backend. - // TODO - use memory pool or some other frame allocator - rs2_frame.pixels = new std::vector< byte >( std::move( dds_frame.raw_data )); - // The way to release allocated memory - rs2_frame.deleter = []( void * pixels ) - { - delete static_cast< std::vector< byte > * >( pixels ); - }; - - if( _md_enabled ) - syncer.enqueue_frame( std::move( rs2_frame ), prof_holder ); else - custom_on_video_frame( rs2_frame ); + { + invoke_new_frame( new_frame, + nullptr, // pixels are already inside new_frame->data + nullptr ); // so no deleter is necessary + } } - void add_video_frame_metadata( rs2_software_video_frame & frame, json && dds_md ) + void add_frame_metadata( frame * const f, json && dds_md ) { json const & md_header = dds_md["header"]; json const & md = dds_md["metadata"]; // Always expected metadata - frame.timestamp = rsutils::json::get< rs2_time_t >( md_header, "timestamp" ); - frame.domain = rsutils::json::get< rs2_timestamp_domain >( md_header, "timestamp-domain" ); + f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, "timestamp" ); + f->additional_data.timestamp_domain = rsutils::json::get< rs2_timestamp_domain >( md_header, "timestamp-domain" ); + // Expected metadata for all depth images if( rsutils::json::has( md_header, "depth-units" ) ) - frame.depth_units = rsutils::json::get< float >( md_header, "depth-units" ); + f->additional_data.depth_units = rsutils::json::get< float >( md_header, "depth-units" ); // Other metadata fields. Metadata fields that are present but unknown by librealsense will be ignored. + auto metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob ); for( size_t i = 0; i < static_cast< size_t >( RS2_FRAME_METADATA_COUNT ); ++i ) { auto key = static_cast< rs2_frame_metadata_value >( i ); const char * keystr = rs2_frame_metadata_to_string( key ); try { - set_metadata( key, rsutils::json::get< rs2_metadata_type >( md, keystr ) ); + metadata[key] = { true, rsutils::json::get< rs2_metadata_type >( md, keystr ) }; } catch( std::runtime_error const & ) { - erase_metadata( key ); + // The metadata key doesn't exist or the value isn't the right type... we ignore it! + // (all metadata is not there when we create the frame, so no need to erase) } } } void handle_motion_data( realdds::topics::device::image && dds_frame, - const std::shared_ptr< stream_profile_interface > & prof, - frame_metadata_syncer< rs2_software_motion_frame > & syncer ) - { - rs2_software_motion_frame rs2_frame = {}; - - // prof parameter holds the real data, rs2_software_video_frame forces us to hold a pointer to it. - // Because we use syncer, not calling on_video_frame in the lifetime of this function, we need a shared_ptr - // that the syncer will hold till using the frame. prof_holder->profile holds the actual pointer, - // prof_holder->clone holds the shared_ptr to make sure the pointer won't be released before we use it. - std::shared_ptr< rs2_stream_profile > prof_holder = std::make_shared< rs2_stream_profile >(); - prof_holder->profile = prof.get(); - prof_holder->clone = prof; - rs2_frame.profile = prof_holder.get(); - - // Copying from dds into LibRS space, same as copy from USB backend. - // TODO - use memory pool or some other frame allocator - rs2_frame.data = new uint8_t[dds_frame.raw_data.size()]; - if( ! rs2_frame.data ) - throw std::runtime_error( "Could not allocate memory for new frame" ); - memcpy( rs2_frame.data, dds_frame.raw_data.data(), dds_frame.raw_data.size() ); - // The way to release allocated memory - rs2_frame.deleter = []( void * ptr ) { delete[] ptr; }; - // Info sent with image topic - rs2_frame.frame_number = std::stoi( dds_frame.frame_id ); + const std::shared_ptr< stream_profile_interface > & profile, + frame_metadata_syncer & syncer ) + { + frame_additional_data data; // with NO metadata by default! + data.timestamp; // from metadata + data.timestamp_domain; // from metadata + data.depth_units; // from metadata + data.frame_number = ! dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0; + + auto new_frame_interface + = allocate_new_frame( RS2_EXTENSION_MOTION_FRAME, profile.get(), std::move( data ) ); + if( ! new_frame_interface ) + return; + + auto new_frame = static_cast< frame * >( new_frame_interface ); + new_frame->data = std::move( dds_frame.raw_data ); if( _md_enabled ) - syncer.enqueue_frame( std::move( rs2_frame ), prof_holder ); + { + syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID + new_frame ); + } else - on_motion_frame( rs2_frame ); - } - - void add_motion_frame_metadata( rs2_software_motion_frame & frame, json && dds_md ) - { - json const & md_header = dds_md["header"]; - json const & md = dds_md["metadata"]; - - // Always expected metadata - frame.timestamp = rsutils::json::get< rs2_time_t >( md_header, "timestamp" ); - frame.domain = rsutils::json::get< rs2_timestamp_domain >( md_header, "timestamp-domain" ); - - // Other metadata fields. Metadata fields that are present but unknown by librealsense will be ignored. - for( size_t i = 0; i < static_cast< size_t >( RS2_FRAME_METADATA_COUNT ); ++i ) { - auto key = static_cast< rs2_frame_metadata_value >( i ); - const char * keystr = rs2_frame_metadata_to_string( key ); - try - { - set_metadata( key, rsutils::json::get< rs2_metadata_type >( md, keystr ) ); - } - catch( std::runtime_error const & ) - { - erase_metadata( key ); - } + invoke_new_frame( new_frame, + nullptr, // pixels are already inside new_frame->data + nullptr ); // so no deleter is necessary } } @@ -871,20 +831,14 @@ namespace librealsense if( ! _md_enabled ) return; - auto video_iter = _stream_name_to_video_syncer.find( stream_name ); - if( video_iter != _stream_name_to_video_syncer.end() ) - video_iter->second.enqueue_metadata( std::move( dds_md ) ); + auto it = _stream_name_to_syncer.find( stream_name ); + if( it != _stream_name_to_syncer.end() ) + it->second.enqueue_metadata( + std::stoull( rsutils::json::get< std::string >( dds_md["header"], "frame-id" ) ), + std::move( dds_md ) ); else - { - auto motion_iter = _stream_name_to_motion_syncer.find( stream_name ); - if( motion_iter != _stream_name_to_motion_syncer.end() ) - { - motion_iter->second.enqueue_metadata( std::move( dds_md ) ); - } - else - throw std::runtime_error( "Stream '" + stream_name - + "' received metadata for unsupported frame type" ); - } + throw std::runtime_error( "Stream '" + stream_name + + "' received metadata for unsupported frame type" ); } void start( frame_callback_ptr callback ) override @@ -897,36 +851,34 @@ namespace librealsense // But we won't get callbacks until we "start streaming" if( Is< realdds::dds_video_stream >( dds_stream ) ) { - _stream_name_to_video_syncer[dds_stream->name()].reset( - [this]( rs2_software_video_frame && f, json && md ) + _stream_name_to_syncer[dds_stream->name()].reset( + [this]( frame * synced, json && md ) { - if( md.empty() ) - _metadata_map = {}; // clear it out - else - add_video_frame_metadata( f, std::move( md ) ); - custom_on_video_frame( f ); + if( ! md.empty() ) + add_frame_metadata( synced, std::move( md ) ); + // else the frame should already have empty metadata! + invoke_new_frame( synced, nullptr, nullptr ); } ); As< realdds::dds_video_stream >( dds_stream )->on_data_available( [profile, this, dds_stream]( realdds::topics::device::image && dds_frame ) { - handle_video_data( std::move( dds_frame ), profile, _stream_name_to_video_syncer[dds_stream->name()] ); + handle_video_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] ); } ); } else if( Is< realdds::dds_motion_stream >( dds_stream ) ) { - _stream_name_to_motion_syncer[dds_stream->name()].reset( - [this]( rs2_software_motion_frame && f, json && md ) + _stream_name_to_syncer[dds_stream->name()].reset( + [this]( frame * synced, json && md ) { - if( md.empty() ) - _metadata_map = {}; // clear it out - else - add_motion_frame_metadata( f, std::move( md ) ); - on_motion_frame( f ); + if( ! md.empty() ) + add_frame_metadata( synced, std::move( md ) ); + // else the frame should already have empty metadata! + invoke_new_frame( synced, nullptr, nullptr ); } ); As< realdds::dds_motion_stream >( dds_stream )->on_data_available( [profile, this, dds_stream]( realdds::topics::device::image && dds_frame ) { - handle_motion_data( std::move( dds_frame ), profile, _stream_name_to_motion_syncer[dds_stream->name()] ); + handle_motion_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] ); } ); } else @@ -1061,8 +1013,6 @@ namespace librealsense std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams; bool _md_enabled = false; - std::map< std::string, frame_metadata_syncer< rs2_software_video_frame > > _stream_name_to_video_syncer; - std::map< std::string, frame_metadata_syncer< rs2_software_motion_frame > > _stream_name_to_motion_syncer; }; // For cases when checking if this is< color_sensor > (like realsense-viewer::subdevice_model) From c151d2dbc6da98e02d24c7848817bb25301f01ac Mon Sep 17 00:00:00 2001 From: Eran Date: Sun, 26 Mar 2023 08:49:48 +0300 Subject: [PATCH 5/8] add exception catching around on_metadata_available() --- third-party/realdds/src/dds-device-impl.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/third-party/realdds/src/dds-device-impl.cpp b/third-party/realdds/src/dds-device-impl.cpp index f0aa3c7f2b..db7f95deee 100644 --- a/third-party/realdds/src/dds-device-impl.cpp +++ b/third-party/realdds/src/dds-device-impl.cpp @@ -240,7 +240,16 @@ void dds_device::impl::create_metadata_reader() while( topics::flexible_msg::take_next( *_metadata_reader, &message ) ) { if( message.is_valid() && _on_metadata_available ) - _on_metadata_available( std::move( message.json_data() ) ); + { + try + { + _on_metadata_available( std::move( message.json_data() ) ); + } + catch( std::runtime_error const & e ) + { + LOG_DEBUG( "metadata exception: " << e.what() ); + } + } } } ); From 7265ed0f26420ba386f138c3cf5466eb6f511f1a Mon Sep 17 00:00:00 2001 From: Eran Date: Sun, 26 Mar 2023 08:50:34 +0300 Subject: [PATCH 6/8] same frame-ready handling for video and motion frames --- src/context.cpp | 28 +++++++++------------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/src/context.cpp b/src/context.cpp index 505837f8bb..b15c25d664 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -848,18 +848,17 @@ namespace librealsense auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )]; // Opening it will start streaming on the server side automatically dds_stream->open( "rt/" + _dev->device_info().topic_root + '_' + dds_stream->name(), _dev->subscriber()); - // But we won't get callbacks until we "start streaming" + _stream_name_to_syncer[dds_stream->name()].reset( + [this]( frame * synced, json && md ) + { + if( ! md.empty() ) + add_frame_metadata( synced, std::move( md ) ); + // else the frame should already have empty metadata! + invoke_new_frame( synced, nullptr, nullptr ); + } ); + if( Is< realdds::dds_video_stream >( dds_stream ) ) { - _stream_name_to_syncer[dds_stream->name()].reset( - [this]( frame * synced, json && md ) - { - if( ! md.empty() ) - add_frame_metadata( synced, std::move( md ) ); - // else the frame should already have empty metadata! - invoke_new_frame( synced, nullptr, nullptr ); - } ); - As< realdds::dds_video_stream >( dds_stream )->on_data_available( [profile, this, dds_stream]( realdds::topics::device::image && dds_frame ) { handle_video_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] ); @@ -867,15 +866,6 @@ namespace librealsense } else if( Is< realdds::dds_motion_stream >( dds_stream ) ) { - _stream_name_to_syncer[dds_stream->name()].reset( - [this]( frame * synced, json && md ) - { - if( ! md.empty() ) - add_frame_metadata( synced, std::move( md ) ); - // else the frame should already have empty metadata! - invoke_new_frame( synced, nullptr, nullptr ); - } ); - As< realdds::dds_motion_stream >( dds_stream )->on_data_available( [profile, this, dds_stream]( realdds::topics::device::image && dds_frame ) { handle_motion_data( std::move( dds_frame ), profile, _stream_name_to_syncer[dds_stream->name()] ); From db0fa7ee92d1b23666d01cc02fbfe867a8b3b76b Mon Sep 17 00:00:00 2001 From: Eran Date: Sun, 26 Mar 2023 09:29:22 +0300 Subject: [PATCH 7/8] frame_holder syncing for proper lifetime mgmt --- src/context.cpp | 18 +++++++++--------- src/software-device.cpp | 4 ++-- src/software-device.h | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/context.cpp b/src/context.cpp index b15c25d664..1861ba35a0 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -499,7 +499,7 @@ namespace librealsense struct synced_frame { id_type _sync_id; - frame * _frame; + frame_holder _frame; }; struct synced_metadata @@ -530,7 +530,7 @@ namespace librealsense search_for_match(); // Call under lock } - typedef std::function< void( frame *, nlohmann::json && metadata ) > on_frame_ready; + typedef std::function< void( frame_holder &&, nlohmann::json && metadata ) > on_frame_ready; void reset( on_frame_ready cb = nullptr ) { @@ -570,7 +570,7 @@ namespace librealsense // Call under lock void handle_match() { - auto const frame = _frame_queue.front()._frame; + auto & fh = _frame_queue.front()._frame; json md; if( ! _metadata_queue.empty() ) @@ -580,7 +580,7 @@ namespace librealsense } if( _on_frame_ready ) - _on_frame_ready( frame, std::move( md ) ); + _on_frame_ready( std::move( fh ), std::move( md ) ); _frame_queue.pop_front(); } @@ -588,11 +588,11 @@ namespace librealsense // Call under lock void handle_frame_without_metadata() { - auto frame = _frame_queue.front()._frame; + auto & fh = _frame_queue.front()._frame; json md; if( _on_frame_ready ) - _on_frame_ready( frame, std::move( md ) ); + _on_frame_ready( std::move( fh ), std::move( md ) ); _frame_queue.pop_front(); } @@ -849,12 +849,12 @@ namespace librealsense // Opening it will start streaming on the server side automatically dds_stream->open( "rt/" + _dev->device_info().topic_root + '_' + dds_stream->name(), _dev->subscriber()); _stream_name_to_syncer[dds_stream->name()].reset( - [this]( frame * synced, json && md ) + [this]( frame_holder && fh, json && md ) { if( ! md.empty() ) - add_frame_metadata( synced, std::move( md ) ); + add_frame_metadata( static_cast< frame * >( fh.frame ), std::move( md ) ); // else the frame should already have empty metadata! - invoke_new_frame( synced, nullptr, nullptr ); + invoke_new_frame( std::move( fh ), nullptr, nullptr ); } ); if( Is< realdds::dds_video_stream >( dds_stream ) ) diff --git a/src/software-device.cpp b/src/software-device.cpp index d1029638b2..9e1586ec3e 100644 --- a/src/software-device.cpp +++ b/src/software-device.cpp @@ -292,14 +292,14 @@ namespace librealsense } - void software_sensor::invoke_new_frame( frame_interface * frame, + void software_sensor::invoke_new_frame( frame_holder && frame, void const * pixels, std::function< void() > on_release ) { // The frame pixels/data are stored in the continuation object! if( pixels ) frame->attach_continuation( frame_continuation( on_release, pixels ) ); - _source.invoke_callback( frame ); + _source.invoke_callback( std::move( frame ) ); } diff --git a/src/software-device.h b/src/software-device.h index 864fe19464..409f0b2829 100644 --- a/src/software-device.h +++ b/src/software-device.h @@ -123,7 +123,7 @@ namespace librealsense protected: frame_interface * allocate_new_frame( rs2_extension, stream_profile_interface *, frame_additional_data && ); frame_interface * allocate_new_video_frame( video_stream_profile_interface *, int stride, int bpp, frame_additional_data && ); - void invoke_new_frame( frame_interface * frame, void const * pixels, std::function< void() > on_release ); + void invoke_new_frame( frame_holder &&, void const * pixels, std::function< void() > on_release ); metadata_array _metadata_map; From dbf409ce551e508f0f2a01d595bf438ed53da91f Mon Sep 17 00:00:00 2001 From: Eran Date: Sun, 26 Mar 2023 12:31:01 +0300 Subject: [PATCH 8/8] CR comments --- src/context.cpp | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/context.cpp b/src/context.cpp index 1861ba35a0..ca5234ecea 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -45,6 +45,8 @@ #include #include +#include // string_to_value + // We manage one participant and device-watcher per domain: // Two contexts with the same domain-id will share the same participant and watcher, while a third context on a // different domain will have its own. @@ -607,6 +609,11 @@ namespace librealsense class dds_sensor_proxy : public software_sensor { + std::shared_ptr< realdds::dds_device > const _dev; + std::string const _name; + bool const _md_enabled; + + std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams; std::map< std::string, frame_metadata_syncer > _stream_name_to_syncer; public: @@ -734,7 +741,8 @@ namespace librealsense data.timestamp; // from metadata data.timestamp_domain; // from metadata data.depth_units; // from metadata - data.frame_number = ! dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0; + if( ! rsutils::string::string_to_value( dds_frame.frame_id, data.frame_number )) + data.frame_number = 0; auto vid_profile = dynamic_cast< video_stream_profile_interface * >( profile.get() ); if( ! vid_profile ) @@ -803,7 +811,8 @@ namespace librealsense data.timestamp; // from metadata data.timestamp_domain; // from metadata data.depth_units; // from metadata - data.frame_number = ! dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0; + if( ! rsutils::string::string_to_value( dds_frame.frame_id, data.frame_number ) ) + data.frame_number = 0; auto new_frame_interface = allocate_new_frame( RS2_EXTENSION_MOTION_FRAME, profile.get(), std::move( data ) ); @@ -996,13 +1005,6 @@ namespace librealsense } const std::string & get_name() const { return _name; } - - private: - std::shared_ptr< realdds::dds_device > const & _dev; - std::string _name; - std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams; - - bool _md_enabled = false; }; // For cases when checking if this is< color_sensor > (like realsense-viewer::subdevice_model)