Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic dds metadata tests (just the beginning) #11665

Merged
merged 35 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7e8eab3
working using unique_ptr
maloel Mar 30, 2023
863edc6
add syncer reset on stream close
maloel Mar 30, 2023
64d1658
added pyrealdds.video_stream.on_data_available
maloel Mar 30, 2023
593ecfa
fix syncer logic
maloel Mar 30, 2023
4176c11
debug msg update when publishing metadata
maloel Mar 30, 2023
eb92015
move dds-trinsics out of dds-defines.h
maloel Apr 4, 2023
1e7a393
dds_time more compact; better pyrealdds repr
maloel Apr 4, 2023
532d6cd
add timestamp to topics::device::image
maloel Apr 4, 2023
1fede4e
revisions in syncer
maloel Apr 4, 2023
8cbd9e7
syncer exposed in pyrealdds
maloel Apr 4, 2023
3d29028
improve shorten_json_string and the way it breaks
maloel Apr 5, 2023
0c86aa5
remove annoying debug message in d400-private.cpp
maloel Apr 5, 2023
16044bd
fix bug in add_frame_metadata
maloel Apr 6, 2023
6786f75
add basic metadata tests (just the beginning)
maloel Apr 6, 2023
38d1cfc
rsutils::json:: string args (since json requires it)
maloel Apr 6, 2023
c133616
optimize librealsense::get_string for metadata
maloel Apr 6, 2023
b24670f
remove unnecessary debug message in device-watcher
maloel Apr 6, 2023
73fbc14
split participants to client/server
maloel Apr 7, 2023
ce2f25d
improve test.remote; default timeout=5
maloel Apr 9, 2023
5fc49a5
split test-metadata into metadata-server
maloel Apr 9, 2023
355aa5d
RSUSB for U20 dds testing
maloel Apr 9, 2023
2c3a7b7
wait_for_device notification-based rather than sleep
maloel Apr 9, 2023
1fc6b3b
log.f now in red, too
maloel Apr 10, 2023
fea0a6c
add test.closure; failed test cases now logged at end
maloel Apr 10, 2023
4e5c065
fix warnings
maloel Apr 10, 2023
374f616
revise test-librs-connections
maloel Apr 10, 2023
1d73c1c
fix flushing issues (linux) in rspy.log
maloel Apr 10, 2023
fe36ec5
disable test-metadata in Linux; indent remote
maloel Apr 10, 2023
a987ed0
CR comments
maloel Apr 11, 2023
397d4f7
switch test-metadata to use Timer
maloel Apr 11, 2023
c0fd66d
fix rspy.timer Stopwatch to be per instance!
maloel Apr 11, 2023
00c9aa5
CR comments in to-string.cpp
maloel Apr 11, 2023
e440de3
fix test.closure exception swallowing; on_fail constants
maloel Apr 11, 2023
64dc4a1
per CR, remove 'return *this' in syncer on_xxx()
maloel Apr 13, 2023
0a076ad
remove check in test-librs-connections, per CR
maloel Apr 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .github/workflows/buildsCI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ jobs:


#--------------------------------------------------------------------------------
U20_ST_Py_DDS_CI: # Ubuntu 2020, Static, Python, DDS, LibCI without executables
U20_ST_Py_DDS_RSUSB_CI: # Ubuntu 2020, Static, Python, DDS, RSUSB, LibCI without executables
runs-on: ubuntu-20.04
timeout-minutes: 60
steps:
Expand Down Expand Up @@ -340,18 +340,21 @@ jobs:
mkdir build

- name: Build
# Note: we force RSUSB because, on Linux, the context creation will fail on GHA:
# (backend-v4l2.cpp:555) Cannot access /sys/class/video4linux)
# And, well, we don't need any specific platform for DDS!
shell: bash
run: |
cd build
cmake .. -DBUILD_SHARED_LIBS=false -DBUILD_EXAMPLES=false -DBUILD_TOOLS=true -DBUILD_UNIT_TESTS=false -DCHECK_FOR_UPDATES=false -DBUILD_WITH_DDS=true -DBUILD_PYTHON_BINDINGS=true -DPYTHON_EXECUTABLE=$(which python3)
cmake .. -DBUILD_SHARED_LIBS=false -DBUILD_EXAMPLES=false -DBUILD_TOOLS=false -DBUILD_UNIT_TESTS=false -DCHECK_FOR_UPDATES=false -DBUILD_WITH_DDS=true -DBUILD_PYTHON_BINDINGS=true -DPYTHON_EXECUTABLE=$(which python3) -DFORCE_RSUSB_BACKEND=true
cmake --build . --config ${{env.LRS_RUN_CONFIG}} -- -j4

