Skip to content

Commit

Permalink
PR #11561 from Eran: add dds custom on-video-frame to avoid data copy
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Mar 19, 2023
2 parents 094ff52 + 49e3c82 commit 610e03a
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 81 deletions.
2 changes: 1 addition & 1 deletion src/archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace librealsense
public:
virtual callback_invocation_holder begin_callback() = 0;

virtual frame_interface* alloc_and_track(const size_t size, const frame_additional_data& additional_data, bool requires_memory) = 0;
virtual frame_interface* alloc_and_track(const size_t size, frame_additional_data && additional_data, bool requires_memory) = 0;

virtual std::shared_ptr<metadata_parser_map> get_md_parsers() const = 0;

Expand Down
60 changes: 48 additions & 12 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <realdds/topics/flexible/flexible-msg.h>
#include <rsutils/shared-ptr-singleton.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/deferred.h>
#include <fastdds/dds/domain/DomainParticipant.hpp>

// We manage one participant and device-watcher per domain:
Expand Down Expand Up @@ -699,6 +700,40 @@ namespace librealsense
software_sensor::open( profiles );
}

void custom_on_video_frame( rs2_software_video_frame const & software_frame )
{
// We do exactly the same as the base on_video_frame(), but we have a custom releaser that know that the
// pixels are actually pointing to a vector:
auto p_pixels_vector = static_cast< std::vector< byte > * >( software_frame.pixels );
rsutils::deferred on_release( [p_pixels_vector]() { delete p_pixels_vector; } );

// This will all go away -- moving to storing frame_additional_data directly!
stream_profile_interface * profile = software_frame.profile->profile;
auto vid_profile = dynamic_cast<video_stream_profile_interface *>(profile);
if( ! vid_profile )
throw invalid_value_exception( "Non-video profile provided to on_video_frame" );

if( ! _is_streaming )
return;

frame_additional_data data;
data.timestamp = software_frame.timestamp;
data.timestamp_domain = software_frame.domain;
data.frame_number = software_frame.frame_number;
data.depth_units = software_frame.depth_units;

data.metadata_size = (uint32_t)( _metadata_map.size() * sizeof( rs2_metadata_type ) );
memcpy( data.metadata_blob.data(), _metadata_map.data(), data.metadata_size );

auto frame_i
= allocate_new_video_frame( vid_profile, software_frame.stride, software_frame.bpp, std::move( data ) );
if( frame_i )
{
static_cast< frame * >( frame_i )->data = std::move( *p_pixels_vector );
invoke_new_frame( frame_i, nullptr, nullptr );
}
}

void handle_video_data( realdds::topics::device::image && dds_frame,
const std::shared_ptr< stream_profile_interface > & prof,
frame_metadata_syncer< rs2_software_video_frame > & syncer )
Expand All @@ -714,24 +749,25 @@ namespace librealsense
prof_holder->clone = prof;
rs2_frame.profile = prof_holder.get();

// Info sent with image topic
rs2_frame.stride = static_cast<int>(dds_frame.height > 0 ? dds_frame.raw_data.size() / dds_frame.height
: dds_frame.raw_data.size());
rs2_frame.bpp = dds_frame.width > 0 ? rs2_frame.stride / dds_frame.width : rs2_frame.stride;
rs2_frame.frame_number = !dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0;

// Copying from dds into LibRS space, same as copy from USB backend.
// TODO - use memory pool or some other frame allocator
rs2_frame.pixels = new uint8_t[dds_frame.raw_data.size()];
if( ! rs2_frame.pixels )
throw std::runtime_error( "Could not allocate memory for new frame" );
memcpy( rs2_frame.pixels, dds_frame.raw_data.data(), dds_frame.raw_data.size() );
rs2_frame.pixels = new std::vector< byte >( std::move( dds_frame.raw_data ));
// The way to release allocated memory
rs2_frame.deleter = []( void * ptr ) { delete[] ptr; };
// Info sent with image topic
rs2_frame.stride = static_cast< int >( dds_frame.height > 0 ? dds_frame.raw_data.size() / dds_frame.height
: dds_frame.raw_data.size() );
rs2_frame.bpp = dds_frame.width > 0 ? rs2_frame.stride / dds_frame.width : rs2_frame.stride;
rs2_frame.frame_number = ! dds_frame.frame_id.empty() ? std::stoi( dds_frame.frame_id ) : 0;
rs2_frame.deleter = []( void * pixels )
{
delete static_cast< std::vector< byte > * >( pixels );
};

