Skip to content

Commit

Permalink
PR #11063 from Eran: Unify notification and control messages using "f…
Browse files Browse the repository at this point in the history
…lexible-message"
  • Loading branch information
maloel authored Nov 6, 2022
2 parents 792e4df + e1b4ad9 commit 51179a9
Show file tree
Hide file tree
Showing 40 changed files with 740 additions and 1,986 deletions.
7 changes: 7 additions & 0 deletions include/librealsense2/hpp/rs_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "rs_types.hpp"
#include "rs_sensor.hpp"
#include <array>
#include <cstring>

namespace rs2
{
Expand Down Expand Up @@ -117,6 +118,12 @@ namespace rs2
{
return _dev;
}
bool operator<( device const & other ) const
{
return (
std::strcmp( get_info( RS2_CAMERA_INFO_SERIAL_NUMBER ), other.get_info( RS2_CAMERA_INFO_SERIAL_NUMBER ) )
< 0 );
}

template<class T>
bool is() const
Expand Down
40 changes: 39 additions & 1 deletion include/librealsense2/utilities/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,45 @@ T get( nlohmann::json const & j, char const * key )
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( "[while getting '" + std::string( key ) + "']" + e.what() );
throw std::runtime_error( "[getting '" + std::string( key ) + "']" + e.what() );
}
}


// If there, returns the value at the given index (in an array); otherwise throws!
// Turns json exceptions into runtime errors with additional info.
template < class T >
T get( nlohmann::json const & j, int index )
{
try
{
// This will throw for type mismatches, etc.
// Does not check for existence: will throw, too!
return j.at( index ).get< T >();
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( "[getting index " + std::to_string( index ) + "]" + e.what() );
}
}


// If there, returns the value at the given iterator; otherwise throws!
// Turns json exceptions into runtime errors with additional info.
template < class T >
T get( nlohmann::json const & j, nlohmann::json::const_iterator const & it )
{
if( it == j.end() )
throw std::runtime_error( "unexpected end of json" );
try
{
// This will throw for type mismatches, etc.
// Does not check for existence: will throw, too!
return it->get< T >();
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( std::string( "[getting iterator]" ) + e.what() );
}
}

