-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
basic dds metadata tests (just the beginning) #11665
Merged
Merged
Changes from all commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
7e8eab3
working using unique_ptr
maloel 863edc6
add syncer reset on stream close
maloel 64d1658
added pyrealdds.video_stream.on_data_available
maloel 593ecfa
fix syncer logic
maloel 4176c11
debug msg update when publishing metadata
maloel eb92015
move dds-trinsics out of dds-defines.h
maloel 1e7a393
dds_time more compact; better pyrealdds repr
maloel 532d6cd
add timestamp to topics::device::image
maloel 1fede4e
revisions in syncer
maloel 8cbd9e7
syncer exposed in pyrealdds
maloel 3d29028
improve shorten_json_string and the way it breaks
maloel 0c86aa5
remove annoying debug message in d400-private.cpp
maloel 16044bd
fix bug in add_frame_metadata
maloel 6786f75
add basic metadata tests (just the beginning)
maloel 38d1cfc
rsutils::json:: string args (since json requires it)
maloel c133616
optimize librealsense::get_string for metadata
maloel b24670f
remove unnecessary debug message in device-watcher
maloel 73fbc14
split participants to client/server
maloel ce2f25d
improve test.remote; default timeout=5
maloel 5fc49a5
split test-metadata into metadata-server
maloel 355aa5d
RSUSB for U20 dds testing
maloel 2c3a7b7
wait_for_device notification-based rather than sleep
maloel 1fc6b3b
log.f now in red, too
maloel fea0a6c
add test.closure; failed test cases now logged at end
maloel 4e5c065
fix warnings
maloel 374f616
revise test-librs-connections
maloel 1d73c1c
fix flushing issues (linux) in rspy.log
maloel fe36ec5
disable test-metadata in Linux; indent remote
maloel a987ed0
CR comments
maloel 397d4f7
switch test-metadata to use Timer
maloel c0fd66d
fix rspy.timer Stopwatch to be per instance!
maloel 00c9aa5
CR comments in to-string.cpp
maloel e440de3
fix test.closure exception swallowing; on_fail constants
maloel 64dc4a1
per CR, remove 'return *this' in syncer on_xxx()
maloel 0a076ad
remove check in test-librs-connections, per CR
maloel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
#include <realdds/dds-device.h> | ||
#include <realdds/dds-stream.h> | ||
#include <realdds/dds-stream-profile.h> | ||
#include <realdds/dds-metadata-syncer.h> | ||
#include "software-device.h" | ||
#include <librealsense2/h/rs_internal.h> | ||
#include <realdds/topics/device-info-msg.h> | ||
|
@@ -187,19 +188,21 @@ namespace librealsense | |
assert( _device_watcher->is_stopped() ); | ||
|
||
#ifdef BUILD_WITH_DDS | ||
if( rsutils::json::get< bool >( settings, "dds-discovery", true ) ) | ||
if( rsutils::json::get< bool >( settings, std::string( "dds-discovery", 13 ), true ) ) | ||
{ | ||
realdds::dds_domain_id domain_id = rsutils::json::get< int >( settings, "dds-domain", 0 ); | ||
std::string participant_name | ||
= rsutils::json::get< std::string >( settings, "dds-participant-name", rsutils::os::executable_name() ); | ||
realdds::dds_domain_id domain_id | ||
= rsutils::json::get< int >( settings, std::string( "dds-domain", 10 ), 0 ); | ||
std::string participant_name = rsutils::json::get< std::string >( settings, | ||
std::string( "dds-participant-name", 20 ), | ||
rsutils::os::executable_name() ); | ||
|
||
auto & domain = dds_domain_context_by_id[domain_id]; | ||
_dds_participant = domain.participant.instance(); | ||
if( ! _dds_participant->is_valid() ) | ||
{ | ||
_dds_participant->init( domain_id, participant_name ); | ||
} | ||
else if( rsutils::json::has_value( settings, "dds-participant-name" ) | ||
else if( rsutils::json::has_value( settings, std::string( "dds-participant-name", 20 ) ) | ||
&& participant_name != _dds_participant->get()->get_qos().name().to_string() ) | ||
{ | ||
throw std::runtime_error( | ||
|
@@ -212,7 +215,8 @@ namespace librealsense | |
// The DDS device watcher should always be on | ||
if( _dds_watcher && _dds_watcher->is_stopped() ) | ||
{ | ||
start_dds_device_watcher( rsutils::json::get< size_t >( settings, "dds-message-timeout-ms", 5000 ) ); | ||
start_dds_device_watcher( | ||
rsutils::json::get< size_t >( settings, std::string( "dds-message-timeout-ms", 22 ), 5000 ) ); | ||
} | ||
} | ||
#endif //BUILD_WITH_DDS | ||
|
@@ -477,140 +481,17 @@ namespace librealsense | |
}; | ||
|
||
|
||
// Frame data and metadata are sent as two seperate streams. | ||
// They are synchronized using frame_id for matching data+metadata sets. | ||
class frame_metadata_syncer | ||
{ | ||
public: | ||
// We don't want the queue to get large, it means lots of drops and data that we store to (probably) throw later | ||
static constexpr size_t max_md_queue_size = 8; | ||
// If a metadata is lost we wait for it until the next frame arrives, causing a small delay but we prefer passing | ||
// the frame late and without metadata over losing it. | ||
static constexpr size_t max_frame_queue_size = 2; | ||
|
||
// We synchronize using some abstract "id" used to identify each frame and its metadata. We don't need to know | ||
// the nature of the id; only that it is increasing in value over time so that, given id1 > id2, then id1 | ||
// happened after id2. | ||
typedef uint64_t id_type; | ||
|
||
private: | ||
struct synced_frame | ||
{ | ||
id_type _sync_id; | ||
frame_holder _frame; | ||
}; | ||
|
||
struct synced_metadata | ||
{ | ||
id_type _sync_id; | ||
nlohmann::json _md; | ||
}; | ||
|
||
public: | ||
void enqueue_frame( id_type id, frame * f ) | ||
{ | ||
std::lock_guard< std::mutex > lock( _queues_lock ); | ||
_frame_queue.push_back( synced_frame{ id, f } ); | ||
search_for_match(); // Call under lock | ||
} | ||
|
||
void enqueue_metadata( id_type id, nlohmann::json && md ) | ||
{ | ||
std::lock_guard< std::mutex > lock( _queues_lock ); | ||
while( _metadata_queue.size() >= max_md_queue_size ) | ||
{ | ||
LOG_DEBUG( "throwing away metadata: " << _metadata_queue.front()._md.dump() ); | ||
_metadata_queue.pop_front(); // Throw oldest | ||
} | ||
|
||
//LOG_DEBUG( "enqueueing metadata: " << md.dump() ); | ||
_metadata_queue.push_back( synced_metadata{ id, std::move( md ) } ); | ||
search_for_match(); // Call under lock | ||
} | ||
|
||
typedef std::function< void( frame_holder &&, nlohmann::json && metadata ) > on_frame_ready; | ||
|
||
void reset( on_frame_ready cb = nullptr ) | ||
{ | ||
std::lock_guard< std::mutex > lock( _queues_lock ); | ||
_frame_queue.clear(); | ||
_metadata_queue.clear(); | ||
_on_frame_ready = cb; | ||
} | ||
|
||
private: | ||
// Call under lock | ||
void search_for_match() | ||
{ | ||
// Wait for frame + metadata set, but if metadata is lost pass frame to the user anyway | ||
if( _frame_queue.empty() || ( _metadata_queue.empty() && _frame_queue.size() < max_frame_queue_size ) ) | ||
return; | ||
|
||
// Sync using frame ID. Frame IDs are assumed to be increasing with time | ||
auto frame_id = _frame_queue.front()._sync_id; | ||
auto md_id = _metadata_queue.empty() | ||
? frame_id + 1 // If no metadata force call to handle_frame_no_metadata | ||
: _metadata_queue.front()._sync_id; | ||
|
||
while( frame_id > md_id && _metadata_queue.size() > 1 ) | ||
{ | ||
// Metadata without frame, remove it from queue and check next | ||
_metadata_queue.pop_front(); | ||
md_id = _metadata_queue.front()._sync_id; | ||
} | ||
|
||
if( frame_id == md_id ) | ||
handle_match(); | ||
else | ||
handle_frame_without_metadata(); | ||
} | ||
|
||
// Call under lock | ||
void handle_match() | ||
{ | ||
auto & fh = _frame_queue.front()._frame; | ||
|
||
json md; | ||
if( ! _metadata_queue.empty() ) | ||
{ | ||
md = std::move( _metadata_queue.front()._md ); | ||
_metadata_queue.pop_front(); | ||
} | ||
|
||
if( _on_frame_ready ) | ||
_on_frame_ready( std::move( fh ), std::move( md ) ); | ||
|
||
_frame_queue.pop_front(); | ||
} | ||
|
||
// Call under lock | ||
void handle_frame_without_metadata() | ||
{ | ||
auto & fh = _frame_queue.front()._frame; | ||
|
||
json md; | ||
if( _on_frame_ready ) | ||
_on_frame_ready( std::move( fh ), std::move( md ) ); | ||
|
||
_frame_queue.pop_front(); | ||
} | ||
|
||
on_frame_ready _on_frame_ready = nullptr; | ||
|
||
std::deque< synced_frame > _frame_queue; | ||
std::deque< synced_metadata > _metadata_queue; | ||
std::mutex _queues_lock; | ||
}; | ||
|
||
|
||
class dds_sensor_proxy : public software_sensor | ||
{ | ||
std::shared_ptr< realdds::dds_device > const _dev; | ||
std::string const _name; | ||
bool const _md_enabled; | ||
|
||
typedef realdds::dds_metadata_syncer syncer_type; | ||
static void frame_releaser( syncer_type::frame_type * f ) { static_cast< frame * >( f )->release(); } | ||
|
||
std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams; | ||
std::map< std::string, frame_metadata_syncer > _stream_name_to_syncer; | ||
std::map< std::string, syncer_type > _stream_name_to_syncer; | ||
|
||
public: | ||
dds_sensor_proxy( std::string const & sensor_name, | ||
|
@@ -731,7 +612,7 @@ namespace librealsense | |
|
||
void handle_video_data( realdds::topics::device::image && dds_frame, | ||
const std::shared_ptr< stream_profile_interface > & profile, | ||
frame_metadata_syncer & syncer ) | ||
syncer_type & syncer ) | ||
{ | ||
frame_additional_data data; // with NO metadata by default! | ||
data.timestamp; // from metadata | ||
|
@@ -758,7 +639,7 @@ namespace librealsense | |
if( _md_enabled ) | ||
{ | ||
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID | ||
new_frame ); | ||
syncer.hold( new_frame ) ); | ||
} | ||
else | ||
{ | ||
|
@@ -770,23 +651,23 @@ namespace librealsense | |
|
||
void add_frame_metadata( frame * const f, json && dds_md ) | ||
{ | ||
json const & md_header = dds_md["header"]; | ||
json const & md = dds_md["metadata"]; | ||
json const & md_header = dds_md[std::string( "header", 6 )]; | ||
json const & md = dds_md[std::string( "metadata", 8 )]; | ||
|
||
// Always expected metadata | ||
f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, "timestamp" ); | ||
f->additional_data.timestamp_domain = rsutils::json::get< rs2_timestamp_domain >( md_header, "timestamp-domain" ); | ||
f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, std::string( "timestamp", 9 ) ); | ||
f->additional_data.timestamp_domain | ||
= rsutils::json::get< rs2_timestamp_domain >( md_header, std::string( "timestamp-domain", 16 ) ); | ||
|
||
// Expected metadata for all depth images | ||
if( rsutils::json::has( md_header, "depth-units" ) ) | ||
f->additional_data.depth_units = rsutils::json::get< float >( md_header, "depth-units" ); | ||
rsutils::json::get_ex( md_header, std::string( "depth-units", 11 ), &f->additional_data.depth_units ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will throw for color frames There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
// Other metadata fields. Metadata fields that are present but unknown by librealsense will be ignored. | ||
auto metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob ); | ||
auto & metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob ); | ||
for( size_t i = 0; i < static_cast< size_t >( RS2_FRAME_METADATA_COUNT ); ++i ) | ||
{ | ||
auto key = static_cast< rs2_frame_metadata_value >( i ); | ||
const char * keystr = rs2_frame_metadata_to_string( key ); | ||
std::string const & keystr = librealsense::get_string( key ); | ||
try | ||
{ | ||
metadata[key] = { true, rsutils::json::get< rs2_metadata_type >( md, keystr ) }; | ||
|
@@ -801,7 +682,7 @@ namespace librealsense | |
|
||
void handle_motion_data( realdds::topics::device::image && dds_frame, | ||
const std::shared_ptr< stream_profile_interface > & profile, | ||
frame_metadata_syncer & syncer ) | ||
syncer_type & syncer ) | ||
{ | ||
frame_additional_data data; // with NO metadata by default! | ||
data.timestamp; // from metadata | ||
|
@@ -821,7 +702,7 @@ namespace librealsense | |
if( _md_enabled ) | ||
{ | ||
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID | ||
new_frame ); | ||
syncer.hold( new_frame )); | ||
} | ||
else | ||
{ | ||
|
@@ -853,13 +734,15 @@ namespace librealsense | |
auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )]; | ||
// Opening it will start streaming on the server side automatically | ||
dds_stream->open( "rt/" + _dev->device_info().topic_root + '_' + dds_stream->name(), _dev->subscriber()); | ||
_stream_name_to_syncer[dds_stream->name()].reset( | ||
[this]( frame_holder && fh, json && md ) | ||
auto & syncer = _stream_name_to_syncer[dds_stream->name()]; | ||
syncer.on_frame_release( frame_releaser ); | ||
syncer.on_frame_ready( | ||
[this]( syncer_type::frame_holder && fh, json && md ) | ||
{ | ||
if( ! md.empty() ) | ||
add_frame_metadata( static_cast< frame * >( fh.frame ), std::move( md ) ); | ||
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ) ); | ||
// else the frame should already have empty metadata! | ||
invoke_new_frame( std::move( fh ), nullptr, nullptr ); | ||
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr ); | ||
} ); | ||
|
||
if( Is< realdds::dds_video_stream >( dds_stream ) ) | ||
|
@@ -880,14 +763,15 @@ namespace librealsense | |
throw std::runtime_error( "Unsupported stream type" ); | ||
|
||
dds_stream->start_streaming(); | ||
|
||
} | ||
|
||
software_sensor::start( callback ); | ||
} | ||
|
||
void stop() | ||
{ | ||
_stream_name_to_syncer.clear(); | ||
|
||
for( auto & profile : sensor_base::get_active_streams() ) | ||
{ | ||
auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )]; | ||
|
@@ -1240,7 +1124,7 @@ namespace librealsense | |
_dds_dev->on_metadata_available( | ||
[this]( json && dds_md ) | ||
{ | ||
std::string stream_name = rsutils::json::get< std::string >( dds_md, "stream-name" ); | ||
std::string stream_name = rsutils::json::get< std::string >( dds_md, std::string( "stream-name", 11 ) ); | ||
auto it = _stream_name_to_owning_sensor.find( stream_name ); | ||
if( it != _stream_name_to_owning_sensor.end() ) | ||
it->second->handle_new_metadata( stream_name, std::move( dds_md ) ); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have to explicitly use the string size? It is inconvenient and error prone
Looks like it should work without the explicit size (using constructor number 5, not 4)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's optional, but I dislike using
strlen
for constantsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if you miscount?
Will the json find the right string while searching for
dds-discover
ordds-discovery
(extra char can be of any value) or would it throw?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a constant. It's like getting a constant value wrong.
This is why we have a CR and testing :)