diff --git a/system/topic_state_monitor/CMakeLists.txt b/system/topic_state_monitor/CMakeLists.txt new file mode 100644 index 0000000000000..cefc2f00e9d07 --- /dev/null +++ b/system/topic_state_monitor/CMakeLists.txt @@ -0,0 +1,35 @@ +cmake_minimum_required(VERSION 3.5) +project(topic_state_monitor) + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) + set(CMAKE_CXX_STANDARD_REQUIRED ON) + set(CMAKE_CXX_EXTENSIONS OFF) +endif() +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic -Werror) +endif() + +find_package(ament_cmake_auto REQUIRED) +ament_auto_find_build_dependencies() + +ament_auto_add_library(topic_state_monitor SHARED + src/topic_state_monitor/topic_state_monitor.cpp + src/topic_state_monitor_core.cpp +) + +rclcpp_components_register_node(topic_state_monitor + PLUGIN "topic_state_monitor::TopicStateMonitorNode" + EXECUTABLE topic_state_monitor_node +) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +# Install +## directories +ament_auto_package(INSTALL_TO_SHARE + launch +) diff --git a/system/topic_state_monitor/Readme.md b/system/topic_state_monitor/Readme.md new file mode 100644 index 0000000000000..434e63c1ca783 --- /dev/null +++ b/system/topic_state_monitor/Readme.md @@ -0,0 +1,58 @@ +# topic_state_monitor + +## Purpose + +This node monitors input topic for abnormalities such as timeout and low frequency. +The result of topic status is published as diagnostics. + +## Inner-workings / Algorithms + +The types of topic status and corresponding diagnostic status are following. + +| Topic status | Diagnostic status | Description | +| ------------- | ----------------- | ---------------------------------------------------- | +| `OK` | OK | The topic has no abnormalities | +| `NotReceived` | ERROR | The topic has not been received yet | +| `WarnRate` | WARN | The frequency of the topic is dropped | +| `ErrorRate` | ERROR | The frequency of the topic is significantly dropped | +| `Timeout` | ERROR | The topic subscription is stopped for a certain time | + +## Inputs / Outputs + +### Input + +| Name | Type | Description | +| -------- | -------- | --------------------------------- | +| any name | any type | Subscribe target topic to monitor | + +### Output + +| Name | Type | Description | +| -------------- | --------------------------------- | ------------------- | +| `/diagnostics` | `diagnostic_msgs/DiagnosticArray` | Diagnostics outputs | + +## Parameters + +### Node Parameters + +| Name | Type | Default Value | Description | +| ------------- | ------ | ------------- | ----------------------------------------------------- | +| `update_rate` | double | 10.0 | Timer callback period [Hz] | +| `window_size` | int | 10 | Window size of target topic for calculating frequency | + +### Core Parameters + +| Name | Type | Default Value | Description | +| ----------------- | ------ | ------------- | ---------------------------------------------------------------------------------------------------- | +| `topic` | string | - | Name of target topic | +| `topic_type` | string | - | Type of target topic | +| `transient_local` | bool | false | QoS policy of topic subscription (Transient Local/Volatile) | +| `best_effort` | bool | false | QoS policy of topic subscription (Best Effort/Reliable) | +| `diag_name` | string | - | Name used for the diagnostics to publish | +| `warn_rate` | double | 0.5 | If the topic rate is lower than this value, the topic status becomes `WarnRate` | +| `error_rate` | double | 0.1 | If the topic rate is lower than this value, the topic status becomes `ErrorRate` | +| `timeout` | double | 1.0 | If the topic subscription is stopped for more than this time [s], the topic status becomes `Timeout` | + +## Assumptions / Known limits + +TBD. diff --git a/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor.hpp b/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor.hpp new file mode 100644 index 0000000000000..3ef80851b59a4 --- /dev/null +++ b/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor.hpp @@ -0,0 +1,78 @@ +// Copyright 2020 Tier IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_HPP_ +#define TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_HPP_ + +#include + +#include +#include + +namespace topic_state_monitor +{ +struct Param +{ + std::string topic; + std::string topic_type; + bool transient_local; + bool best_effort; + std::string diag_name; + double warn_rate; + double error_rate; + double timeout; + int window_size; +}; + +enum class TopicStatus : int8_t { + Ok, + NotReceived, + WarnRate, + ErrorRate, + Timeout, +}; + +class TopicStateMonitor +{ +public: + explicit TopicStateMonitor(rclcpp::Node & node); + + void setParam(const Param & param) { param_ = param; } + + rclcpp::Time getLastMessageTime() const { return last_message_time_; } + double getTopicRate() const { return topic_rate_; } + + void update(); + TopicStatus getTopicStatus() const; + +private: + Param param_; + + static constexpr double max_rate = 100000.0; + + std::deque time_buffer_; + rclcpp::Time last_message_time_ = rclcpp::Time(0); + double topic_rate_ = TopicStateMonitor::max_rate; + + rclcpp::Clock::SharedPtr clock_; + + double calcTopicRate() const; + bool isNotReceived() const; + bool isWarnRate() const; + bool isErrorRate() const; + bool isTimeout() const; +}; +} // namespace topic_state_monitor + +#endif // TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_HPP_ diff --git a/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor_core.hpp b/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor_core.hpp new file mode 100644 index 0000000000000..f05ecc51206a5 --- /dev/null +++ b/system/topic_state_monitor/include/topic_state_monitor/topic_state_monitor_core.hpp @@ -0,0 +1,68 @@ +// Copyright 2020 Tier IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_CORE_HPP_ +#define TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_CORE_HPP_ + +#include "topic_state_monitor/topic_state_monitor.hpp" + +#include +#include + +#include +#include +#include +#include +#include + +namespace topic_state_monitor +{ +struct NodeParam +{ + double update_rate; +}; + +class TopicStateMonitorNode : public rclcpp::Node +{ +public: + explicit TopicStateMonitorNode(const rclcpp::NodeOptions & node_options); + +private: + // Parameter + NodeParam node_param_; + Param param_; + + // Parameter Reconfigure + OnSetParametersCallbackHandle::SharedPtr set_param_res_; + rcl_interfaces::msg::SetParametersResult onParameter( + const std::vector & parameters); + + // Core + std::unique_ptr topic_state_monitor_; + + // Subscriber + rclcpp::GenericSubscription::SharedPtr sub_topic_; + + // Timer + void onTimer(); + rclcpp::TimerBase::SharedPtr timer_; + + // Diagnostic Updater + diagnostic_updater::Updater updater_; + + void checkTopicStatus(diagnostic_updater::DiagnosticStatusWrapper & stat); +}; +} // namespace topic_state_monitor + +#endif // TOPIC_STATE_MONITOR__TOPIC_STATE_MONITOR_CORE_HPP_ diff --git a/system/topic_state_monitor/launch/topic_state_monitor.launch.xml b/system/topic_state_monitor/launch/topic_state_monitor.launch.xml new file mode 100644 index 0000000000000..e5d416f61f19a --- /dev/null +++ b/system/topic_state_monitor/launch/topic_state_monitor.launch.xml @@ -0,0 +1,24 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/system/topic_state_monitor/package.xml b/system/topic_state_monitor/package.xml new file mode 100644 index 0000000000000..0435d04a20f37 --- /dev/null +++ b/system/topic_state_monitor/package.xml @@ -0,0 +1,22 @@ + + + topic_state_monitor + 0.1.0 + The topic_state_monitor package + Kenji Miyake + Apache License 2.0 + + ament_cmake_auto + + ament_index_cpp + diagnostic_updater + rclcpp + rclcpp_components + + ament_lint_auto + autoware_lint_common + + + ament_cmake + + diff --git a/system/topic_state_monitor/src/topic_state_monitor/topic_state_monitor.cpp b/system/topic_state_monitor/src/topic_state_monitor/topic_state_monitor.cpp new file mode 100644 index 0000000000000..b6a480c0966fb --- /dev/null +++ b/system/topic_state_monitor/src/topic_state_monitor/topic_state_monitor.cpp @@ -0,0 +1,97 @@ +// Copyright 2020 Tier IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "topic_state_monitor/topic_state_monitor.hpp" + +namespace topic_state_monitor +{ +TopicStateMonitor::TopicStateMonitor(rclcpp::Node & node) : clock_(node.get_clock()) {} + +void TopicStateMonitor::update() +{ + // Add data + last_message_time_ = clock_->now(); + time_buffer_.push_back(last_message_time_); + + // Remove old data + while (static_cast(time_buffer_.size()) > param_.window_size) { + time_buffer_.pop_front(); + } + + // Calc topic rate + topic_rate_ = calcTopicRate(); +} + +TopicStatus TopicStateMonitor::getTopicStatus() const +{ + if (isNotReceived()) { + return TopicStatus::NotReceived; + } + if (isTimeout()) { + return TopicStatus::Timeout; + } + if (isErrorRate()) { + return TopicStatus::ErrorRate; + } + if (isWarnRate()) { + return TopicStatus::WarnRate; + } + return TopicStatus::Ok; +} + +double TopicStateMonitor::calcTopicRate() const +{ + // Output max_rate when topic rate can't be calculated. + // In this case, it's assumed timeout is used instead. + if (time_buffer_.size() < 2) { + return TopicStateMonitor::max_rate; + } + + const auto time_diff = (time_buffer_.back() - time_buffer_.front()).seconds(); + const auto num_intervals = time_buffer_.size() - 1; + + return static_cast(num_intervals) / time_diff; +} + +bool TopicStateMonitor::isNotReceived() const { return time_buffer_.empty(); } + +bool TopicStateMonitor::isWarnRate() const +{ + if (param_.warn_rate == 0.0) { + return false; + } + + return getTopicRate() < param_.warn_rate; +} + +bool TopicStateMonitor::isErrorRate() const +{ + if (param_.error_rate == 0.0) { + return false; + } + + return getTopicRate() < param_.error_rate; +} + +bool TopicStateMonitor::isTimeout() const +{ + if (param_.timeout == 0.0) { + return false; + } + + const auto time_diff = (clock_->now() - time_buffer_.back()).seconds(); + + return time_diff > param_.timeout; +} +} // namespace topic_state_monitor diff --git a/system/topic_state_monitor/src/topic_state_monitor_core.cpp b/system/topic_state_monitor/src/topic_state_monitor_core.cpp new file mode 100644 index 0000000000000..ccc7d7bd6b79f --- /dev/null +++ b/system/topic_state_monitor/src/topic_state_monitor_core.cpp @@ -0,0 +1,174 @@ +// Copyright 2020 Tier IV, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "topic_state_monitor/topic_state_monitor_core.hpp" + +#include +#include +#include +#include + +namespace +{ +template +void update_param( + const std::vector & parameters, const std::string & name, T & value) +{ + auto it = std::find_if( + parameters.cbegin(), parameters.cend(), + [&name](const rclcpp::Parameter & parameter) { return parameter.get_name() == name; }); + if (it != parameters.cend()) { + value = it->template get_value(); + } +} +} // namespace + +namespace topic_state_monitor +{ +TopicStateMonitorNode::TopicStateMonitorNode(const rclcpp::NodeOptions & node_options) +: Node("topic_state_monitor", node_options), updater_(this) +{ + using std::placeholders::_1; + // Parameter + node_param_.update_rate = declare_parameter("update_rate", 10.0); + param_.topic = declare_parameter("topic"); + param_.topic_type = declare_parameter("topic_type"); + param_.transient_local = declare_parameter("transient_local", false); + param_.best_effort = declare_parameter("best_effort", false); + param_.diag_name = declare_parameter("diag_name"); + param_.warn_rate = declare_parameter("warn_rate", 0.5); + param_.error_rate = declare_parameter("error_rate", 0.1); + param_.timeout = declare_parameter("timeout", 1.0); + param_.window_size = declare_parameter("window_size", 10); + + // Parameter Reconfigure + set_param_res_ = + this->add_on_set_parameters_callback(std::bind(&TopicStateMonitorNode::onParameter, this, _1)); + + // Core + topic_state_monitor_ = std::make_unique(*this); + topic_state_monitor_->setParam(param_); + + // Subscriber + rclcpp::QoS qos = rclcpp::QoS{1}; + if (param_.transient_local) { + qos.transient_local(); + } + if (param_.best_effort) { + qos.best_effort(); + } + sub_topic_ = this->create_generic_subscription( + param_.topic, param_.topic_type, qos, + [this]([[maybe_unused]] std::shared_ptr msg) { + topic_state_monitor_->update(); + }); + + // Diagnostic Updater + updater_.setHardwareID("topic_state_monitor"); + updater_.add(param_.diag_name, this, &TopicStateMonitorNode::checkTopicStatus); + + // Timer + auto timer_callback = std::bind(&TopicStateMonitorNode::onTimer, this); + auto period = std::chrono::duration_cast( + std::chrono::duration(1.0 / node_param_.update_rate)); + + timer_ = std::make_shared>( + this->get_clock(), period, std::move(timer_callback), + this->get_node_base_interface()->get_context()); + this->get_node_timers_interface()->add_timer(timer_, nullptr); +} + +rcl_interfaces::msg::SetParametersResult TopicStateMonitorNode::onParameter( + const std::vector & parameters) +{ + rcl_interfaces::msg::SetParametersResult result; + result.successful = true; + result.reason = "success"; + + try { + update_param(parameters, "warn_rate", param_.warn_rate); + update_param(parameters, "error_rate", param_.error_rate); + update_param(parameters, "timeout", param_.timeout); + update_param(parameters, "window_size", param_.window_size); + } catch (const rclcpp::exceptions::InvalidParameterTypeException & e) { + result.successful = false; + result.reason = e.what(); + } + + return result; +} + +void TopicStateMonitorNode::onTimer() +{ + // Publish diagnostics + updater_.force_update(); +} + +void TopicStateMonitorNode::checkTopicStatus(diagnostic_updater::DiagnosticStatusWrapper & stat) +{ + using diagnostic_msgs::msg::DiagnosticStatus; + + // Get information + const auto topic_status = topic_state_monitor_->getTopicStatus(); + const auto last_message_time = topic_state_monitor_->getLastMessageTime(); + const auto topic_rate = topic_state_monitor_->getTopicRate(); + + // Add topic name + stat.addf("topic", "%s", param_.topic.c_str()); + + // Judge level + int8_t level = DiagnosticStatus::OK; + if (topic_status == TopicStatus::Ok) { + level = DiagnosticStatus::OK; + stat.add("status", "OK"); + } else if (topic_status == TopicStatus::NotReceived) { + level = DiagnosticStatus::ERROR; + stat.add("status", "NotReceived"); + } else if (topic_status == TopicStatus::WarnRate) { + level = DiagnosticStatus::WARN; + stat.add("status", "WarnRate"); + } else if (topic_status == TopicStatus::ErrorRate) { + level = DiagnosticStatus::ERROR; + stat.add("status", "ErrorRate"); + } else if (topic_status == TopicStatus::Timeout) { + level = DiagnosticStatus::ERROR; + stat.add("status", "Timeout"); + } + + // Add key-value + stat.addf("warn_rate", "%.2f [Hz]", param_.warn_rate); + stat.addf("error_rate", "%.2f [Hz]", param_.error_rate); + stat.addf("timeout", "%.2f [s]", param_.timeout); + stat.addf("measured_rate", "%.2f [Hz]", topic_rate); + stat.addf("now", "%.2f [s]", this->now().seconds()); + stat.addf("last_message_time", "%.2f [s]", last_message_time.seconds()); + + // Create message + std::string msg; + if (level == DiagnosticStatus::OK) { + msg = "OK"; + } else if (level == DiagnosticStatus::WARN) { + msg = "Warn"; + } else if (level == DiagnosticStatus::ERROR) { + msg = "Error"; + } + + // Add summary + stat.summary(level, msg); +} + +} // namespace topic_state_monitor + +#include +RCLCPP_COMPONENTS_REGISTER_NODE(topic_state_monitor::TopicStateMonitorNode)