From 10174fd5306fa62ef442a5941e32078102af3786 Mon Sep 17 00:00:00 2001 From: Eran Date: Thu, 16 Mar 2023 09:09:43 +0200 Subject: [PATCH] add dds-topic-reader::stop() --- .../include/realdds/dds-topic-reader-thread.h | 3 +++ .../include/realdds/dds-topic-reader.h | 3 +++ .../realdds/src/dds-topic-reader-thread.cpp | 26 ++++++++++++++++++- third-party/realdds/src/dds-topic-reader.cpp | 14 ++++++++++ 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/third-party/realdds/include/realdds/dds-topic-reader-thread.h b/third-party/realdds/include/realdds/dds-topic-reader-thread.h index 064e9f4e13..b5efc89b10 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader-thread.h +++ b/third-party/realdds/include/realdds/dds-topic-reader-thread.h @@ -33,10 +33,13 @@ class dds_topic_reader_thread : public dds_topic_reader typedef dds_topic_reader super; public: + dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic ); dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ); + ~dds_topic_reader_thread(); void run( qos const & ) override; + void stop() override; }; diff --git a/third-party/realdds/include/realdds/dds-topic-reader.h b/third-party/realdds/include/realdds/dds-topic-reader.h index d2e6a61291..d46838e963 100644 --- a/third-party/realdds/include/realdds/dds-topic-reader.h +++ b/third-party/realdds/include/realdds/dds-topic-reader.h @@ -75,6 +75,9 @@ class dds_topic_reader : public eprosima::fastdds::dds::DataReaderListener // The callbacks should be set before we actually create the underlying DDS objects, so the reader does not virtual void run( qos const & ); + // Go back to a pre-run() state, such that is_running() returns false + virtual void stop(); + // DataReaderListener protected: void on_subscription_matched( eprosima::fastdds::dds::DataReader *, diff --git a/third-party/realdds/src/dds-topic-reader-thread.cpp b/third-party/realdds/src/dds-topic-reader-thread.cpp index 6ea4f6b1f2..b9bcc727b8 100644 --- a/third-party/realdds/src/dds-topic-reader-thread.cpp +++ b/third-party/realdds/src/dds-topic-reader-thread.cpp @@ -14,22 +14,39 @@ namespace realdds { +dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic ) + : dds_topic_reader_thread( topic, std::make_shared< dds_subscriber >( topic->get_participant() ) ) +{ +} + + dds_topic_reader_thread::dds_topic_reader_thread( std::shared_ptr< dds_topic > const & topic, std::shared_ptr< dds_subscriber > const & subscriber ) : super( topic, subscriber ) , _th( - [this]( dispatcher::cancellable_timer ) + [this, name = topic->get()->get_name()]( dispatcher::cancellable_timer ) { if( ! _reader ) return; eprosima::fastrtps::Duration_t const one_second = { 1, 0 }; + LOG_DEBUG( "----> '" << name << "' waiting for message" ); if( _reader->wait_for_unread_message( one_second ) ) + { + LOG_DEBUG( "----> '" << name << "' callback" ); _on_data_available(); + LOG_DEBUG( "<---- '" << name << "' callback" ); + } } ) { } +dds_topic_reader_thread::~dds_topic_reader_thread() +{ + LOG_DEBUG( "xxxxx '" << ( _reader ? _reader->get_topicdescription()->get_name() : "unknown" ) << "' dtor" ); +} + + void dds_topic_reader_thread::run( qos const & rqos ) { if( ! _on_data_available ) @@ -43,4 +60,11 @@ void dds_topic_reader_thread::run( qos const & rqos ) } +void dds_topic_reader_thread::stop() +{ + _th.stop(); + super::stop(); +} + + } // namespace realdds diff --git a/third-party/realdds/src/dds-topic-reader.cpp b/third-party/realdds/src/dds-topic-reader.cpp index 1733ff1bb5..f011c40e8f 100644 --- a/third-party/realdds/src/dds-topic-reader.cpp +++ b/third-party/realdds/src/dds-topic-reader.cpp @@ -94,6 +94,20 @@ void dds_topic_reader::run( qos const & rqos ) } +void dds_topic_reader::stop() +{ + if( _subscriber ) + { + if( _reader ) + { + DDS_API_CALL_NO_THROW( _subscriber->get()->delete_datareader( _reader ) ); + _reader = nullptr; + } + } + assert( ! is_running() ); +} + + void dds_topic_reader::on_subscription_matched( eprosima::fastdds::dds::DataReader *, eprosima::fastdds::dds::SubscriptionMatchedStatus const & info ) {