if( _md_enabled )
syncer.enqueue_frame( std::move( rs2_frame ), prof_holder );
else
on_video_frame( rs2_frame );
custom_on_video_frame( rs2_frame );
}

void add_video_frame_metadata( rs2_software_video_frame & frame, const realdds::topics::flexible_msg & dds_md )
Expand Down Expand Up @@ -843,7 +879,7 @@ namespace librealsense
if( Is< realdds::dds_video_stream >( dds_stream ) )
{
_stream_name_to_video_syncer[dds_stream->name()].reset( [this]( rs2_software_video_frame && f ) {
on_video_frame( f );
custom_on_video_frame( f );
}, [this]( rs2_software_video_frame & f, const realdds::topics::flexible_msg & md ) {
add_video_frame_metadata( f, md );
} );
Expand Down
8 changes: 4 additions & 4 deletions src/frame-archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace librealsense
std::shared_ptr<sensor_interface> get_sensor() const override { return _sensor.lock(); }
void set_sensor(std::shared_ptr<sensor_interface> s) override { _sensor = s; }

T alloc_frame(const size_t size, const frame_additional_data& additional_data, bool requires_memory)
T alloc_frame(const size_t size, frame_additional_data && additional_data, bool requires_memory)
{
T backbuffer;
//const size_t size = modes[stream].get_image_size(stream);
Expand Down Expand Up @@ -59,7 +59,7 @@ namespace librealsense
{
backbuffer.data.resize(size, 0); // TODO: Allow users to provide a custom allocator for frame buffers
}
backbuffer.additional_data = additional_data;
backbuffer.additional_data = std::move( additional_data );
return backbuffer;
}

Expand Down Expand Up @@ -160,9 +160,9 @@ namespace librealsense
ref->release();
}

frame_interface* alloc_and_track(const size_t size, const frame_additional_data& additional_data, bool requires_memory) override
frame_interface* alloc_and_track(const size_t size, frame_additional_data && additional_data, bool requires_memory) override
{
auto frame = alloc_frame(size, additional_data, requires_memory);
auto frame = alloc_frame( size, std::move( additional_data ), requires_memory );
return track_frame(frame);
}

Expand Down
14 changes: 10 additions & 4 deletions src/media/ros/ros_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,11 @@ namespace librealsense
get_frame_metadata(m_file, info_topic, stream_id, image_data, additional_data);
}

frame_interface* frame = m_frame_source->alloc_frame((stream_id.stream_type == RS2_STREAM_DEPTH) ? RS2_EXTENSION_DEPTH_FRAME : RS2_EXTENSION_VIDEO_FRAME,
msg->data.size(), additional_data, true);
frame_interface * frame = m_frame_source->alloc_frame(
( stream_id.stream_type == RS2_STREAM_DEPTH ) ? RS2_EXTENSION_DEPTH_FRAME : RS2_EXTENSION_VIDEO_FRAME,
msg->data.size(),
std::move( additional_data ),
true );
if (frame == nullptr)
{
LOG_WARNING("Failed to allocate new frame");
Expand Down Expand Up @@ -491,7 +494,10 @@ namespace librealsense
get_frame_metadata(m_file, info_topic, stream_id, motion_data, additional_data);
}

