From f8415864607cf02929cced9041446818cd0a456c Mon Sep 17 00:00:00 2001 From: v-nakayama7440-esol <97144416+v-nakayama7440-esol@users.noreply.github.com> Date: Thu, 1 Sep 2022 06:36:49 +0900 Subject: [PATCH] fix(system_monitor): multithreading support for boost::process (#1714) Signed-off-by: v-nakayama7440-esol Signed-off-by: v-nakayama7440-esol --- .../ntp_monitor/ntp_monitor.hpp | 6 +- .../process_monitor/process_monitor.hpp | 3 +- .../src/cpu_monitor/cpu_monitor_base.cpp | 27 +- .../src/hdd_monitor/hdd_monitor.cpp | 47 +++- .../src/mem_monitor/mem_monitor.cpp | 23 +- .../src/ntp_monitor/ntp_monitor.cpp | 23 +- .../src/process_monitor/process_monitor.cpp | 236 ++++++++++++++++-- 7 files changed, 327 insertions(+), 38 deletions(-) diff --git a/system/system_monitor/include/system_monitor/ntp_monitor/ntp_monitor.hpp b/system/system_monitor/include/system_monitor/ntp_monitor/ntp_monitor.hpp index c319b8e35d5dd..f4ee2de666c22 100644 --- a/system/system_monitor/include/system_monitor/ntp_monitor/ntp_monitor.hpp +++ b/system/system_monitor/include/system_monitor/ntp_monitor/ntp_monitor.hpp @@ -57,10 +57,12 @@ class NTPMonitor : public rclcpp::Node * @brief function to execute chronyc * @param [out] outOffset offset value of NTP time * @param [out] out_tracking_map "chronyc tracking" output for diagnostic - * @return if error occurred, return error string + * @param [out] pipe2_err_str if pipe2 error occurred, return error string + * @return if chronyc error occurred, return error string */ std::string executeChronyc( - float & outOffset, std::map & out_tracking_map); + float & outOffset, std::map & out_tracking_map, + std::string & pipe2_err_str); diagnostic_updater::Updater updater_; //!< @brief Updater class which advertises to /diagnostics diff --git a/system/system_monitor/include/system_monitor/process_monitor/process_monitor.hpp b/system/system_monitor/include/system_monitor/process_monitor/process_monitor.hpp index 802d905d6bfd5..e114d58770883 100644 --- a/system/system_monitor/include/system_monitor/process_monitor/process_monitor.hpp +++ b/system/system_monitor/include/system_monitor/process_monitor/process_monitor.hpp @@ -122,7 +122,8 @@ class ProcessMonitor : public rclcpp::Node memory_tasks_; //!< @brief list of diagnostics tasks for high memory procs rclcpp::TimerBase::SharedPtr timer_; //!< @brief timer to execute top command std::string top_output_; //!< @brief output from top command - bool is_top_error_; //!< @brief flag if an error occurs + bool is_top_error_; //!< @brief flag if an top error occurs + bool is_pipe2_error_; //!< @brief flag if an pipe2 error occurs double elapsed_ms_; //!< @brief Execution time of top command std::mutex mutex_; //!< @brief mutex for output from top command rclcpp::CallbackGroup::SharedPtr timer_callback_group_; //!< @brief Callback Group diff --git a/system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp b/system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp index 90766fbcc79fc..5e8ad3ff357e9 100644 --- a/system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp +++ b/system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp @@ -134,8 +134,31 @@ void CPUMonitorBase::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & st } // Get CPU Usage - bp::ipstream is_out; - bp::ipstream is_err; + + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + cpu_usage.all.status = CpuStatus::STALE; + publishCpuUsage(cpu_usage); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + cpu_usage.all.status = CpuStatus::STALE; + publishCpuUsage(cpu_usage); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c("mpstat -P ALL 1 1 -o JSON", bp::std_out > is_out, bp::std_err > is_err); c.wait(); diff --git a/system/system_monitor/src/hdd_monitor/hdd_monitor.cpp b/system/system_monitor/src/hdd_monitor/hdd_monitor.cpp index d1b15f1231987..3e05e76054f62 100644 --- a/system/system_monitor/src/hdd_monitor/hdd_monitor.cpp +++ b/system/system_monitor/src/hdd_monitor/hdd_monitor.cpp @@ -198,8 +198,31 @@ void HDDMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat) for (auto itr = hdd_params_.begin(); itr != hdd_params_.end(); ++itr, ++hdd_index) { // Get summary of disk space usage of ext4 - bp::ipstream is_out; - bp::ipstream is_err; + + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + error_str = "pipe2 error"; + stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error"); + stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str()); + stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno)); + continue; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + error_str = "pipe2 error"; + stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error"); + stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str()); + stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno)); + continue; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + // Invoke shell to use shell wildcard expansion bp::child c( "/bin/sh", "-c", fmt::format("df -Pm {}*", itr->first.c_str()), bp::std_out > is_out, @@ -324,8 +347,24 @@ void HDDMonitor::getHDDParams() std::string HDDMonitor::getDeviceFromMountPoint(const std::string & mount_point) { std::string ret; - bp::ipstream is_out; - bp::ipstream is_err; + + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno)); + return ""; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno)); + return ""; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; bp::child c( "/bin/sh", "-c", fmt::format("findmnt -n -o SOURCE {}", mount_point.c_str()), diff --git a/system/system_monitor/src/mem_monitor/mem_monitor.cpp b/system/system_monitor/src/mem_monitor/mem_monitor.cpp index b6f01b8396aab..489a4dc72bbe8 100644 --- a/system/system_monitor/src/mem_monitor/mem_monitor.cpp +++ b/system/system_monitor/src/mem_monitor/mem_monitor.cpp @@ -48,8 +48,27 @@ void MemMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat) const auto t_start = SystemMonitorUtility::startMeasurement(); // Get total amount of free and used memory - bp::ipstream is_out; - bp::ipstream is_err; + + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c("free -tb", bp::std_out > is_out, bp::std_err > is_err); c.wait(); if (c.exit_code() != 0) { diff --git a/system/system_monitor/src/ntp_monitor/ntp_monitor.cpp b/system/system_monitor/src/ntp_monitor/ntp_monitor.cpp index 49f5ec57eda4c..10dba2f96e164 100644 --- a/system/system_monitor/src/ntp_monitor/ntp_monitor.cpp +++ b/system/system_monitor/src/ntp_monitor/ntp_monitor.cpp @@ -64,9 +64,15 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat) } std::string error_str; + std::string pipe2_err_str; float offset = 0.0f; std::map tracking_map; - error_str = executeChronyc(offset, tracking_map); + error_str = executeChronyc(offset, tracking_map, pipe2_err_str); + if (!pipe2_err_str.empty()) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", pipe2_err_str); + return; + } if (!error_str.empty()) { stat.summary(DiagStatus::ERROR, "chronyc error"); stat.add("chronyc", error_str); @@ -92,12 +98,23 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat) } std::string NTPMonitor::executeChronyc( - float & out_offset, std::map & out_tracking_map) + float & out_offset, std::map & out_tracking_map, + std::string & pipe2_err_str) { std::string result; // Tracking chrony status - bp::ipstream is_out; + + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + pipe2_err_str = std::string(strerror(errno)); + return result; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + bp::child c("chronyc tracking", bp::std_out > is_out); c.wait(); if (c.exit_code() != 0) { diff --git a/system/system_monitor/src/process_monitor/process_monitor.cpp b/system/system_monitor/src/process_monitor/process_monitor.cpp index 57e1aaf20f571..f78fc00c430dc 100644 --- a/system/system_monitor/src/process_monitor/process_monitor.cpp +++ b/system/system_monitor/src/process_monitor/process_monitor.cpp @@ -34,7 +34,8 @@ ProcessMonitor::ProcessMonitor(const rclcpp::NodeOptions & options) : Node("process_monitor", options), updater_(this), num_of_procs_(declare_parameter("num_of_procs", 5)), - is_top_error_(false) + is_top_error_(false), + is_pipe2_error_(false) { using namespace std::literals::chrono_literals; @@ -67,14 +68,23 @@ void ProcessMonitor::monitorProcesses(diagnostic_updater::DiagnosticStatusWrappe // thread-safe read std::string str; bool is_top_error; + bool is_pipe2_error; double elapsed_ms; { std::lock_guard lock(mutex_); str = top_output_; is_top_error = is_top_error_; + is_pipe2_error = is_pipe2_error_; elapsed_ms = elapsed_ms_; } + if (is_pipe2_error) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", str); + setErrorContent(&load_tasks_, "pipe2 error", "pipe2", str); + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", str); + return; + } if (is_top_error) { stat.summary(DiagStatus::ERROR, "top error"); stat.add("top", str); @@ -107,13 +117,38 @@ void ProcessMonitor::monitorProcesses(diagnostic_updater::DiagnosticStatusWrappe void ProcessMonitor::getTasksSummary( diagnostic_updater::DiagnosticStatusWrapper & stat, const std::string & output) { - bp::pipe p; + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int p_fd[2]; + if (pipe2(p_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe p{p_fd[0], p_fd[1]}; + std::string line; // Echo output for grep { - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c(fmt::format("echo {}", output), bp::std_out > p, bp::std_err > is_err); c.wait(); if (c.exit_code() != 0) { @@ -126,7 +161,15 @@ void ProcessMonitor::getTasksSummary( } // Find matching pattern of summary { - bp::ipstream is_out; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + bp::child c("grep Tasks:", bp::std_out > is_out, bp::std_in < p); c.wait(); // no matching line @@ -158,13 +201,37 @@ void ProcessMonitor::getTasksSummary( void ProcessMonitor::removeHeader( diagnostic_updater::DiagnosticStatusWrapper & stat, std::string & output) { - bp::pipe p1; - bp::pipe p2; + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int p1_fd[2]; + if (pipe2(p1_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe p1{p1_fd[0], p1_fd[1]}; + + int p2_fd[2]; + if (pipe2(p2_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe p2{p2_fd[0], p2_fd[1]}; + std::ostringstream os; // Echo output for sed { - bp::ipstream is_err; + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c(fmt::format("echo {}", output), bp::std_out > p1, bp::std_err > is_err); c.wait(); if (c.exit_code() != 0) { @@ -176,7 +243,15 @@ void ProcessMonitor::removeHeader( } // Remove %Cpu section { - bp::ipstream is_err; + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c("sed \"/^%Cpu/d\"", bp::std_out > p2, bp::std_err > is_err, bp::std_in < p1); c.wait(); // no matching line @@ -188,8 +263,24 @@ void ProcessMonitor::removeHeader( } // Remove header { - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + stat.summary(DiagStatus::ERROR, "pipe2 error"); + stat.add("pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c("sed \"1,6d\"", bp::std_out > is_out, bp::std_err > is_err, bp::std_in < p2); c.wait(); // no matching line @@ -206,12 +297,34 @@ void ProcessMonitor::removeHeader( void ProcessMonitor::getHighLoadProcesses(const std::string & output) { - bp::pipe p; + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int p_fd[2]; + if (pipe2(p_fd, O_CLOEXEC) != 0) { + setErrorContent(&load_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe p{p_fd[0], p_fd[1]}; + std::ostringstream os; // Echo output for sed - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + setErrorContent(&load_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + setErrorContent(&load_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c(fmt::format("echo {}", output), bp::std_out > p, bp::std_err > is_err); c.wait(); if (c.exit_code() != 0) { @@ -226,14 +339,42 @@ void ProcessMonitor::getHighLoadProcesses(const std::string & output) void ProcessMonitor::getHighMemoryProcesses(const std::string & output) { - bp::pipe p1; - bp::pipe p2; + // boost::process create file descriptor without O_CLOEXEC required for multithreading. + // So create file descriptor with O_CLOEXEC and pass it to boost::process. + int p1_fd[2]; + if (pipe2(p1_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe p1{p1_fd[0], p1_fd[1]}; + + int p2_fd[2]; + if (pipe2(p2_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe p2{p2_fd[0], p2_fd[1]}; + std::ostringstream os; // Echo output for sed { - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c(fmt::format("echo {}", output), bp::std_out > p1, bp::std_err > is_err); c.wait(); if (c.exit_code() != 0) { @@ -244,8 +385,22 @@ void ProcessMonitor::getHighMemoryProcesses(const std::string & output) } // Sort by memory usage { - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + setErrorContent(&memory_tasks_, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + bp::child c("sort -r -k 10", bp::std_out > p2, bp::std_err > is_err, bp::std_in < p1); c.wait(); if (c.exit_code() != 0) { @@ -266,8 +421,22 @@ void ProcessMonitor::getTopratedProcesses( return; } - bp::ipstream is_out; - bp::ipstream is_err; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + setErrorContent(tasks, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + setErrorContent(tasks, "pipe2 error", "pipe2", strerror(errno)); + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + std::ostringstream os; bp::child c( @@ -329,8 +498,26 @@ void ProcessMonitor::onTimer() tier4_autoware_utils::StopWatch stop_watch; stop_watch.tic("execution_time"); - bp::ipstream is_err; - bp::ipstream is_out; + int out_fd[2]; + if (pipe2(out_fd, O_CLOEXEC) != 0) { + std::lock_guard lock(mutex_); + top_output_ = std::string(strerror(errno)); + is_pipe2_error_ = true; + return; + } + bp::pipe out_pipe{out_fd[0], out_fd[1]}; + bp::ipstream is_out{std::move(out_pipe)}; + + int err_fd[2]; + if (pipe2(err_fd, O_CLOEXEC) != 0) { + std::lock_guard lock(mutex_); + top_output_ = std::string(strerror(errno)); + is_pipe2_error_ = true; + return; + } + bp::pipe err_pipe{err_fd[0], err_fd[1]}; + bp::ipstream is_err{std::move(err_pipe)}; + std::ostringstream os; // Get processes @@ -351,6 +538,7 @@ void ProcessMonitor::onTimer() std::lock_guard lock(mutex_); top_output_ = os.str(); is_top_error_ = is_top_error; + is_pipe2_error_ = false; elapsed_ms_ = elapsed_ms; } }