Skip to content

Commit

Permalink
fix(system_monitor): multithreading support for boost::process (tier4…
Browse files Browse the repository at this point in the history
…#1714)

Signed-off-by: v-nakayama7440-esol <v-nakayama7440@esol.co.jp>

Signed-off-by: v-nakayama7440-esol <v-nakayama7440@esol.co.jp>
  • Loading branch information
v-nakayama7440-esol authored and boyali committed Sep 28, 2022
1 parent da9e436 commit f841586
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ class NTPMonitor : public rclcpp::Node
* @brief function to execute chronyc
* @param [out] outOffset offset value of NTP time
* @param [out] out_tracking_map "chronyc tracking" output for diagnostic
* @return if error occurred, return error string
* @param [out] pipe2_err_str if pipe2 error occurred, return error string
* @return if chronyc error occurred, return error string
*/
std::string executeChronyc(
float & outOffset, std::map<std::string, std::string> & out_tracking_map);
float & outOffset, std::map<std::string, std::string> & out_tracking_map,
std::string & pipe2_err_str);

diagnostic_updater::Updater updater_; //!< @brief Updater class which advertises to /diagnostics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class ProcessMonitor : public rclcpp::Node
memory_tasks_; //!< @brief list of diagnostics tasks for high memory procs
rclcpp::TimerBase::SharedPtr timer_; //!< @brief timer to execute top command
std::string top_output_; //!< @brief output from top command
bool is_top_error_; //!< @brief flag if an error occurs
bool is_top_error_; //!< @brief flag if an top error occurs
bool is_pipe2_error_; //!< @brief flag if an pipe2 error occurs
double elapsed_ms_; //!< @brief Execution time of top command
std::mutex mutex_; //!< @brief mutex for output from top command
rclcpp::CallbackGroup::SharedPtr timer_callback_group_; //!< @brief Callback Group
Expand Down
27 changes: 25 additions & 2 deletions system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,31 @@ void CPUMonitorBase::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & st
}

// Get CPU Usage
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
cpu_usage.all.status = CpuStatus::STALE;
publishCpuUsage(cpu_usage);
return;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
cpu_usage.all.status = CpuStatus::STALE;
publishCpuUsage(cpu_usage);
return;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c("mpstat -P ALL 1 1 -o JSON", bp::std_out > is_out, bp::std_err > is_err);
c.wait();

Expand Down
47 changes: 43 additions & 4 deletions system/system_monitor/src/hdd_monitor/hdd_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,31 @@ void HDDMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat)

for (auto itr = hdd_params_.begin(); itr != hdd_params_.end(); ++itr, ++hdd_index) {
// Get summary of disk space usage of ext4
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
error_str = "pipe2 error";
stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error");
stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str());
stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno));
continue;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
error_str = "pipe2 error";
stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error");
stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str());
stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno));
continue;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

// Invoke shell to use shell wildcard expansion
bp::child c(
"/bin/sh", "-c", fmt::format("df -Pm {}*", itr->first.c_str()), bp::std_out > is_out,
Expand Down Expand Up @@ -324,8 +347,24 @@ void HDDMonitor::getHDDParams()
std::string HDDMonitor::getDeviceFromMountPoint(const std::string & mount_point)
{
std::string ret;
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno));
return "";
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno));
return "";
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c(
"/bin/sh", "-c", fmt::format("findmnt -n -o SOURCE {}", mount_point.c_str()),
Expand Down
23 changes: 21 additions & 2 deletions system/system_monitor/src/mem_monitor/mem_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,27 @@ void MemMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat)
const auto t_start = SystemMonitorUtility::startMeasurement();

// Get total amount of free and used memory
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
return;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
return;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c("free -tb", bp::std_out > is_out, bp::std_err > is_err);
c.wait();
if (c.exit_code() != 0) {
Expand Down
23 changes: 20 additions & 3 deletions system/system_monitor/src/ntp_monitor/ntp_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat)
}

std::string error_str;
std::string pipe2_err_str;
float offset = 0.0f;
std::map<std::string, std::string> tracking_map;
error_str = executeChronyc(offset, tracking_map);
error_str = executeChronyc(offset, tracking_map, pipe2_err_str);
if (!pipe2_err_str.empty()) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", pipe2_err_str);
return;
}
if (!error_str.empty()) {
stat.summary(DiagStatus::ERROR, "chronyc error");
stat.add("chronyc", error_str);
Expand All @@ -92,12 +98,23 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat)
}

std::string NTPMonitor::executeChronyc(
float & out_offset, std::map<std::string, std::string> & out_tracking_map)
float & out_offset, std::map<std::string, std::string> & out_tracking_map,
std::string & pipe2_err_str)
{
std::string result;

// Tracking chrony status
bp::ipstream is_out;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
pipe2_err_str = std::string(strerror(errno));
return result;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

bp::child c("chronyc tracking", bp::std_out > is_out);
c.wait();
if (c.exit_code() != 0) {
Expand Down
Loading

0 comments on commit f841586

Please sign in to comment.