- name: LibCI
# Note: we specifically disable BUILD_UNIT_TESTS so the executable C++ unit-tests won't run
# This is to save time as DDS already lengthens the build...
shell: bash
run: |
python3 unit-tests/run-unit-tests.py --no-color --debug --stdout --not-live --context "dds linux"
python3 unit-tests/run-unit-tests.py --no-color --debug --stdout --not-live --context "dds linux" --tag dds


#--------------------------------------------------------------------------------
Expand Down
186 changes: 35 additions & 151 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <realdds/dds-device.h>
#include <realdds/dds-stream.h>
#include <realdds/dds-stream-profile.h>
#include <realdds/dds-metadata-syncer.h>
#include "software-device.h"
#include <librealsense2/h/rs_internal.h>
#include <realdds/topics/device-info-msg.h>
Expand Down Expand Up @@ -187,19 +188,21 @@ namespace librealsense
assert( _device_watcher->is_stopped() );

#ifdef BUILD_WITH_DDS
if( rsutils::json::get< bool >( settings, "dds-discovery", true ) )
if( rsutils::json::get< bool >( settings, std::string( "dds-discovery", 13 ), true ) )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have to explicitly use the string size? It is inconvenient and error prone

Looks like it should work without the explicit size (using constructor number 5, not 4)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's optional, but I dislike using strlen for constants

Copy link
Contributor

@OhadMeir OhadMeir Apr 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you miscount?
Will the json find the right string while searching for dds-discover or dds-discovery (extra char can be of any value) or would it throw?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a constant. It's like getting a constant value wrong.
This is why we have a CR and testing :)

