From 6378f35f17fc69aeb5d16e741bc627fee13cce9d Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sun, 15 Jan 2023 17:00:41 -0500 Subject: [PATCH 01/13] Add getrusage and memory tracker for precheck preparation --- Cargo.lock | 6 +- node/core/pvf/Cargo.toml | 8 +- node/core/pvf/src/host.rs | 8 +- node/core/pvf/src/prepare/memory_stats.rs | 54 +++++ node/core/pvf/src/prepare/mod.rs | 12 +- node/core/pvf/src/prepare/pool.rs | 27 ++- node/core/pvf/src/prepare/queue.rs | 28 ++- node/core/pvf/src/prepare/worker.rs | 273 +++++++++++++++++++--- 8 files changed, 364 insertions(+), 52 deletions(-) create mode 100644 node/core/pvf/src/prepare/memory_stats.rs diff --git a/Cargo.lock b/Cargo.lock index 90701f6a7566..bdf4c2ed4b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3584,9 +3584,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.126" +version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "libgit2-sys" @@ -6789,6 +6789,7 @@ dependencies = [ "futures", "futures-timer", "hex-literal", + "libc", "parity-scale-codec", "pin-project", "polkadot-core-primitives", @@ -6809,6 +6810,7 @@ dependencies = [ "tempfile", "test-parachain-adder", "test-parachain-halt", + "tikv-jemalloc-ctl", "tokio", "tracing-gum", ] diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index e00092826428..9c55aa000b42 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -14,13 +14,14 @@ assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" futures-timer = "3.0.2" -slotmap = "1.0" gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" +rayon = "1.5.1" +slotmap = "1.0" tempfile = "3.3.0" +tikv-jemalloc-ctl = "0.5.0" tokio = { version = "1.22.0", features = ["fs", "process"] } -rayon = "1.5.1" parity-scale-codec = { version = "3.1.5", default-features = false, features = ["derive"] } @@ -38,6 +39,9 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } +[target.'cfg(target_os = "linux")'.dependencies] +libc = "0.2.139" + [dev-dependencies] adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" } halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 3514aff1d896..c85d54eb46e0 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -25,7 +25,8 @@ use crate::{ error::PrepareError, execute, metrics::Metrics, - prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, + prepare::{self, PreparationKind}, + PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; use futures::{ @@ -476,6 +477,7 @@ async fn handle_precheck_pvf( priority: Priority::Normal, pvf, preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::PreCheck, }, ) .await?; @@ -547,6 +549,7 @@ async fn handle_execute_pvf( priority, pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromExecutionRequest, }, ) .await?; @@ -569,6 +572,7 @@ async fn 
handle_execute_pvf( priority, pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromExecutionRequest, }, ) .await?; @@ -621,6 +625,7 @@ async fn handle_heads_up( priority: Priority::Normal, pvf: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromHeadsUpRequest, }, ) .await?; @@ -637,6 +642,7 @@ async fn handle_heads_up( priority: Priority::Normal, pvf: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromHeadsUpRequest, }, ) .await?; diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs new file mode 100644 index 000000000000..fb06a1007d00 --- /dev/null +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -0,0 +1,54 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use parity_scale_codec::{Decode, Encode}; +use tikv_jemalloc_ctl::{epoch, stats, Error}; + +#[derive(Clone)] +pub struct MemoryAllocationTracker { + epoch: tikv_jemalloc_ctl::epoch_mib, + allocated: stats::allocated_mib, + resident: stats::resident_mib, +} + +impl MemoryAllocationTracker { + pub fn new() -> Result { + Ok(Self { + epoch: epoch::mib()?, + allocated: stats::allocated::mib()?, + resident: stats::resident::mib()?, + }) + } + + pub fn snapshot(&self) -> Result { + // update stats by advancing the allocation epoch + self.epoch.advance()?; + + let allocated: u64 = self.allocated.read()? as _; + let resident: u64 = self.resident.read()? as _; + Ok(MemoryAllocationStats { allocated, resident }) + } +} + +/// Statistics of collected memory metrics. +#[non_exhaustive] +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryAllocationStats { + /// Total resident memory, in bytes. + pub resident: u64, + /// Total allocated memory, in bytes. + pub allocated: u64, +} diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs index ac03cefc6fdb..2245ec54d953 100644 --- a/node/core/pvf/src/prepare/mod.rs +++ b/node/core/pvf/src/prepare/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Parity Technologies (UK) Ltd. +// Copyright 2021-2023 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify @@ -22,6 +22,7 @@ //! The pool will spawn workers in new processes and those should execute pass control to //! [`worker_entrypoint`]. 
+mod memory_stats; mod pool; mod queue; mod worker; @@ -29,3 +30,12 @@ mod worker; pub use pool::start as start_pool; pub use queue::{start as start_queue, FromQueue, ToQueue}; pub use worker::worker_entrypoint; + +use parity_scale_codec::{Decode, Encode}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Encode, Decode)] +pub enum PreparationKind { + PreCheck, + FromExecutionRequest, + FromHeadsUpRequest, +} diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 0d39623c99db..ce37afc98b2b 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::worker::{self, Outcome}; +use super::{ + worker::{self, Outcome}, + PreparationKind, +}; use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, @@ -70,6 +73,7 @@ pub enum ToPool { code: Arc>, artifact_path: PathBuf, preparation_timeout: Duration, + preparation_kind: PreparationKind, }, } @@ -214,7 +218,13 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => { + ToPool::StartWork { + worker, + code, + artifact_path, + preparation_timeout, + preparation_kind, + } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -226,6 +236,7 @@ fn handle_to_pool( cache_path.to_owned(), artifact_path, preparation_timeout, + preparation_kind, preparation_timer, ) .boxed(), @@ -274,10 +285,18 @@ async fn start_work_task( cache_path: PathBuf, artifact_path: PathBuf, preparation_timeout: Duration, + preparation_kind: PreparationKind, _preparation_timer: Option, ) -> PoolEvent { - let outcome = - worker::start_work(idle, code, &cache_path, artifact_path, preparation_timeout).await; + let outcome = worker::start_work( + idle, + code, + &cache_path, + artifact_path, + preparation_timeout, + preparation_kind, + ) + .await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index c44301c7427b..91e6bfa3c7a1 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -16,7 +16,10 @@ //! A queue that handles requests for PVF preparation. -use super::pool::{self, Worker}; +use super::{ + pool::{self, Worker}, + PreparationKind, +}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -33,7 +36,12 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration }, + Enqueue { + priority: Priority, + pvf: Pvf, + preparation_timeout: Duration, + preparation_kind: PreparationKind, + }, } /// A response from queue. @@ -81,6 +89,8 @@ struct JobData { pvf: Pvf, /// The timeout for the preparation job. preparation_timeout: Duration, + /// Are we preparing because of a pre-check, execution, or heads-up request? 
+ preparation_kind: PreparationKind, worker: Option, } @@ -208,8 +218,8 @@ impl Queue { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { match to_queue { - ToQueue::Enqueue { priority, pvf, preparation_timeout } => { - handle_enqueue(queue, priority, pvf, preparation_timeout).await?; + ToQueue::Enqueue { priority, pvf, preparation_timeout, preparation_kind } => { + handle_enqueue(queue, priority, pvf, preparation_timeout, preparation_kind).await?; }, } Ok(()) @@ -220,6 +230,7 @@ async fn handle_enqueue( priority: Priority, pvf: Pvf, preparation_timeout: Duration, + preparation_kind: PreparationKind, ) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, @@ -247,7 +258,13 @@ async fn handle_enqueue( return Ok(()) } - let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None }); + let job = queue.jobs.insert(JobData { + priority, + pvf, + preparation_timeout, + preparation_kind, + worker: None, + }); queue.artifact_id_to_job.insert(artifact_id, job); if let Some(available) = find_idle_worker(queue) { @@ -438,6 +455,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal code: job_data.pvf.code.clone(), artifact_path, preparation_timeout: job_data.preparation_timeout, + preparation_kind: job_data.preparation_kind, }, ) .await?; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index d3550fe3afe6..e1f1875628fa 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -14,9 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use super::memory_stats::{MemoryAllocationStats, MemoryAllocationTracker}; use crate::{ artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, + prepare::PreparationKind, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -31,10 +33,16 @@ use sp_core::hexdisplay::HexDisplay; use std::{ panic, path::{Path, PathBuf}, - sync::{mpsc::channel, Arc}, + sync::{ + mpsc::{channel, Receiver, RecvTimeoutError, Sender}, + Arc, + }, time::Duration, }; -use tokio::{io, net::UnixStream}; +use tokio::{io, net::UnixStream, task::JoinHandle}; + +#[cfg(target_os = "linux")] +use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// @@ -78,6 +86,7 @@ pub async fn start_work( cache_path: &Path, artifact_path: PathBuf, preparation_timeout: Duration, + preparation_kind: PreparationKind, ) -> Outcome { let IdleWorker { stream, pid } = worker; @@ -89,7 +98,9 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { + if let Err(err) = + send_request(&mut stream, code, &tmp_file, preparation_timeout, preparation_kind).await + { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -109,14 +120,15 @@ pub async fn start_work( // load, but the CPU resources of the child can only be measured from the parent after the // child process terminates. 
 	let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
-	let result = tokio::time::timeout(timeout, framed_recv(&mut stream)).await;
+	let result = tokio::time::timeout(timeout, recv_response(&mut stream, pid)).await;
 
 	match result {
 		// Received bytes from worker within the time limit.
-		Ok(Ok(response_bytes)) =>
-			handle_response_bytes(
+		Ok(Ok((prepare_result, memory_stats))) =>
+			handle_response(
 				IdleWorker { stream, pid },
-				response_bytes,
+				prepare_result,
+				memory_stats,
 				pid,
 				tmp_file,
 				artifact_path,
@@ -151,29 +163,15 @@ pub async fn start_work(
 ///
 /// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
 /// cleared by `with_tmp_file`.
-async fn handle_response_bytes(
+async fn handle_response(
 	worker: IdleWorker,
-	response_bytes: Vec<u8>,
+	result: PrepareResult,
+	memory_stats: MemoryAllocationStats,
 	pid: u32,
 	tmp_file: PathBuf,
 	artifact_path: PathBuf,
 	preparation_timeout: Duration,
 ) -> Outcome {
-	// By convention we expect encoded `PrepareResult`.
-	let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
-		Ok(result) => result,
-		Err(err) => {
-			// We received invalid bytes from the worker.
-			let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
-			gum::warn!(
-				target: LOG_TARGET,
-				worker_pid = %pid,
-				"received unexpected response from the prepare worker: {}",
-				HexDisplay::from(&bound_bytes),
-			);
-			return Outcome::IoErr(err.to_string())
-		},
-	};
 	let cpu_time_elapsed = match result {
 		Ok(result) => result,
 		// Timed out on the child. This should already be logged by the child.
@@ -271,14 +269,18 @@ async fn send_request(
 	code: Arc<Vec<u8>>,
 	tmp_file: &Path,
 	preparation_timeout: Duration,
+	preparation_kind: PreparationKind,
 ) -> io::Result<()> {
 	framed_send(stream, &code).await?;
 	framed_send(stream, path_to_bytes(tmp_file)).await?;
 	framed_send(stream, &preparation_timeout.encode()).await?;
+	framed_send(stream, &preparation_kind.encode()).await?;
 	Ok(())
 }
 
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, Duration)> {
+async fn recv_request(
+	stream: &mut UnixStream,
+) -> io::Result<(Vec<u8>, PathBuf, Duration, PreparationKind)> {
 	let code = framed_recv(stream).await?;
 	let tmp_file = framed_recv(stream).await?;
 	let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
@@ -288,46 +290,122 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
 		)
 	})?;
 	let preparation_timeout = framed_recv(stream).await?;
-	let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|_| {
+	let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
 		io::Error::new(
 			io::ErrorKind::Other,
-			"prepare pvf recv_request: failed to decode duration".to_string(),
+			format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
 		)
 	})?;
-	Ok((code, tmp_file, preparation_timeout))
+	let preparation_kind = framed_recv(stream).await?;
+	let preparation_kind = PreparationKind::decode(&mut &preparation_kind[..]).map_err(|e| {
+		io::Error::new(
+			io::ErrorKind::Other,
+			format!("prepare pvf recv_request: failed to decode preparation kind: {:?}", e),
+		)
+	})?;
+	Ok((code, tmp_file, preparation_timeout, preparation_kind))
 }
 
+async fn send_response(
+	stream: &mut UnixStream,
+	result: PrepareResult,
+	memory_stats: Option<MemoryAllocationStats>,
+) -> io::Result<()> {
+	framed_send(stream, &result.encode()).await?;
+	framed_send(stream, &memory_stats.encode()).await
+}
+
+async fn recv_response(
+	stream: &mut UnixStream,
+	pid: u32,
+) ->
io::Result<(PrepareResult, MemoryAllocationStats)> { + let result = framed_recv(stream).await?; + let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { + // We received invalid bytes from the worker. + let bound_bytes = &result[..result.len().min(4)]; + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), + ); + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_response: failed to decode result: {:?}", e), + ) + })?; + let memory_stats = framed_recv(stream).await?; + let memory_stats = MemoryAllocationStats::decode(&mut &memory_stats[..]).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e), + ) + })?; + Ok((result, memory_stats)) } /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies /// the path to the socket used to communicate with the host. +/// +/// # Flow +/// +/// This runs the following in a loop: +/// +/// 1. Get the code and parameters for preparation from the host. +/// +/// 2. If we are pre-checking, start a memory tracker in a separate thread. +/// +/// 3. Start the CPU time monitor loop and the actual preparation in two separate threads. +/// +/// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor +/// thread will trigger first. +/// +/// 5. If compilation succeeded, write the compiled artifact into a temporary file. +/// +/// 6. If we are pre-checking, stop the memory tracker and get the stats. +/// +/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we +/// send that in the `PrepareResult`. pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { - let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; + let (code, dest, preparation_timeout, preparation_kind) = + recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), "worker: preparing artifact", ); - // Used to signal to the cpu time monitor thread that it can finish. - let (finished_tx, finished_rx) = channel::<()>(); let cpu_time_start = ProcessTime::now(); + // If we are pre-checking, run the memory tracker. + let memory_tracker_info = if let PreparationKind::PreCheck = preparation_kind { + let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); + let memory_tracker_fut = + rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); + Some((memory_tracker_fut, memory_tracker_tx)) + } else { + None + }; + // Spawn a new thread that runs the CPU time monitor. - let thread_fut = rt_handle + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_fut = rt_handle .spawn_blocking(move || { - cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx) + cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx) }) .fuse(); + // Spawn another thread for preparation. let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); - pin_mut!(thread_fut); + pin_mut!(cpu_time_monitor_fut); pin_mut!(prepare_fut); let result = select_biased! { // If this future is not selected, the join handle is dropped and the thread will // finish in the background. 
- join_res = thread_fut => { + join_res = cpu_time_monitor_fut => { match join_res { Ok(Some(cpu_time_elapsed)) => { // Log if we exceed the timeout and the other thread hasn't finished. @@ -346,7 +424,7 @@ pub fn worker_entrypoint(socket_path: &str) { }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); - let _ = finished_tx.send(()); + let _ = cpu_time_monitor_tx.send(()); match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { Err(err) => { @@ -375,7 +453,15 @@ pub fn worker_entrypoint(socket_path: &str) { }, }; - framed_send(&mut stream, result.encode().as_slice()).await?; + // Stop the memory stats worker and get its observed memory stats. + let memory_stats = + if let Some((memory_tracker_fut, memory_tracker_tx)) = memory_tracker_info { + get_memory_tracker_stats(memory_tracker_fut, memory_tracker_tx).await + } else { + None + }; + + send_response(&mut stream, result, memory_stats).await?; } }); } @@ -397,3 +483,116 @@ fn prepare_artifact(code: &[u8]) -> Result { }) .and_then(|inner_result| inner_result) } + +/// Get the rusage stats for the current thread. +#[cfg(target_os = "linux")] +fn getrusage_thread() -> io::Result { + let mut result = rusage { + ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_maxrss: 0, + ru_ixrss: 0, + ru_idrss: 0, + ru_isrss: 0, + ru_minflt: 0, + ru_majflt: 0, + ru_nswap: 0, + ru_inblock: 0, + ru_oublock: 0, + ru_msgsnd: 0, + ru_msgrcv: 0, + ru_nsignals: 0, + ru_nvcsw: 0, + ru_nivcsw: 0, + }; + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + return Err(io::Error::last_os_error()) + } + Ok(result) +} + +/// Runs a thread in the background that observes memory statistics. The goal is to try to get an +/// accurate stats during pre-checking. +/// +/// # Algorithm +/// +/// 1. Create the memory tracker. +/// +/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the +/// allocation epoch. +/// +/// 3. When we receive a signal that preparation has completed, take one last snapshot and return +/// the maximum observed values. +fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { + const POLL_INTERVAL: Duration = Duration::from_millis(10); + + let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; + let mut max_stats = MemoryAllocationStats::default(); + + let mut update_stats = || -> Result<(), String> { + let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; + if current_stats.resident > max_stats.resident { + max_stats.resident = current_stats.resident; + } + if current_stats.allocated > max_stats.allocated { + max_stats.allocated = current_stats.allocated; + } + Ok(()) + }; + + loop { + // Take a snapshot and update the max stats. + update_stats()?; + + // Sleep. + match finished_rx.recv_timeout(POLL_INTERVAL) { + // Received finish signal. + Ok(()) => { + update_stats()?; + return Ok(max_stats) + }, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => + return Err("memory_tracker_loop: finished_rx disconnected".into()), + } + } +} + +/// Helper function to get the stats from the memory tracker thread. Helps isolate all this error +/// handling. +async fn get_memory_tracker_stats( + fut: JoinHandle>, + tx: Sender<()>, +) -> Option { + // Signal to the memory tracker thread to terminate. 
+ if let Err(err) = tx.send(()) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error sending signal to memory tracker_thread: {}", err + ); + None + } else { + // Join on the thread handle. + match fut.await { + Ok(Ok(stats)) => Some(stats), + Ok(Err(err)) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error occurred in the memory tracker thread: {}", err + ); + None + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error joining on memory tracker thread: {}", err + ); + None + }, + } + } +} From 488bb06a30eec6fee6439a4aea3221a30545a35b Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 16 Jan 2023 16:46:37 -0500 Subject: [PATCH 02/13] Log memory stats metrics after prechecking --- node/core/pvf/src/metrics.rs | 51 ++++++ node/core/pvf/src/prepare/memory_stats.rs | 200 +++++++++++++++++++-- node/core/pvf/src/prepare/pool.rs | 3 + node/core/pvf/src/prepare/worker.rs | 207 +++++++--------------- node/overseer/src/memory_stats.rs | 8 +- node/overseer/src/metrics.rs | 4 +- 6 files changed, 309 insertions(+), 164 deletions(-) diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 8db105d895ea..3d2154904cb0 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -72,6 +72,27 @@ impl Metrics { pub(crate) fn time_execution(&self) -> Option { self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) } + + /// Observe max_rss for precheck preparation. + pub(crate) fn observe_precheck_max_rss(&self, max_rss: f64) { + if let Some(metrics) = &self.0 { + metrics.precheck_max_rss.observe(max_rss); + } + } + + /// Observe max resident memory for precheck preparation. + pub(crate) fn observe_precheck_max_resident(&self, max_resident_kb: f64) { + if let Some(metrics) = &self.0 { + metrics.precheck_max_resident.observe(max_resident_kb); + } + } + + /// Observe max allocated memory for precheck preparation. 
+ pub(crate) fn observe_precheck_max_allocated(&self, max_allocated_kb: f64) { + if let Some(metrics) = &self.0 { + metrics.precheck_max_allocated.observe(max_allocated_kb); + } + } } #[derive(Clone)] @@ -85,6 +106,9 @@ struct MetricsInner { execute_finished: prometheus::Counter, preparation_time: prometheus::Histogram, execution_time: prometheus::Histogram, + precheck_max_rss: prometheus::Histogram, + precheck_max_allocated: prometheus::Histogram, + precheck_max_resident: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -202,6 +226,33 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + precheck_max_rss: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_precheck_max_rss", + "max_rss (maximum resident set size) observed for precheck preparation (in kilobytes)", + ) + )?, + registry, + )?, + precheck_max_resident: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_precheck_max_resident", + "max resident memory observed for precheck preparation (in kilobytes)", + ) + )?, + registry, + )?, + precheck_max_allocated: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_pvf_precheck_max_allocated", + "max allocated memory observed for precheck preparation (in kilobytes)", + ) + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index fb06a1007d00..35e057e5f0cc 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -14,11 +14,41 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::{metrics::Metrics, LOG_TARGET}; use parity_scale_codec::{Decode, Encode}; +use std::{ + io, + sync::mpsc::{Receiver, RecvTimeoutError, Sender}, + time::Duration, +}; use tikv_jemalloc_ctl::{epoch, stats, Error}; +use tokio::task::JoinHandle; + +#[cfg(target_os = "linux")] +use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; + +/// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if +/// supported by the OS, `max_rss`. +#[derive(Encode, Decode)] +pub struct MemoryStats { + /// Memory stats from `tikv_jemalloc_ctl`. + pub memory_tracker_stats: Option, + /// `max_rss` from `getrusage`. A string error since `io::Error` is not `Encode`able. + pub max_rss: Option>, +} + +/// Statistics of collected memory metrics. +#[non_exhaustive] +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryAllocationStats { + /// Total resident memory, in bytes. + pub resident: u64, + /// Total allocated memory, in bytes. + pub allocated: u64, +} #[derive(Clone)] -pub struct MemoryAllocationTracker { +struct MemoryAllocationTracker { epoch: tikv_jemalloc_ctl::epoch_mib, allocated: stats::allocated_mib, resident: stats::resident_mib, @@ -37,18 +67,166 @@ impl MemoryAllocationTracker { // update stats by advancing the allocation epoch self.epoch.advance()?; - let allocated: u64 = self.allocated.read()? as _; - let resident: u64 = self.resident.read()? as _; + // Convert to `u64`, as `usize` is not `Encode`able. + let allocated = self.allocated.read()? as u64; + let resident = self.resident.read()? as u64; Ok(MemoryAllocationStats { allocated, resident }) } } -/// Statistics of collected memory metrics. 
-#[non_exhaustive] -#[derive(Clone, Debug, Default, Encode, Decode)] -pub struct MemoryAllocationStats { - /// Total resident memory, in bytes. - pub resident: u64, - /// Total allocated memory, in bytes. - pub allocated: u64, +/// Get the rusage stats for the current thread. +#[cfg(target_os = "linux")] +fn getrusage_thread() -> io::Result { + let mut result = rusage { + ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_maxrss: 0, + ru_ixrss: 0, + ru_idrss: 0, + ru_isrss: 0, + ru_minflt: 0, + ru_majflt: 0, + ru_nswap: 0, + ru_inblock: 0, + ru_oublock: 0, + ru_msgsnd: 0, + ru_msgrcv: 0, + ru_nsignals: 0, + ru_nvcsw: 0, + ru_nivcsw: 0, + }; + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + return Err(io::Error::last_os_error().to_string()) + } + Ok(result) +} + +/// Gets the `max_rss` for the current thread if the OS supports `getrusage`. Otherwise, just +/// returns `None`. +pub fn get_max_rss_thread() -> Option> { + #[cfg(target_os = "linux")] + let max_rss = Some(getrusage_thread().map(|rusage| max_rss)); + #[cfg(not(target_os = "linux"))] + let max_rss = None; + max_rss +} + +/// Runs a thread in the background that observes memory statistics. The goal is to try to get an +/// accurate stats during pre-checking. +/// +/// # Algorithm +/// +/// 1. Create the memory tracker. +/// +/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the +/// allocation epoch. +/// +/// 3. When we receive a signal that preparation has completed, take one last snapshot and return +/// the maximum observed values. +/// +/// # Errors +/// +/// For simplicity, any errors are returned as a string. As this is not a critical component, errors +/// are used for informational purposes (logging) only. +pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { + const POLL_INTERVAL: Duration = Duration::from_millis(10); + + let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; + let mut max_stats = MemoryAllocationStats::default(); + + let mut update_stats = || -> Result<(), String> { + let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; + if current_stats.resident > max_stats.resident { + max_stats.resident = current_stats.resident; + } + if current_stats.allocated > max_stats.allocated { + max_stats.allocated = current_stats.allocated; + } + Ok(()) + }; + + loop { + // Take a snapshot and update the max stats. + update_stats()?; + + // Sleep. + match finished_rx.recv_timeout(POLL_INTERVAL) { + // Received finish signal. + Ok(()) => { + update_stats()?; + return Ok(max_stats) + }, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => + return Err("memory_tracker_loop: finished_rx disconnected".into()), + } + } +} + +/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this +/// error handling. +pub async fn get_memory_tracker_loop_stats( + fut: JoinHandle>, + tx: Sender<()>, +) -> Option { + // Signal to the memory tracker thread to terminate. + if let Err(err) = tx.send(()) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error sending signal to memory tracker_thread: {}", err + ); + None + } else { + // Join on the thread handle. 
+ match fut.await { + Ok(Ok(stats)) => Some(stats), + Ok(Err(err)) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error occurred in the memory tracker thread: {}", err + ); + None + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error joining on memory tracker thread: {}", err + ); + None + }, + } + } +} + +/// Helper function to send the memory metrics, if available, to prometheus. +pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) { + if let Some(max_rss) = memory_stats.max_rss { + match max_rss { + Ok(max_rss) => metrics.observe_precheck_max_rss(f64::from(max_rss)), + Err(err) => gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "error getting `max_rss` in preparation thread: {}", + err + ), + } + } + + if let Some(tracker_stats) = memory_stats.memory_tracker_stats { + // We convert these stats from B to KB for two reasons: + // + // 1. To match the unit of `max_rss` from `getrusage`. + // + // 2. To have less potential loss of precision when converting to `f64`. (These values are + // originally `usize`, which is 64 bits on 64-bit platforms). + let resident_kb = (tracker_stats.resident / 1000) as f64; + let allocated_kb = (tracker_stats.allocated / 1000) as f64; + + metrics.observe_precheck_max_resident(resident_kb); + metrics.observe_precheck_max_allocated(allocated_kb); + } } diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index ce37afc98b2b..fe8a286b96c1 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -230,6 +230,7 @@ fn handle_to_pool( let preparation_timer = metrics.time_preparation(); mux.push( start_work_task( + metrics.clone(), worker, idle, code, @@ -279,6 +280,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Po } async fn start_work_task( + metrics: Metrics, worker: Worker, idle: IdleWorker, code: Arc>, @@ -289,6 +291,7 @@ async fn start_work_task( _preparation_timer: Option, ) -> PoolEvent { let outcome = worker::start_work( + &metrics, idle, code, &cache_path, diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index e1f1875628fa..b75c51a6c5a7 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -14,10 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::memory_stats::{MemoryAllocationStats, MemoryAllocationTracker}; +use super::memory_stats::{ + get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics, + MemoryStats, +}; use crate::{ artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, + metrics::Metrics, prepare::PreparationKind, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, @@ -33,16 +37,10 @@ use sp_core::hexdisplay::HexDisplay; use std::{ panic, path::{Path, PathBuf}, - sync::{ - mpsc::{channel, Receiver, RecvTimeoutError, Sender}, - Arc, - }, + sync::{mpsc::channel, Arc}, time::Duration, }; -use tokio::{io, net::UnixStream, task::JoinHandle}; - -#[cfg(target_os = "linux")] -use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; +use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. 
/// @@ -81,6 +79,7 @@ pub enum Outcome { /// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process /// being killed. pub async fn start_work( + metrics: &Metrics, worker: IdleWorker, code: Arc>, cache_path: &Path, @@ -126,6 +125,7 @@ pub async fn start_work( // Received bytes from worker within the time limit. Ok(Ok((prepare_result, memory_stats))) => handle_response( + metrics, IdleWorker { stream, pid }, prepare_result, memory_stats, @@ -164,9 +164,10 @@ pub async fn start_work( /// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be /// cleared by `with_tmp_file`. async fn handle_response( + metrics: &Metrics, worker: IdleWorker, result: PrepareResult, - memory_stats: MemoryAllocationStats, + memory_stats: MemoryStats, pid: u32, tmp_file: PathBuf, artifact_path: PathBuf, @@ -200,7 +201,7 @@ async fn handle_response( artifact_path.display(), ); - match tokio::fs::rename(&tmp_file, &artifact_path).await { + let outcome = match tokio::fs::rename(&tmp_file, &artifact_path).await { Ok(()) => Outcome::Concluded { worker, result }, Err(err) => { gum::warn!( @@ -213,7 +214,13 @@ async fn handle_response( ); Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) } }, - } + }; + + // If there were no errors up until now, log the memory stats for a successful preparation, if + // available. + observe_memory_metrics(metrics, memory_stats, pid); + + outcome } /// Create a temporary file for an artifact at the given cache path and execute the given @@ -309,7 +316,7 @@ async fn recv_request( async fn send_response( stream: &mut UnixStream, result: PrepareResult, - memory_stats: Option, + memory_stats: Option, ) -> io::Result<()> { framed_send(stream, &result.encode()).await?; framed_send(stream, &memory_stats.encode()).await @@ -318,7 +325,7 @@ async fn send_response( async fn recv_response( stream: &mut UnixStream, pid: u32, -) -> io::Result<(PrepareResult, MemoryAllocationStats)> { +) -> io::Result<(PrepareResult, MemoryStats)> { let result = framed_recv(stream).await?; let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { // We received invalid bytes from the worker. @@ -335,7 +342,7 @@ async fn recv_response( ) })?; let memory_stats = framed_recv(stream).await?; - let memory_stats = MemoryAllocationStats::decode(&mut &memory_stats[..]).map_err(|e| { + let memory_stats = MemoryStats::decode(&mut &memory_stats[..]).map_err(|e| { io::Error::new( io::ErrorKind::Other, format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e), @@ -360,9 +367,9 @@ async fn recv_response( /// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor /// thread will trigger first. /// -/// 5. If compilation succeeded, write the compiled artifact into a temporary file. +/// 5. If we are pre-checking, stop the memory tracker and get the stats. /// -/// 6. If we are pre-checking, stop the memory tracker and get the stats. +/// 6. If compilation succeeded, write the compiled artifact into a temporary file. /// /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. @@ -397,16 +404,30 @@ pub fn worker_entrypoint(socket_path: &str) { }) .fuse(); // Spawn another thread for preparation. 
- let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse(); + let prepare_fut = rt_handle + .spawn_blocking(move || { + let prepare_result = prepare_artifact(&code); + + // Get the `max_rss` stat if we are pre-checking. + let max_rss = if let PreparationKind::PreCheck = preparation_kind { + // If supported, call getrusage for the thread. + get_max_rss_thread() + } else { + None + }; + + (prepare_result, max_rss) + }) + .fuse(); pin_mut!(cpu_time_monitor_fut); pin_mut!(prepare_fut); - let result = select_biased! { + let (result, memory_stats) = select_biased! { // If this future is not selected, the join handle is dropped and the thread will // finish in the background. join_res = cpu_time_monitor_fut => { - match join_res { + let result = match join_res { Ok(Some(cpu_time_elapsed)) => { // Log if we exceed the timeout and the other thread hasn't finished. gum::warn!( @@ -420,18 +441,31 @@ pub fn worker_entrypoint(socket_path: &str) { }, Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())), Err(err) => Err(PrepareError::IoErr(err.to_string())), - } + }; + (result, None) }, compilation_res = prepare_fut => { let cpu_time_elapsed = cpu_time_start.elapsed(); let _ = cpu_time_monitor_tx.send(()); - match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) { - Err(err) => { + match compilation_res.unwrap_or_else(|err| (Err(PrepareError::IoErr(err.to_string())), None)) { + (Err(err), _) => { // Serialized error will be written into the socket. - Err(err) + (Err(err), None) }, - Ok(compiled_artifact) => { + (Ok(compiled_artifact), max_rss) => { + // Stop the memory stats worker and get its observed memory stats. + let memory_tracker_stats = + if let Some((memory_tracker_fut, memory_tracker_tx)) = memory_tracker_info { + get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await + } else { + None + }; + let memory_stats = MemoryStats { + memory_tracker_stats, + max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())), + }; + // Write the serialized artifact into a temp file. // // PVF host only keeps artifacts statuses in its memory, successfully @@ -447,20 +481,12 @@ pub fn worker_entrypoint(socket_path: &str) { ); tokio::fs::write(&dest, &compiled_artifact).await?; - Ok(cpu_time_elapsed) + (Ok(cpu_time_elapsed), Some(memory_stats)) }, } }, }; - // Stop the memory stats worker and get its observed memory stats. - let memory_stats = - if let Some((memory_tracker_fut, memory_tracker_tx)) = memory_tracker_info { - get_memory_tracker_stats(memory_tracker_fut, memory_tracker_tx).await - } else { - None - }; - send_response(&mut stream, result, memory_stats).await?; } }); @@ -483,116 +509,3 @@ fn prepare_artifact(code: &[u8]) -> Result { }) .and_then(|inner_result| inner_result) } - -/// Get the rusage stats for the current thread. -#[cfg(target_os = "linux")] -fn getrusage_thread() -> io::Result { - let mut result = rusage { - ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, - ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, - ru_maxrss: 0, - ru_ixrss: 0, - ru_idrss: 0, - ru_isrss: 0, - ru_minflt: 0, - ru_majflt: 0, - ru_nswap: 0, - ru_inblock: 0, - ru_oublock: 0, - ru_msgsnd: 0, - ru_msgrcv: 0, - ru_nsignals: 0, - ru_nvcsw: 0, - ru_nivcsw: 0, - }; - if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { - return Err(io::Error::last_os_error()) - } - Ok(result) -} - -/// Runs a thread in the background that observes memory statistics. 
The goal is to try to get an -/// accurate stats during pre-checking. -/// -/// # Algorithm -/// -/// 1. Create the memory tracker. -/// -/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the -/// allocation epoch. -/// -/// 3. When we receive a signal that preparation has completed, take one last snapshot and return -/// the maximum observed values. -fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { - const POLL_INTERVAL: Duration = Duration::from_millis(10); - - let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; - let mut max_stats = MemoryAllocationStats::default(); - - let mut update_stats = || -> Result<(), String> { - let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; - if current_stats.resident > max_stats.resident { - max_stats.resident = current_stats.resident; - } - if current_stats.allocated > max_stats.allocated { - max_stats.allocated = current_stats.allocated; - } - Ok(()) - }; - - loop { - // Take a snapshot and update the max stats. - update_stats()?; - - // Sleep. - match finished_rx.recv_timeout(POLL_INTERVAL) { - // Received finish signal. - Ok(()) => { - update_stats()?; - return Ok(max_stats) - }, - // Timed out, restart loop. - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => - return Err("memory_tracker_loop: finished_rx disconnected".into()), - } - } -} - -/// Helper function to get the stats from the memory tracker thread. Helps isolate all this error -/// handling. -async fn get_memory_tracker_stats( - fut: JoinHandle>, - tx: Sender<()>, -) -> Option { - // Signal to the memory tracker thread to terminate. - if let Err(err) = tx.send(()) { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error sending signal to memory tracker_thread: {}", err - ); - None - } else { - // Join on the thread handle. - match fut.await { - Ok(Ok(stats)) => Some(stats), - Ok(Err(err)) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error occurred in the memory tracker thread: {}", err - ); - None - }, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error joining on memory tracker thread: {}", err - ); - None - }, - } - } -} diff --git a/node/overseer/src/memory_stats.rs b/node/overseer/src/memory_stats.rs index 670762a4935c..908e20cc213a 100644 --- a/node/overseer/src/memory_stats.rs +++ b/node/overseer/src/memory_stats.rs @@ -36,8 +36,8 @@ impl MemoryAllocationTracker { // update stats by advancing the allocation epoch self.epoch.advance()?; - let allocated: u64 = self.allocated.read()? as _; - let resident: u64 = self.resident.read()? as _; + let allocated = self.allocated.read()?; + let resident = self.resident.read()?; Ok(MemoryAllocationSnapshot { allocated, resident }) } } @@ -47,7 +47,7 @@ impl MemoryAllocationTracker { #[derive(Debug, Clone)] pub struct MemoryAllocationSnapshot { /// Total resident memory, in bytes. - pub resident: u64, + pub resident: usize, /// Total allocated memory, in bytes. 
- pub allocated: u64, + pub allocated: usize, } diff --git a/node/overseer/src/metrics.rs b/node/overseer/src/metrics.rs index bb7d98a68f2e..b7a4ff443fae 100644 --- a/node/overseer/src/metrics.rs +++ b/node/overseer/src/metrics.rs @@ -69,8 +69,8 @@ impl Metrics { pub(crate) fn memory_stats_snapshot(&self, memory_stats: MemoryAllocationSnapshot) { if let Some(metrics) = &self.0 { - metrics.memory_stats_allocated.set(memory_stats.allocated); - metrics.memory_stats_resident.set(memory_stats.resident); + metrics.memory_stats_allocated.set(memory_stats.allocated as u64); + metrics.memory_stats_resident.set(memory_stats.resident as u64); } } From 61ebe728299c02c95470b68f341860d7df0966ef Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 16 Jan 2023 17:01:54 -0500 Subject: [PATCH 03/13] Fix tests --- node/core/pvf/src/host.rs | 2 +- node/core/pvf/src/prepare/memory_stats.rs | 2 +- node/core/pvf/src/prepare/queue.rs | 63 +++++++++++++++++++---- node/core/pvf/src/prepare/worker.rs | 10 ++-- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index c85d54eb46e0..d892be32efc6 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -851,7 +851,7 @@ mod tests { let pulse = pulse_every(Duration::from_millis(100)); futures::pin_mut!(pulse); - for _ in 0usize..5usize { + for _ in 0..5 { let start = std::time::Instant::now(); let _ = pulse.next().await.unwrap(); diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 35e057e5f0cc..460872779dbc 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -105,7 +105,7 @@ fn getrusage_thread() -> io::Result { /// returns `None`. pub fn get_max_rss_thread() -> Option> { #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_thread().map(|rusage| max_rss)); + let max_rss = Some(getrusage_thread().map(|rusage| rusage.max_rss)); #[cfg(not(target_os = "linux"))] let max_rss = None; max_rss diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 91e6bfa3c7a1..b3352b43b0d7 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -510,7 +510,10 @@ pub fn start( #[cfg(test)] mod tests { use super::*; - use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT}; + use crate::{ + error::PrepareError, + host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT}, + }; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; use slotmap::SlotMap; @@ -629,6 +632,7 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -646,12 +650,29 @@ mod tests { #[tokio::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); - let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; let priority = Priority::Normal; - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout }); + let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; + let preparation_kind = PreparationKind::PreCheck; + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(1), + preparation_timeout, + preparation_kind, + }); + 
test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(2), + preparation_timeout, + preparation_kind, + }); + // Start a non-precheck preparation for this one. + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(3), + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromExecutionRequest, + }); // Receive only two spawns. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -680,6 +701,7 @@ mod tests { priority: Priority::Critical, pvf: pvf(4), preparation_timeout, + preparation_kind, }); // 2 out of 2 are working, but there is a critical job incoming. That means that spawning @@ -691,11 +713,13 @@ mod tests { async fn cull_unwanted() { let mut test = Test::new(1, 2); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; + let preparation_kind = PreparationKind::PreCheck; test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1), preparation_timeout, + preparation_kind, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w1 = test.workers.insert(()); @@ -707,6 +731,7 @@ mod tests { priority: Priority::Critical, pvf: pvf(2), preparation_timeout, + preparation_kind, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -729,10 +754,28 @@ mod tests { async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); - let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); - test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout }); + let priority = Priority::Normal; + let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; + let preparation_kind = PreparationKind::PreCheck; + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(1), + preparation_timeout, + preparation_kind, + }); + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(2), + preparation_timeout, + preparation_kind, + }); + // Start a non-precheck preparation for this one. + test.send_queue(ToQueue::Enqueue { + priority, + pvf: pvf(3), + preparation_timeout: LENIENT_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::FromExecutionRequest, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -767,6 +810,7 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -792,6 +836,7 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, + preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index b75c51a6c5a7..5b41978dc8a1 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -167,7 +167,7 @@ async fn handle_response( metrics: &Metrics, worker: IdleWorker, result: PrepareResult, - memory_stats: MemoryStats, + memory_stats: Option, pid: u32, tmp_file: PathBuf, artifact_path: PathBuf, @@ -218,7 +218,9 @@ async fn handle_response( // If there were no errors up until now, log the memory stats for a successful preparation, if // available. 
- observe_memory_metrics(metrics, memory_stats, pid); + if let Some(memory_stats) = memory_stats { + observe_memory_metrics(metrics, memory_stats, pid); + } outcome } @@ -325,7 +327,7 @@ async fn send_response( async fn recv_response( stream: &mut UnixStream, pid: u32, -) -> io::Result<(PrepareResult, MemoryStats)> { +) -> io::Result<(PrepareResult, Option)> { let result = framed_recv(stream).await?; let result = PrepareResult::decode(&mut &result[..]).map_err(|e| { // We received invalid bytes from the worker. @@ -342,7 +344,7 @@ async fn recv_response( ) })?; let memory_stats = framed_recv(stream).await?; - let memory_stats = MemoryStats::decode(&mut &memory_stats[..]).map_err(|e| { + let memory_stats = Option::::decode(&mut &memory_stats[..]).map_err(|e| { io::Error::new( io::ErrorKind::Other, format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e), From 8211891a21262599b862c86e94f5ff4b1d5ad457 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 16 Jan 2023 18:40:46 -0500 Subject: [PATCH 04/13] Try to fix errors (linux-only so I'm relying on CI here) --- node/core/pvf/src/prepare/memory_stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 460872779dbc..c7d5031c074b 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -76,7 +76,7 @@ impl MemoryAllocationTracker { /// Get the rusage stats for the current thread. #[cfg(target_os = "linux")] -fn getrusage_thread() -> io::Result { +fn getrusage_thread() -> io::Result { let mut result = rusage { ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, @@ -96,7 +96,7 @@ fn getrusage_thread() -> io::Result { ru_nivcsw: 0, }; if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { - return Err(io::Error::last_os_error().to_string()) + return Err(io::Error::last_os_error()) } Ok(result) } From f6259885c61a0e5671f12a6cb088db108909fd04 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 17 Jan 2023 09:00:37 -0500 Subject: [PATCH 05/13] Try to fix CI --- node/core/pvf/src/metrics.rs | 2 +- node/core/pvf/src/prepare/memory_stats.rs | 12 ++++++------ node/core/pvf/src/prepare/worker.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 3d2154904cb0..6a19faf3e40a 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -230,7 +230,7 @@ impl metrics::Metrics for Metrics { prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( "polkadot_pvf_precheck_max_rss", - "max_rss (maximum resident set size) observed for precheck preparation (in kilobytes)", + "ru_maxrss (maximum resident set size) observed for precheck preparation (in kilobytes)", ) )?, registry, diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index c7d5031c074b..37c26e546fdc 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -28,12 +28,12 @@ use tokio::task::JoinHandle; use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if -/// supported by the OS, `max_rss`. +/// supported by the OS, `ru_maxrss`. #[derive(Encode, Decode)] pub struct MemoryStats { /// Memory stats from `tikv_jemalloc_ctl`. pub memory_tracker_stats: Option, - /// `max_rss` from `getrusage`. 
A string error since `io::Error` is not `Encode`able. + /// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able. pub max_rss: Option>, } @@ -101,11 +101,11 @@ fn getrusage_thread() -> io::Result { Ok(result) } -/// Gets the `max_rss` for the current thread if the OS supports `getrusage`. Otherwise, just +/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just /// returns `None`. pub fn get_max_rss_thread() -> Option> { #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_thread().map(|rusage| rusage.max_rss)); + let max_rss = Some(getrusage_thread().map(|rusage| rusage.ru_maxrss)); #[cfg(not(target_os = "linux"))] let max_rss = None; max_rss @@ -210,7 +210,7 @@ pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: Err(err) => gum::warn!( target: LOG_TARGET, worker_pid = %pid, - "error getting `max_rss` in preparation thread: {}", + "error getting `ru_maxrss` in preparation thread: {}", err ), } @@ -219,7 +219,7 @@ pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: if let Some(tracker_stats) = memory_stats.memory_tracker_stats { // We convert these stats from B to KB for two reasons: // - // 1. To match the unit of `max_rss` from `getrusage`. + // 1. To match the unit of `ru_maxrss` from `getrusage`. // // 2. To have less potential loss of precision when converting to `f64`. (These values are // originally `usize`, which is 64 bits on 64-bit platforms). diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 5b41978dc8a1..b6b540ebeb45 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -410,7 +410,7 @@ pub fn worker_entrypoint(socket_path: &str) { .spawn_blocking(move || { let prepare_result = prepare_artifact(&code); - // Get the `max_rss` stat if we are pre-checking. + // Get the `ru_maxrss` stat if we are pre-checking. let max_rss = if let PreparationKind::PreCheck = preparation_kind { // If supported, call getrusage for the thread. get_max_rss_thread() From 85f7a859c9894515a35f9a277e5ff30063dccb43 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 17 Jan 2023 09:55:39 -0500 Subject: [PATCH 06/13] Add module docs for `prepare/memory_stats.rs`; fix CI error --- node/core/pvf/src/prepare/memory_stats.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 37c26e546fdc..2dd16e9ca02c 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -14,6 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! Memory stats for preparation. +//! +//! Right now we gather three measurements: +//! +//! - `ru_maxrss` (resident set size) from `getrusage`. +//! - `resident` memory stat provided by `tikv-malloc-ctl`. +//! - `allocated` memory stat also from `tikv-malloc-ctl`. +//! +//! Currently we are only logging these, and only on each successful pre-check. In the future, we +//! may use these stats to reject PVFs during pre-checking. See +//! for more +//! background. + use crate::{metrics::Metrics, LOG_TARGET}; use parity_scale_codec::{Decode, Encode}; use std::{ @@ -34,7 +47,7 @@ pub struct MemoryStats { /// Memory stats from `tikv_jemalloc_ctl`. pub memory_tracker_stats: Option, /// `ru_maxrss` from `getrusage`. 
A string error since `io::Error` is not `Encode`able. - pub max_rss: Option>, + pub max_rss: Option>, } /// Statistics of collected memory metrics. @@ -103,9 +116,10 @@ fn getrusage_thread() -> io::Result { /// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just /// returns `None`. -pub fn get_max_rss_thread() -> Option> { +pub fn get_max_rss_thread() -> Option> { + // `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_thread().map(|rusage| rusage.ru_maxrss)); + let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))); #[cfg(not(target_os = "linux"))] let max_rss = None; max_rss @@ -206,7 +220,7 @@ pub async fn get_memory_tracker_loop_stats( pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) { if let Some(max_rss) = memory_stats.max_rss { match max_rss { - Ok(max_rss) => metrics.observe_precheck_max_rss(f64::from(max_rss)), + Ok(max_rss) => metrics.observe_precheck_max_rss(max_rss as f64), Err(err) => gum::warn!( target: LOG_TARGET, worker_pid = %pid, From ddd15c7b6326e550f724e02c955882248b51448d Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 26 Jan 2023 14:11:36 +0100 Subject: [PATCH 07/13] Report memory stats for all preparation jobs --- node/core/pvf/src/host.rs | 8 +-- node/core/pvf/src/metrics.rs | 42 +++++++------- node/core/pvf/src/prepare/memory_stats.rs | 27 ++++----- node/core/pvf/src/prepare/mod.rs | 9 --- node/core/pvf/src/prepare/pool.rs | 29 ++-------- node/core/pvf/src/prepare/queue.rs | 67 +++-------------------- node/core/pvf/src/prepare/worker.rs | 56 +++++-------------- 7 files changed, 60 insertions(+), 178 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index d892be32efc6..30e4ada02b10 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -25,8 +25,7 @@ use crate::{ error::PrepareError, execute, metrics::Metrics, - prepare::{self, PreparationKind}, - PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, + prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET, }; use always_assert::never; use futures::{ @@ -477,7 +476,6 @@ async fn handle_precheck_pvf( priority: Priority::Normal, pvf, preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::PreCheck, }, ) .await?; @@ -549,7 +547,6 @@ async fn handle_execute_pvf( priority, pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromExecutionRequest, }, ) .await?; @@ -572,7 +569,6 @@ async fn handle_execute_pvf( priority, pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromExecutionRequest, }, ) .await?; @@ -625,7 +621,6 @@ async fn handle_heads_up( priority: Priority::Normal, pvf: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromHeadsUpRequest, }, ) .await?; @@ -642,7 +637,6 @@ async fn handle_heads_up( priority: Priority::Normal, pvf: active_pvf, preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromHeadsUpRequest, }, ) .await?; diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 6a19faf3e40a..e3379d2027cf 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -73,24 +73,24 @@ impl Metrics { self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) } - /// Observe max_rss for precheck preparation. 
- pub(crate) fn observe_precheck_max_rss(&self, max_rss: f64) { + /// Observe max_rss for preparation. + pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) { if let Some(metrics) = &self.0 { - metrics.precheck_max_rss.observe(max_rss); + metrics.preparation_max_rss.observe(max_rss); } } - /// Observe max resident memory for precheck preparation. - pub(crate) fn observe_precheck_max_resident(&self, max_resident_kb: f64) { + /// Observe max resident memory for preparation. + pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) { if let Some(metrics) = &self.0 { - metrics.precheck_max_resident.observe(max_resident_kb); + metrics.preparation_max_resident.observe(max_resident_kb); } } - /// Observe max allocated memory for precheck preparation. - pub(crate) fn observe_precheck_max_allocated(&self, max_allocated_kb: f64) { + /// Observe max allocated memory for preparation. + pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) { if let Some(metrics) = &self.0 { - metrics.precheck_max_allocated.observe(max_allocated_kb); + metrics.preparation_max_allocated.observe(max_allocated_kb); } } } @@ -106,9 +106,9 @@ struct MetricsInner { execute_finished: prometheus::Counter, preparation_time: prometheus::Histogram, execution_time: prometheus::Histogram, - precheck_max_rss: prometheus::Histogram, - precheck_max_allocated: prometheus::Histogram, - precheck_max_resident: prometheus::Histogram, + preparation_max_rss: prometheus::Histogram, + preparation_max_allocated: prometheus::Histogram, + preparation_max_resident: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -226,29 +226,29 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - precheck_max_rss: prometheus::register( + preparation_max_rss: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( - "polkadot_pvf_precheck_max_rss", - "ru_maxrss (maximum resident set size) observed for precheck preparation (in kilobytes)", + "polkadot_pvf_preparation_max_rss", + "ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)", ) )?, registry, )?, - precheck_max_resident: prometheus::register( + preparation_max_resident: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( - "polkadot_pvf_precheck_max_resident", - "max resident memory observed for precheck preparation (in kilobytes)", + "polkadot_pvf_preparation_max_resident", + "max resident memory observed for preparation (in kilobytes)", ) )?, registry, )?, - precheck_max_allocated: prometheus::register( + preparation_max_allocated: prometheus::register( prometheus::Histogram::with_opts( prometheus::HistogramOpts::new( - "polkadot_pvf_precheck_max_allocated", - "max allocated memory observed for precheck preparation (in kilobytes)", + "polkadot_pvf_preparation_max_allocated", + "max allocated memory observed for preparation (in kilobytes)", ) )?, registry, diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 2dd16e9ca02c..65f28e00a0cd 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -22,8 +22,8 @@ //! - `resident` memory stat provided by `tikv-malloc-ctl`. //! - `allocated` memory stat also from `tikv-malloc-ctl`. //! -//! Currently we are only logging these, and only on each successful pre-check. In the future, we -//! may use these stats to reject PVFs during pre-checking. See +//! 
Currently we are only logging these for the purposes of gathering data. In the future, we may +//! use these stats to reject PVFs during pre-checking. See //! for more //! background. @@ -125,8 +125,8 @@ pub fn get_max_rss_thread() -> Option> { max_rss } -/// Runs a thread in the background that observes memory statistics. The goal is to try to get an -/// accurate stats during pre-checking. +/// Runs a thread in the background that observes memory statistics. The goal is to try to get +/// accurate stats during preparation. /// /// # Algorithm /// @@ -220,7 +220,7 @@ pub async fn get_memory_tracker_loop_stats( pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) { if let Some(max_rss) = memory_stats.max_rss { match max_rss { - Ok(max_rss) => metrics.observe_precheck_max_rss(max_rss as f64), + Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64), Err(err) => gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -231,16 +231,11 @@ pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: } if let Some(tracker_stats) = memory_stats.memory_tracker_stats { - // We convert these stats from B to KB for two reasons: - // - // 1. To match the unit of `ru_maxrss` from `getrusage`. - // - // 2. To have less potential loss of precision when converting to `f64`. (These values are - // originally `usize`, which is 64 bits on 64-bit platforms). - let resident_kb = (tracker_stats.resident / 1000) as f64; - let allocated_kb = (tracker_stats.allocated / 1000) as f64; - - metrics.observe_precheck_max_resident(resident_kb); - metrics.observe_precheck_max_allocated(allocated_kb); + // We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`. + let resident_kb = (tracker_stats.resident / 1024) as f64; + let allocated_kb = (tracker_stats.allocated / 1024) as f64; + + metrics.observe_preparation_max_resident(resident_kb); + metrics.observe_preparation_max_allocated(allocated_kb); } } diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs index 2245ec54d953..4cbd63eff7d2 100644 --- a/node/core/pvf/src/prepare/mod.rs +++ b/node/core/pvf/src/prepare/mod.rs @@ -30,12 +30,3 @@ mod worker; pub use pool::start as start_pool; pub use queue::{start as start_queue, FromQueue, ToQueue}; pub use worker::worker_entrypoint; - -use parity_scale_codec::{Decode, Encode}; - -#[derive(Clone, Copy, Debug, Eq, PartialEq, Encode, Decode)] -pub enum PreparationKind { - PreCheck, - FromExecutionRequest, - FromHeadsUpRequest, -} diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index fe8a286b96c1..49670e4c1ac2 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -14,10 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
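As an aside on the jemalloc-based tracker described in the memory_stats module docs above: the polling side amounts to something like the sketch below. This is illustrative only; the function name `track_peak_resident`, the `std::sync::mpsc` shutdown channel and returning just the peak `resident` value are assumptions rather than the exact `memory_tracker_loop` in this patch, and the numbers are only meaningful when jemalloc is the global allocator (for example via `tikv-jemallocator`).

```rust
// Illustrative sketch only (not the exact `memory_tracker_loop` from this patch):
// poll jemalloc's `resident` statistic until the main job signals completion and
// keep the peak value.
use std::{
	sync::mpsc::{Receiver, RecvTimeoutError},
	time::Duration,
};
use tikv_jemalloc_ctl::{epoch, stats};

fn track_peak_resident(finished_rx: Receiver<()>) -> Result<usize, String> {
	const POLL_INTERVAL: Duration = Duration::from_millis(100);

	// MIBs make repeated reads of the same statistic cheap.
	let epoch_mib = epoch::mib().map_err(|e| e.to_string())?;
	let resident_mib = stats::resident::mib().map_err(|e| e.to_string())?;

	let mut peak_resident = 0usize;
	loop {
		// jemalloc caches its statistics; advancing the epoch refreshes them.
		epoch_mib.advance().map_err(|e| e.to_string())?;
		peak_resident = peak_resident.max(resident_mib.read().map_err(|e| e.to_string())?);

		// Keep polling until the preparation job signals that it is done (or hangs up).
		match finished_rx.recv_timeout(POLL_INTERVAL) {
			Err(RecvTimeoutError::Timeout) => continue,
			Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
		}
	}
	Ok(peak_resident) // in bytes
}
```

The loop in this patch additionally records the `allocated` statistic and, as its doc comment notes, reports errors as `String`s that are used for logging only.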
-use super::{ - worker::{self, Outcome}, - PreparationKind, -}; +use super::worker::{self, Outcome}; use crate::{ error::{PrepareError, PrepareResult}, metrics::Metrics, @@ -73,7 +70,6 @@ pub enum ToPool { code: Arc>, artifact_path: PathBuf, preparation_timeout: Duration, - preparation_kind: PreparationKind, }, } @@ -218,13 +214,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { - worker, - code, - artifact_path, - preparation_timeout, - preparation_kind, - } => { + ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -237,7 +227,6 @@ fn handle_to_pool( cache_path.to_owned(), artifact_path, preparation_timeout, - preparation_kind, preparation_timer, ) .boxed(), @@ -287,19 +276,11 @@ async fn start_work_task( cache_path: PathBuf, artifact_path: PathBuf, preparation_timeout: Duration, - preparation_kind: PreparationKind, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker::start_work( - &metrics, - idle, - code, - &cache_path, - artifact_path, - preparation_timeout, - preparation_kind, - ) - .await; + let outcome = + worker::start_work(&metrics, idle, code, &cache_path, artifact_path, preparation_timeout) + .await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index b3352b43b0d7..32e9bfa70748 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -16,10 +16,7 @@ //! A queue that handles requests for PVF preparation. -use super::{ - pool::{self, Worker}, - PreparationKind, -}; +use super::pool::{self, Worker}; use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET}; use always_assert::{always, never}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -36,12 +33,7 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { - priority: Priority, - pvf: Pvf, - preparation_timeout: Duration, - preparation_kind: PreparationKind, - }, + Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration }, } /// A response from queue. @@ -89,8 +81,6 @@ struct JobData { pvf: Pvf, /// The timeout for the preparation job. preparation_timeout: Duration, - /// Are we preparing because of a pre-check, execution, or heads-up request? 
- preparation_kind: PreparationKind, worker: Option, } @@ -218,8 +208,8 @@ impl Queue { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { match to_queue { - ToQueue::Enqueue { priority, pvf, preparation_timeout, preparation_kind } => { - handle_enqueue(queue, priority, pvf, preparation_timeout, preparation_kind).await?; + ToQueue::Enqueue { priority, pvf, preparation_timeout } => { + handle_enqueue(queue, priority, pvf, preparation_timeout).await?; }, } Ok(()) @@ -230,7 +220,6 @@ async fn handle_enqueue( priority: Priority, pvf: Pvf, preparation_timeout: Duration, - preparation_kind: PreparationKind, ) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, @@ -258,13 +247,7 @@ async fn handle_enqueue( return Ok(()) } - let job = queue.jobs.insert(JobData { - priority, - pvf, - preparation_timeout, - preparation_kind, - worker: None, - }); + let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None }); queue.artifact_id_to_job.insert(artifact_id, job); if let Some(available) = find_idle_worker(queue) { @@ -455,7 +438,6 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal code: job_data.pvf.code.clone(), artifact_path, preparation_timeout: job_data.preparation_timeout, - preparation_kind: job_data.preparation_kind, }, ) .await?; @@ -632,7 +614,6 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -653,25 +634,13 @@ mod tests { let priority = Priority::Normal; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - let preparation_kind = PreparationKind::PreCheck; - test.send_queue(ToQueue::Enqueue { - priority, - pvf: pvf(1), - preparation_timeout, - preparation_kind, - }); - test.send_queue(ToQueue::Enqueue { - priority, - pvf: pvf(2), - preparation_timeout, - preparation_kind, - }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromExecutionRequest, }); // Receive only two spawns. @@ -701,7 +670,6 @@ mod tests { priority: Priority::Critical, pvf: pvf(4), preparation_timeout, - preparation_kind, }); // 2 out of 2 are working, but there is a critical job incoming. 
That means that spawning @@ -713,13 +681,11 @@ mod tests { async fn cull_unwanted() { let mut test = Test::new(1, 2); let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - let preparation_kind = PreparationKind::PreCheck; test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1), preparation_timeout, - preparation_kind, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w1 = test.workers.insert(()); @@ -731,7 +697,6 @@ mod tests { priority: Priority::Critical, pvf: pvf(2), preparation_timeout, - preparation_kind, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -756,25 +721,13 @@ mod tests { let priority = Priority::Normal; let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT; - let preparation_kind = PreparationKind::PreCheck; - test.send_queue(ToQueue::Enqueue { - priority, - pvf: pvf(1), - preparation_timeout, - preparation_kind, - }); - test.send_queue(ToQueue::Enqueue { - priority, - pvf: pvf(2), - preparation_timeout, - preparation_kind, - }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout }); // Start a non-precheck preparation for this one. test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout: LENIENT_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::FromExecutionRequest, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -810,7 +763,6 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -836,7 +788,6 @@ mod tests { priority: Priority::Normal, pvf: pvf(1), preparation_timeout: PRECHECK_PREPARATION_TIMEOUT, - preparation_kind: PreparationKind::PreCheck, }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index b6b540ebeb45..bb6e120a6691 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -22,7 +22,6 @@ use crate::{ artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, metrics::Metrics, - prepare::PreparationKind, worker_common::{ bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -85,7 +84,6 @@ pub async fn start_work( cache_path: &Path, artifact_path: PathBuf, preparation_timeout: Duration, - preparation_kind: PreparationKind, ) -> Outcome { let IdleWorker { stream, pid } = worker; @@ -97,9 +95,7 @@ pub async fn start_work( ); with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move { - if let Err(err) = - send_request(&mut stream, code, &tmp_file, preparation_timeout, preparation_kind).await - { + if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -278,18 +274,14 @@ async fn send_request( code: Arc>, tmp_file: &Path, preparation_timeout: Duration, - preparation_kind: PreparationKind, ) -> io::Result<()> { framed_send(stream, &code).await?; framed_send(stream, path_to_bytes(tmp_file)).await?; framed_send(stream, &preparation_timeout.encode()).await?; - framed_send(stream, &preparation_kind.encode()).await?; Ok(()) } -async fn recv_request( - stream: &mut UnixStream, -) -> 
io::Result<(Vec, PathBuf, Duration, PreparationKind)> { +async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf, Duration)> { let code = framed_recv(stream).await?; let tmp_file = framed_recv(stream).await?; let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| { @@ -305,14 +297,7 @@ async fn recv_request( format!("prepare pvf recv_request: failed to decode duration: {:?}", e), ) })?; - let preparation_kind = framed_recv(stream).await?; - let preparation_kind = PreparationKind::decode(&mut &preparation_kind[..]).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("prepare pvf recv_request: failed to decode preparation kind: {:?}", e), - ) - })?; - Ok((code, tmp_file, preparation_timeout, preparation_kind)) + Ok((code, tmp_file, preparation_timeout)) } async fn send_response( @@ -362,14 +347,14 @@ async fn recv_response( /// /// 1. Get the code and parameters for preparation from the host. /// -/// 2. If we are pre-checking, start a memory tracker in a separate thread. +/// 2. Start a memory tracker in a separate thread. /// /// 3. Start the CPU time monitor loop and the actual preparation in two separate threads. /// /// 4. Select on the two threads created in step 3. If the CPU timeout was hit, the CPU time monitor /// thread will trigger first. /// -/// 5. If we are pre-checking, stop the memory tracker and get the stats. +/// 5. Stop the memory tracker and get the stats. /// /// 6. If compilation succeeded, write the compiled artifact into a temporary file. /// @@ -378,8 +363,7 @@ async fn recv_response( pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move { loop { - let (code, dest, preparation_timeout, preparation_kind) = - recv_request(&mut stream).await?; + let (code, dest, preparation_timeout) = recv_request(&mut stream).await?; gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), @@ -388,15 +372,10 @@ pub fn worker_entrypoint(socket_path: &str) { let cpu_time_start = ProcessTime::now(); - // If we are pre-checking, run the memory tracker. - let memory_tracker_info = if let PreparationKind::PreCheck = preparation_kind { - let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); - let memory_tracker_fut = - rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); - Some((memory_tracker_fut, memory_tracker_tx)) - } else { - None - }; + // Run the memory tracker. + let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); + let memory_tracker_fut = + rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); // Spawn a new thread that runs the CPU time monitor. let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); @@ -410,13 +389,8 @@ pub fn worker_entrypoint(socket_path: &str) { .spawn_blocking(move || { let prepare_result = prepare_artifact(&code); - // Get the `ru_maxrss` stat if we are pre-checking. - let max_rss = if let PreparationKind::PreCheck = preparation_kind { - // If supported, call getrusage for the thread. - get_max_rss_thread() - } else { - None - }; + // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. + let max_rss = get_max_rss_thread(); (prepare_result, max_rss) }) @@ -458,11 +432,7 @@ pub fn worker_entrypoint(socket_path: &str) { (Ok(compiled_artifact), max_rss) => { // Stop the memory stats worker and get its observed memory stats. 
let memory_tracker_stats = - if let Some((memory_tracker_fut, memory_tracker_tx)) = memory_tracker_info { - get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await - } else { - None - }; + get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await; let memory_stats = MemoryStats { memory_tracker_stats, max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())), From 4b25b8fc78349d543b86362b972b1ddd3514e5fa Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 1 Feb 2023 17:43:37 +0100 Subject: [PATCH 08/13] Use `RUSAGE_SELF` instead of `RUSAGE_THREAD` Not sure why I did that -- was a brainfart on my end. --- node/core/pvf/src/prepare/memory_stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 65f28e00a0cd..8e88f9af7c44 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -38,7 +38,7 @@ use tikv_jemalloc_ctl::{epoch, stats, Error}; use tokio::task::JoinHandle; #[cfg(target_os = "linux")] -use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; +use libc::{getrusage, rusage, timeval, RUSAGE_SELF}; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// supported by the OS, `ru_maxrss`. @@ -108,7 +108,7 @@ fn getrusage_thread() -> io::Result { ru_nvcsw: 0, ru_nivcsw: 0, }; - if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + if unsafe { getrusage(RUSAGE_SELF, &mut result) } == -1 { return Err(io::Error::last_os_error()) } Ok(result) From d995d507b2174c5c3f7e8bab04e9067d04def4b0 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 2 Feb 2023 10:22:28 +0100 Subject: [PATCH 09/13] Revert last commit (RUSAGE_THREAD is correct) --- node/core/pvf/src/prepare/memory_stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 8e88f9af7c44..65f28e00a0cd 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -38,7 +38,7 @@ use tikv_jemalloc_ctl::{epoch, stats, Error}; use tokio::task::JoinHandle; #[cfg(target_os = "linux")] -use libc::{getrusage, rusage, timeval, RUSAGE_SELF}; +use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// supported by the OS, `ru_maxrss`. 
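For context on the `RUSAGE_SELF` / `RUSAGE_THREAD` back-and-forth in the two commits above: `RUSAGE_SELF` aggregates resource usage across every thread of the process, while `RUSAGE_THREAD` is Linux-specific and covers only the calling thread. A minimal sketch of the call is shown below; the function name is hypothetical and `std::mem::zeroed()` stands in for the field-by-field initialisation used in this patch.

```rust
// Minimal sketch (Linux-only). `RUSAGE_THREAD` reports usage for the calling thread
// only, while `RUSAGE_SELF` sums every thread in the process.
#[cfg(target_os = "linux")]
fn current_thread_max_rss_kb() -> std::io::Result<i64> {
	use libc::{getrusage, rusage, RUSAGE_THREAD};

	// SAFETY: `rusage` is a plain C struct, so the all-zero bit pattern is a valid value.
	let mut usage: rusage = unsafe { std::mem::zeroed() };
	// SAFETY: we pass a valid, writable pointer to a `rusage` value.
	if unsafe { getrusage(RUSAGE_THREAD, &mut usage) } == -1 {
		return Err(std::io::Error::last_os_error())
	}
	// On Linux, `ru_maxrss` is reported in kilobytes.
	Ok(i64::from(usage.ru_maxrss))
}
```

On Linux `ru_maxrss` is already given in kilobytes, which matches the unit used for the preparation histograms.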
@@ -108,7 +108,7 @@ fn getrusage_thread() -> io::Result { ru_nvcsw: 0, ru_nivcsw: 0, }; - if unsafe { getrusage(RUSAGE_SELF, &mut result) } == -1 { + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { return Err(io::Error::last_os_error()) } Ok(result) From 3865da1a9f418ac48a027b10649038068c5bb405 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 2 Feb 2023 13:55:25 +0100 Subject: [PATCH 10/13] Use exponential buckets --- node/core/pvf/src/metrics.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index e3379d2027cf..07a2bf46f530 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -231,7 +231,10 @@ impl metrics::Metrics for Metrics { prometheus::HistogramOpts::new( "polkadot_pvf_preparation_max_rss", "ru_maxrss (maximum resident set size) observed for preparation (in kilobytes)", - ) + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), )?, registry, )?, @@ -240,7 +243,10 @@ impl metrics::Metrics for Metrics { prometheus::HistogramOpts::new( "polkadot_pvf_preparation_max_resident", "max resident memory observed for preparation (in kilobytes)", - ) + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), )?, registry, )?, @@ -249,7 +255,10 @@ impl metrics::Metrics for Metrics { prometheus::HistogramOpts::new( "polkadot_pvf_preparation_max_allocated", "max allocated memory observed for preparation (in kilobytes)", - ) + ).buckets( + prometheus::exponential_buckets(8192.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), )?, registry, )?, From becf7a815409ab530fc61370abffcd1b97b9a777 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 2 Feb 2023 19:42:35 +0100 Subject: [PATCH 11/13] Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS --- node/core/pvf/Cargo.toml | 2 +- node/core/pvf/src/prepare/memory_stats.rs | 25 ++++++++++++++--------- node/core/pvf/src/prepare/worker.rs | 8 ++++---- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 9c55aa000b42..739495cdc4c9 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -39,7 +39,7 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } -[target.'cfg(target_os = "linux")'.dependencies] +[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] libc = "0.2.139" [dev-dependencies] diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 65f28e00a0cd..1a11d6cee2c2 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -37,8 +37,8 @@ use std::{ use tikv_jemalloc_ctl::{epoch, stats, Error}; use tokio::task::JoinHandle; -#[cfg(target_os = "linux")] -use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; +#[cfg(any(target_os = "linux", target_os = "macos"))] +use libc::{getrusage, rusage, timeval, RUSAGE_SELF}; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// supported by the OS, `ru_maxrss`. @@ -87,9 +87,9 @@ impl MemoryAllocationTracker { } } -/// Get the rusage stats for the current thread. 
-#[cfg(target_os = "linux")] -fn getrusage_thread() -> io::Result { +/// Get the rusage stats for all threads in the current process. +#[cfg(any(target_os = "linux", target_os = "macos"))] +fn getrusage_process() -> io::Result { let mut result = rusage { ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, @@ -108,19 +108,24 @@ fn getrusage_thread() -> io::Result { ru_nvcsw: 0, ru_nivcsw: 0, }; - if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + if unsafe { getrusage(RUSAGE_SELF, &mut result) } == -1 { return Err(io::Error::last_os_error()) } Ok(result) } -/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just +/// Gets the `ru_maxrss` for the current process if the OS supports `getrusage`. Otherwise, just /// returns `None`. -pub fn get_max_rss_thread() -> Option> { +/// +/// The returned value is always in kilobytes. +pub fn get_max_rss_process() -> Option> { // `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))); - #[cfg(not(target_os = "linux"))] + let max_rss = Some(getrusage_process().map(|rusage| i64::from(rusage.ru_maxrss))); + // Macos returns this in bytes, so convert to kilobytes. + #[cfg(target_os = "macos")] + let max_rss = Some(getrusage_process().map(|rusage| i64::from(rusage.ru_maxrss) / 1024)); + #[cfg(not(any(target_os = "linux", target_os = "macos")))] let max_rss = None; max_rss } diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index bb6e120a6691..c6d6050143b1 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -15,8 +15,8 @@ // along with Polkadot. If not, see . use super::memory_stats::{ - get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics, - MemoryStats, + get_max_rss_process, get_memory_tracker_loop_stats, memory_tracker_loop, + observe_memory_metrics, MemoryStats, }; use crate::{ artifacts::CompiledArtifact, @@ -389,8 +389,8 @@ pub fn worker_entrypoint(socket_path: &str) { .spawn_blocking(move || { let prepare_result = prepare_artifact(&code); - // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. - let max_rss = get_max_rss_thread(); + // Get the `ru_maxrss` stat. If supported, call getrusage for the process. + let max_rss = get_max_rss_process(); (prepare_result, max_rss) }) From 73ed92ebc4b0ea0fbcdcf93c3159623c1cef9923 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Fri, 3 Feb 2023 09:45:01 +0100 Subject: [PATCH 12/13] Increase poll interval --- node/core/pvf/src/prepare/memory_stats.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 1a11d6cee2c2..f973edb0d11f 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -148,7 +148,9 @@ pub fn get_max_rss_process() -> Option> { /// For simplicity, any errors are returned as a string. As this is not a critical component, errors /// are used for informational purposes (logging) only. pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { - const POLL_INTERVAL: Duration = Duration::from_millis(10); + // This doesn't need to be too fine-grained since preparation currently takes 3-10s or more. + // Apart from that, there is not really a science to this number. 
+ const POLL_INTERVAL: Duration = Duration::from_millis(100); let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; let mut max_stats = MemoryAllocationStats::default(); From 4be8b21bc52748184311a9f57fa7d8c096a4e527 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 6 Feb 2023 11:20:12 +0100 Subject: [PATCH 13/13] Revert "Use `RUSAGE_SELF` for `getrusage`; enable `max_rss` metric for MacOS" This reverts commit becf7a815409ab530fc61370abffcd1b97b9a777. --- node/core/pvf/Cargo.toml | 2 +- node/core/pvf/src/prepare/memory_stats.rs | 25 +++++++++-------------- node/core/pvf/src/prepare/worker.rs | 8 ++++---- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 8eb99c0f97ed..e918c5f90fb2 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -39,7 +39,7 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } -[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] +[target.'cfg(target_os = "linux")'.dependencies] libc = "0.2.139" [dev-dependencies] diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index f973edb0d11f..4765a196d54e 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -37,8 +37,8 @@ use std::{ use tikv_jemalloc_ctl::{epoch, stats, Error}; use tokio::task::JoinHandle; -#[cfg(any(target_os = "linux", target_os = "macos"))] -use libc::{getrusage, rusage, timeval, RUSAGE_SELF}; +#[cfg(target_os = "linux")] +use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// supported by the OS, `ru_maxrss`. @@ -87,9 +87,9 @@ impl MemoryAllocationTracker { } } -/// Get the rusage stats for all threads in the current process. -#[cfg(any(target_os = "linux", target_os = "macos"))] -fn getrusage_process() -> io::Result { +/// Get the rusage stats for the current thread. +#[cfg(target_os = "linux")] +fn getrusage_thread() -> io::Result { let mut result = rusage { ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, @@ -108,24 +108,19 @@ fn getrusage_process() -> io::Result { ru_nvcsw: 0, ru_nivcsw: 0, }; - if unsafe { getrusage(RUSAGE_SELF, &mut result) } == -1 { + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { return Err(io::Error::last_os_error()) } Ok(result) } -/// Gets the `ru_maxrss` for the current process if the OS supports `getrusage`. Otherwise, just +/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just /// returns `None`. -/// -/// The returned value is always in kilobytes. -pub fn get_max_rss_process() -> Option> { +pub fn get_max_rss_thread() -> Option> { // `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_process().map(|rusage| i64::from(rusage.ru_maxrss))); - // Macos returns this in bytes, so convert to kilobytes. 
-	#[cfg(target_os = "macos")]
-	let max_rss = Some(getrusage_process().map(|rusage| i64::from(rusage.ru_maxrss) / 1024));
-	#[cfg(not(any(target_os = "linux", target_os = "macos")))]
+	#[cfg(not(target_os = "linux"))]
 	let max_rss = None;
 	max_rss
 }
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index c6d6050143b1..bb6e120a6691 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -15,8 +15,8 @@
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

 use super::memory_stats::{
-	get_max_rss_process, get_memory_tracker_loop_stats, memory_tracker_loop,
-	observe_memory_metrics, MemoryStats,
+	get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics,
+	MemoryStats,
 };
 use crate::{
 	artifacts::CompiledArtifact,
@@ -389,8 +389,8 @@ pub fn worker_entrypoint(socket_path: &str) {
 			.spawn_blocking(move || {
 				let prepare_result = prepare_artifact(&code);

-				// Get the `ru_maxrss` stat. If supported, call getrusage for the process.
-				let max_rss = get_max_rss_process();
+				// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
+				let max_rss = get_max_rss_thread();

 				(prepare_result, max_rss)
 			})
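For reference, the exponential buckets added to the preparation memory histograms earlier in the series expand to ten boundaries from 8192 to 4194304, roughly 8 MiB to 4 GiB when read as kilobytes. The snippet below simply evaluates the same `prometheus` helper; the `main` wrapper and the asserts are scaffolding for illustration, not part of the patch.

```rust
// Standalone check of the bucket layout (requires the `prometheus` crate).
fn main() {
	let buckets = prometheus::exponential_buckets(8192.0, 2.0, 10)
		.expect("start > 0, factor > 1, count > 0; qed");
	assert_eq!(buckets.len(), 10);
	assert_eq!(buckets[0], 8192.0); // ~8 MiB, read as kilobytes
	assert_eq!(buckets[9], 4_194_304.0); // ~4 GiB
	println!("{buckets:?}");
}
```

Observations above the last boundary still land in the implicit `+Inf` bucket, so unusually large preparation jobs are counted, just without finer resolution.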