frame_interface* frame = m_frame_source->alloc_frame(RS2_EXTENSION_MOTION_FRAME, 3 * sizeof(float), additional_data, true);
frame_interface * frame = m_frame_source->alloc_frame( RS2_EXTENSION_MOTION_FRAME,
3 * sizeof( float ),
std::move( additional_data ),
true );
if (frame == nullptr)
{
LOG_WARNING("Failed to allocate new frame");
Expand Down Expand Up @@ -637,7 +643,7 @@ namespace librealsense

additional_data.timestamp = timestamp_ms.count();

frame_interface* new_frame = m_frame_source->alloc_frame(frame_type, frame_size, additional_data, true);
frame_interface* new_frame = m_frame_source->alloc_frame(frame_type, frame_size, std::move( additional_data ), true);
if (new_frame == nullptr)
{
LOG_WARNING("Failed to allocate new frame");
Expand Down
15 changes: 11 additions & 4 deletions src/proc/synthetic-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,11 @@ namespace librealsense
data.system_time = _actual_source.get_time();
data.is_blocking = original->is_blocking();

auto res = _actual_source.alloc_frame(frame_type, vid_stream->get_width() * vid_stream->get_height() * sizeof(float) * 5, data, true);
auto res
= _actual_source.alloc_frame( frame_type,
vid_stream->get_width() * vid_stream->get_height() * sizeof( float ) * 5,
std::move( data ),
true );
if (!res) throw wrong_api_call_sequence_exception("Out of frame resources!");
res->set_sensor(original->get_sensor());
res->set_stream(stream);
Expand Down Expand Up @@ -402,7 +406,7 @@ namespace librealsense

auto of = dynamic_cast<frame*>(original);
frame_additional_data data = of->additional_data;
auto res = _actual_source.alloc_frame(frame_type, stride * height, data, true);
auto res = _actual_source.alloc_frame( frame_type, stride * height, std::move( data ), true );
if (!res) throw wrong_api_call_sequence_exception("Out of frame resources!");
vf = dynamic_cast<video_frame*>(res);
vf->metadata_parsers = of->metadata_parsers;
Expand All @@ -425,7 +429,7 @@ namespace librealsense
{
auto of = dynamic_cast<frame*>(original);
frame_additional_data data = of->additional_data;
auto res = _actual_source.alloc_frame(frame_type, of->get_frame_data_size(), data, true);
auto res = _actual_source.alloc_frame( frame_type, of->get_frame_data_size(), std::move( data ), true );
if (!res) throw wrong_api_call_sequence_exception("Out of frame resources!");
auto mf = dynamic_cast<motion_frame*>(res);
mf->metadata_parsers = of->metadata_parsers;
Expand Down Expand Up @@ -471,7 +475,10 @@ namespace librealsense
for (auto&& f : holders)
req_size += get_embeded_frames_size(f.frame);

auto res = _actual_source.alloc_frame(RS2_EXTENSION_COMPOSITE_FRAME, req_size * sizeof(rs2_frame*), d, true);
auto res = _actual_source.alloc_frame( RS2_EXTENSION_COMPOSITE_FRAME,
req_size * sizeof( rs2_frame * ),
std::move( d ),
true );
if (!res) return nullptr;

auto cf = static_cast<composite_frame*>(res);
Expand Down
5 changes: 3 additions & 2 deletions src/sensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ void log_callback_end( uint32_t fps,
frame_holder fh = _source.alloc_frame(
stream_to_frame_types( req_profile_base->get_stream_type() ),
expected_size,
fr->additional_data,
std::move( fr->additional_data ),
true );
auto diff = environment::get_instance().get_time_service()->get_time() - system_time;
if( diff > 10 )
Expand Down Expand Up @@ -1013,7 +1013,8 @@ void log_callback_end( uint32_t fps,

last_frame_number = frame_counter;
last_timestamp = timestamp;
frame_holder frame = _source.alloc_frame(RS2_EXTENSION_MOTION_FRAME, data_size, fr->additional_data, true);
frame_holder frame
= _source.alloc_frame( RS2_EXTENSION_MOTION_FRAME, data_size, std::move( fr->additional_data ), true );
memcpy( (void *)frame->get_frame_data(),
sensor_data.fo.pixels,
sizeof( byte ) * sensor_data.fo.frame_size );
Expand Down
Loading

0 comments on commit 610e03a

Please sign in to comment.