{
realdds::dds_domain_id domain_id = rsutils::json::get< int >( settings, "dds-domain", 0 );
std::string participant_name
= rsutils::json::get< std::string >( settings, "dds-participant-name", rsutils::os::executable_name() );
realdds::dds_domain_id domain_id
= rsutils::json::get< int >( settings, std::string( "dds-domain", 10 ), 0 );
std::string participant_name = rsutils::json::get< std::string >( settings,
std::string( "dds-participant-name", 20 ),
rsutils::os::executable_name() );

auto & domain = dds_domain_context_by_id[domain_id];
_dds_participant = domain.participant.instance();
if( ! _dds_participant->is_valid() )
{
_dds_participant->init( domain_id, participant_name );
}
else if( rsutils::json::has_value( settings, "dds-participant-name" )
else if( rsutils::json::has_value( settings, std::string( "dds-participant-name", 20 ) )
&& participant_name != _dds_participant->get()->get_qos().name().to_string() )
{
throw std::runtime_error(
Expand All @@ -212,7 +215,8 @@ namespace librealsense
// The DDS device watcher should always be on
if( _dds_watcher && _dds_watcher->is_stopped() )
{
start_dds_device_watcher( rsutils::json::get< size_t >( settings, "dds-message-timeout-ms", 5000 ) );
start_dds_device_watcher(
rsutils::json::get< size_t >( settings, std::string( "dds-message-timeout-ms", 22 ), 5000 ) );
}
}
#endif //BUILD_WITH_DDS
Expand Down Expand Up @@ -477,140 +481,17 @@ namespace librealsense
};


// Frame data and metadata are sent as two seperate streams.
// They are synchronized using frame_id for matching data+metadata sets.
class frame_metadata_syncer
{
public:
// We don't want the queue to get large, it means lots of drops and data that we store to (probably) throw later
static constexpr size_t max_md_queue_size = 8;
// If a metadata is lost we wait for it until the next frame arrives, causing a small delay but we prefer passing
// the frame late and without metadata over losing it.
static constexpr size_t max_frame_queue_size = 2;

// We synchronize using some abstract "id" used to identify each frame and its metadata. We don't need to know
// the nature of the id; only that it is increasing in value over time so that, given id1 > id2, then id1
// happened after id2.
typedef uint64_t id_type;

private:
struct synced_frame
{
id_type _sync_id;
frame_holder _frame;
};

struct synced_metadata
{
id_type _sync_id;
nlohmann::json _md;
};

public:
void enqueue_frame( id_type id, frame * f )
{
std::lock_guard< std::mutex > lock( _queues_lock );
_frame_queue.push_back( synced_frame{ id, f } );
search_for_match(); // Call under lock
}

void enqueue_metadata( id_type id, nlohmann::json && md )
{
std::lock_guard< std::mutex > lock( _queues_lock );
while( _metadata_queue.size() >= max_md_queue_size )
{
LOG_DEBUG( "throwing away metadata: " << _metadata_queue.front()._md.dump() );
_metadata_queue.pop_front(); // Throw oldest
}

//LOG_DEBUG( "enqueueing metadata: " << md.dump() );
_metadata_queue.push_back( synced_metadata{ id, std::move( md ) } );
search_for_match(); // Call under lock
}

typedef std::function< void( frame_holder &&, nlohmann::json && metadata ) > on_frame_ready;

void reset( on_frame_ready cb = nullptr )
{
std::lock_guard< std::mutex > lock( _queues_lock );
_frame_queue.clear();
_metadata_queue.clear();
_on_frame_ready = cb;
}

private:
// Call under lock
void search_for_match()
{
// Wait for frame + metadata set, but if metadata is lost pass frame to the user anyway
if( _frame_queue.empty() || ( _metadata_queue.empty() && _frame_queue.size() < max_frame_queue_size ) )
return;

// Sync using frame ID. Frame IDs are assumed to be increasing with time
auto frame_id = _frame_queue.front()._sync_id;
auto md_id = _metadata_queue.empty()
? frame_id + 1 // If no metadata force call to handle_frame_no_metadata
: _metadata_queue.front()._sync_id;

while( frame_id > md_id && _metadata_queue.size() > 1 )
{
// Metadata without frame, remove it from queue and check next
_metadata_queue.pop_front();
md_id = _metadata_queue.front()._sync_id;
}

if( frame_id == md_id )
handle_match();
else
handle_frame_without_metadata();
}

// Call under lock
void handle_match()
{
auto & fh = _frame_queue.front()._frame;

json md;
if( ! _metadata_queue.empty() )
{
md = std::move( _metadata_queue.front()._md );
_metadata_queue.pop_front();
}

if( _on_frame_ready )
_on_frame_ready( std::move( fh ), std::move( md ) );

_frame_queue.pop_front();
}

// Call under lock
void handle_frame_without_metadata()
{
auto & fh = _frame_queue.front()._frame;

json md;
if( _on_frame_ready )
_on_frame_ready( std::move( fh ), std::move( md ) );

_frame_queue.pop_front();
}

on_frame_ready _on_frame_ready = nullptr;

std::deque< synced_frame > _frame_queue;
std::deque< synced_metadata > _metadata_queue;
std::mutex _queues_lock;
};


class dds_sensor_proxy : public software_sensor
{
std::shared_ptr< realdds::dds_device > const _dev;
std::string const _name;
bool const _md_enabled;

typedef realdds::dds_metadata_syncer syncer_type;
static void frame_releaser( syncer_type::frame_type * f ) { static_cast< frame * >( f )->release(); }

std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams;
std::map< std::string, frame_metadata_syncer > _stream_name_to_syncer;
std::map< std::string, syncer_type > _stream_name_to_syncer;

public:
dds_sensor_proxy( std::string const & sensor_name,
Expand Down Expand Up @@ -731,7 +612,7 @@ namespace librealsense

void handle_video_data( realdds::topics::device::image && dds_frame,
const std::shared_ptr< stream_profile_interface > & profile,
frame_metadata_syncer & syncer )
syncer_type & syncer )
{
frame_additional_data data; // with NO metadata by default!
data.timestamp; // from metadata
Expand All @@ -758,7 +639,7 @@ namespace librealsense
if( _md_enabled )
{
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID
new_frame );
syncer.hold( new_frame ) );
}
else
{
Expand All @@ -770,23 +651,23 @@ namespace librealsense

void add_frame_metadata( frame * const f, json && dds_md )
{
json const & md_header = dds_md["header"];
json const & md = dds_md["metadata"];
json const & md_header = dds_md[std::string( "header", 6 )];
json const & md = dds_md[std::string( "metadata", 8 )];

// Always expected metadata
f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, "timestamp" );
f->additional_data.timestamp_domain = rsutils::json::get< rs2_timestamp_domain >( md_header, "timestamp-domain" );
f->additional_data.timestamp = rsutils::json::get< rs2_time_t >( md_header, std::string( "timestamp", 9 ) );
f->additional_data.timestamp_domain
= rsutils::json::get< rs2_timestamp_domain >( md_header, std::string( "timestamp-domain", 16 ) );

// Expected metadata for all depth images
if( rsutils::json::has( md_header, "depth-units" ) )
f->additional_data.depth_units = rsutils::json::get< float >( md_header, "depth-units" );
rsutils::json::get_ex( md_header, std::string( "depth-units", 11 ), &f->additional_data.depth_units );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will throw for color frames

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_ex returns a bool on failure, and checks if it's there first. It can still throw, but not if it's not there.


// Other metadata fields. Metadata fields that are present but unknown by librealsense will be ignored.
auto metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob );
auto & metadata = reinterpret_cast< metadata_array & >( f->additional_data.metadata_blob );
for( size_t i = 0; i < static_cast< size_t >( RS2_FRAME_METADATA_COUNT ); ++i )
{
auto key = static_cast< rs2_frame_metadata_value >( i );
const char * keystr = rs2_frame_metadata_to_string( key );
std::string const & keystr = librealsense::get_string( key );
try
{
metadata[key] = { true, rsutils::json::get< rs2_metadata_type >( md, keystr ) };
Expand All @@ -801,7 +682,7 @@ namespace librealsense

void handle_motion_data( realdds::topics::device::image && dds_frame,
const std::shared_ptr< stream_profile_interface > & profile,
frame_metadata_syncer & syncer )
syncer_type & syncer )
{
frame_additional_data data; // with NO metadata by default!
data.timestamp; // from metadata
Expand All @@ -821,7 +702,7 @@ namespace librealsense
if( _md_enabled )
{
syncer.enqueue_frame( data.frame_number, // for now, we use this as the synced-to ID
new_frame );
syncer.hold( new_frame ));
}
else
{
Expand Down Expand Up @@ -853,13 +734,15 @@ namespace librealsense
auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )];
// Opening it will start streaming on the server side automatically
dds_stream->open( "rt/" + _dev->device_info().topic_root + '_' + dds_stream->name(), _dev->subscriber());
_stream_name_to_syncer[dds_stream->name()].reset(
[this]( frame_holder && fh, json && md )
auto & syncer = _stream_name_to_syncer[dds_stream->name()];
syncer.on_frame_release( frame_releaser );
syncer.on_frame_ready(
[this]( syncer_type::frame_holder && fh, json && md )
{
if( ! md.empty() )
add_frame_metadata( static_cast< frame * >( fh.frame ), std::move( md ) );
add_frame_metadata( static_cast< frame * >( fh.get() ), std::move( md ) );
// else the frame should already have empty metadata!
invoke_new_frame( std::move( fh ), nullptr, nullptr );
invoke_new_frame( static_cast< frame * >( fh.release() ), nullptr, nullptr );
} );

if( Is< realdds::dds_video_stream >( dds_stream ) )
Expand All @@ -880,14 +763,15 @@ namespace librealsense
throw std::runtime_error( "Unsupported stream type" );

dds_stream->start_streaming();

}

software_sensor::start( callback );
}

void stop()
{
_stream_name_to_syncer.clear();

for( auto & profile : sensor_base::get_active_streams() )
{
auto & dds_stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )];
Expand Down Expand Up @@ -1240,7 +1124,7 @@ namespace librealsense
_dds_dev->on_metadata_available(
[this]( json && dds_md )
{
std::string stream_name = rsutils::json::get< std::string >( dds_md, "stream-name" );
std::string stream_name = rsutils::json::get< std::string >( dds_md, std::string( "stream-name", 11 ) );
auto it = _stream_name_to_owning_sensor.find( stream_name );
if( it != _stream_name_to_owning_sensor.end() )
it->second->handle_new_metadata( stream_name, std::move( dds_md ) );
Expand Down
2 changes: 1 addition & 1 deletion src/ds/d400/d400-private.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ namespace librealsense
RS2_DISTORTION_INVERSE_BROWN_CONRADY // The coefficients shall be use for undistort
};
librealsense::copy(calc_intrinsic.coeffs, table->distortion, sizeof(table->distortion));
LOG_DEBUG(endl << array2str((float_4&)(calc_intrinsic.fx, calc_intrinsic.fy, calc_intrinsic.ppx, calc_intrinsic.ppy)) << endl);
//LOG_DEBUG(endl << array2str((float_4&)(calc_intrinsic.fx, calc_intrinsic.fy, calc_intrinsic.ppx, calc_intrinsic.ppy)) << endl);

static rs2_intrinsics ref{};
if (memcmp(&calc_intrinsic, &ref, sizeof(rs2_intrinsics)))
Expand Down
Loading