Skip to content

Commit

Permalink
add dds-topic-reader::stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Mar 16, 2023
1 parent 0315b56 commit 10174fd
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 1 deletion.
3 changes: 3 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-reader-thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ class dds_topic_reader_thread : public dds_topic_reader
typedef dds_topic_reader super;

public:
dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic );
dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber );
~dds_topic_reader_thread();

void run( qos const & ) override;
void stop() override;
};


Expand Down
3 changes: 3 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener
// The callbacks should be set before we actually create the underlying DDS objects, so the reader does not
virtual void run( qos const & );

// Go back to a pre-run() state, such that is_running() returns false
virtual void stop();

// DataReaderListener
protected:
void on_subscription_matched( eprosima::fastdds::dds::DataReader *,
Expand Down
26 changes: 25 additions & 1 deletion third-party/realdds/src/dds-topic-reader-thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,39 @@
namespace realdds {


dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic )
: dds_topic_reader_thread( topic, std::make_shared< dds_subscriber >( topic->get_participant() ) )
{
}


dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic,
std::shared_ptr< dds_subscriber > const & subscriber )
: super( topic, subscriber )
, _th(
[this]( dispatcher::cancellable_timer )
[this, name = topic->get()->get_name()]( dispatcher::cancellable_timer )
{
if( ! _reader )
return;
eprosima::fastrtps::Duration_t const one_second = { 1, 0 };
LOG_DEBUG( "----> '" << name << "' waiting for message" );
if( _reader->wait_for_unread_message( one_second ) )
{
LOG_DEBUG( "----> '" << name << "' callback" );
_on_data_available();
LOG_DEBUG( "<---- '" << name << "' callback" );
}
} )
{
}


dds_topic_reader_thread::~dds_topic_reader_thread()
{
LOG_DEBUG( "xxxxx '" << ( _reader ? _reader->get_topicdescription()->get_name() : "unknown" ) << "' dtor" );
}


void dds_topic_reader_thread::run( qos const & rqos )
{
if( ! _on_data_available )
Expand All @@ -43,4 +60,11 @@ void dds_topic_reader_thread::run( qos const & rqos )
}


void dds_topic_reader_thread::stop()
{
_th.stop();
super::stop();
}


} // namespace realdds
14 changes: 14 additions & 0 deletions third-party/realdds/src/dds-topic-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ void dds_topic_reader::run( qos const & rqos )
}


void dds_topic_reader::stop()
{
if( _subscriber )
{
if( _reader )
{
DDS_API_CALL_NO_THROW( _subscriber->get()->delete_datareader( _reader ) );
_reader = nullptr;
}
}
assert( ! is_running() );
}


void dds_topic_reader::on_subscription_matched(
eprosima::fastdds::dds::DataReader *, eprosima::fastdds::dds::SubscriptionMatchedStatus const & info )
{
Expand Down

0 comments on commit 10174fd

Please sign in to comment.