Expand Down
257 changes: 174 additions & 83 deletions src/context.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions third-party/realdds/include/realdds/dds-device-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace realdds {

// Forward declaration
namespace topics {
class notification;
class flexible_msg;
namespace raw {
class device_info;
} // namespace raw
Expand Down Expand Up @@ -60,7 +60,7 @@ class dds_device_server
void start_streaming( const std::string & stream_name, const image_header & header );

void publish_image( const std::string & stream_name, const uint8_t * data, size_t size );
void publish_notification( topics::notification && );
void publish_notification( topics::flexible_msg && );

private:
std::shared_ptr< dds_publisher > _publisher;
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/include/realdds/dds-device.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class dds_device
size_t foreach_stream( std::function< void( std::shared_ptr< dds_stream > stream ) > fn ) const;

void open( const dds_stream_profiles & profiles );
void close( const std::vector< std::pair< int16_t, int8_t > > & stream_uids );
void close( const dds_streams & streams );

private:
class impl;
Expand Down
10 changes: 5 additions & 5 deletions third-party/realdds/include/realdds/dds-notification-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#pragma once


#include <realdds/topics/notification/notification-msg.h>
#include <realdds/topics/flexible/flexible-msg.h>
#include <librealsense2/utilities/concurrency/concurrency.h>

#include <memory>
Expand Down Expand Up @@ -35,19 +35,19 @@ class dds_notification_server
bool is_running() const { return _active; }

// On-demand notification: these happen sequentially and from another thread
void send_notification( topics::notification && notification );
void send_notification( topics::flexible_msg && notification );

// On-discovery notification: when a new client is detected
void add_discovery_notification( topics::notification && notification );
void add_discovery_notification( topics::flexible_msg && notification );

private:
void send_discovery_notifications();

std::shared_ptr< dds_publisher > _publisher;
std::shared_ptr< dds_topic_writer > _writer;
active_object<> _notifications_loop;
single_consumer_queue< topics::raw::notification > _instant_notifications;
std::vector< topics::raw::notification > _discovery_notifications;
single_consumer_queue< topics::raw::flexible > _instant_notifications;
std::vector< topics::raw::flexible > _discovery_notifications;
std::mutex _notification_send_mutex;
std::condition_variable _send_notification_cv;
std::atomic_bool _send_init_msgs = { false };
Expand Down
78 changes: 29 additions & 49 deletions third-party/realdds/include/realdds/dds-stream-profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,6 @@
namespace realdds {


union dds_stream_uid
{
uint32_t whole = 0;
struct
{
int16_t sid; // Stream ID; assigned by the server, but may not be unique because of index
int8_t index; // Used to distinguish similar streams like IR L / R, 0 otherwise
};

dds_stream_uid() = default;
dds_stream_uid( dds_stream_uid const & ) = default;
dds_stream_uid( dds_stream_uid && ) = default;

dds_stream_uid( uint32_t whole_ )
: whole( whole_ )
{
}

dds_stream_uid( int sid_, int index_ )
{
whole = 0; // it covers an extra byte, which needs to be 0
sid = static_cast<int16_t>( sid_ );
index = static_cast<int8_t>( index_ );
}

std::string to_string() const;
};


inline bool operator<( dds_stream_uid const & l, dds_stream_uid const & r )
{
return l.whole < r.whole;
}


// Similar to fourcc, this describes how a stream data is organized. The characters are zero-terminated so it can be
// shorter than 'size' and can be easily converted from/to string.
//
Expand Down Expand Up @@ -83,39 +48,52 @@ class dds_stream_base;

class dds_stream_profile
{
dds_stream_uid _uid;
dds_stream_format _format;
int16_t _frequency; // "Frames" per second
dds_stream_format _format;

std::weak_ptr< dds_stream_base > _stream;

public:
virtual ~dds_stream_profile() {}

protected:
dds_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency )
: _uid( uid )
, _format( format )
dds_stream_profile( dds_stream_format format, int16_t frequency )
: _format( format )
, _frequency( frequency )
{
}
dds_stream_profile( dds_stream_profile && ) = default;
dds_stream_profile( nlohmann::json const &, int & index );

public:
std::shared_ptr< dds_stream_base > stream() const { return _stream.lock(); }
// This is for initialization and is called from dds_stream_base only!
void init_stream( std::weak_ptr< dds_stream_base > const & stream );

dds_stream_uid uid() const { return _uid; }
dds_stream_format format() const { return _format; }
int16_t frequency() const { return _frequency; }

// These are for debugging - not functional
virtual std::string to_string() const;
virtual std::string details_to_string() const;

// Serialization to a JSON representation
// Serialization to a JSON array representation
virtual nlohmann::json to_json() const;

// Build a profile from a json array object, e.g.:
// auto profile = dds_stream_profile::from_json< dds_video_stream_profile >( j );
// This is the reverse of to_json() which returns a json array
template< class final_stream_profile >
static std::shared_ptr< final_stream_profile > from_json( nlohmann::json const & j )
{
int it = 0;
auto profile = std::make_shared< final_stream_profile >( j, it );
verify_end_of_json( j, it ); // just so it's not in the header
return profile;
}

private:
static void verify_end_of_json( nlohmann::json const &, int index );
};


Expand All @@ -128,21 +106,19 @@ class dds_video_stream_profile : public dds_stream_profile

uint16_t _width; // Resolution width [pixels]
uint16_t _height; // Resolution height [pixels]
uint8_t _bytes_per_pixel;

public:
dds_video_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency, uint16_t width, uint16_t height, uint8_t bytes_per_pixel )
: dds_stream_profile( uid, format, frequency )
dds_video_stream_profile( dds_stream_format format, int16_t frequency, uint16_t width, uint16_t height )
: super( format, frequency )
, _width( width )
, _height( height )
, _bytes_per_pixel( bytes_per_pixel )
{
}
dds_video_stream_profile( nlohmann::json const &, int & index );
dds_video_stream_profile( dds_video_stream_profile && ) = default;

uint16_t width() const { return _width; }
uint16_t height() const { return _height; }
uint8_t bytes_per_pixel() const { return _bytes_per_pixel; }

std::string details_to_string() const override;

Expand All @@ -155,8 +131,12 @@ class dds_motion_stream_profile : public dds_stream_profile
typedef dds_stream_profile super;

public:
dds_motion_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency )
: dds_stream_profile( uid, format, frequency )
dds_motion_stream_profile( dds_stream_format format, int16_t frequency )
: super( format, frequency )
{
}
dds_motion_stream_profile( nlohmann::json const & j, int & index )
: super( j, index )
{
}
dds_motion_stream_profile( dds_motion_stream_profile && ) = default;
Expand Down
5 changes: 4 additions & 1 deletion third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include "dds-stream-base.h"
#include "dds-stream-profile.h"

#include <functional>
#include <memory>
#include <string>
#include <vector>

namespace realdds {

Expand Down Expand Up @@ -62,4 +62,7 @@ class dds_motion_stream : public dds_stream
};


typedef std::vector< std::shared_ptr< dds_stream > > dds_streams;


} // namespace realdds
Loading

0 comments on commit 51179a9

Please sign in to comment.