Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Check spawned worker version vs node version before PVF preparation (#…
Browse files Browse the repository at this point in the history
…6861)

* Check spawned worker version vs node version before PVF preparation

* Address discussions

* Propagate errors and shutdown preparation and execution pipelines properly

* Add logs; Fix execution worker checks

* Revert "Propagate errors and shutdown preparation and execution pipelines properly"

This reverts commit b96cc31.

* Don't try to shut down; report the condition and exit worker

* Get rid of `VersionMismatch` preparation error

* Merge master

* Add docs; Fix tests

* Update Cargo.lock

* Kill again, but only the main node process

* Move unsafe code to a common safe function

* Fix libc dependency error on MacOS

* pvf spawning: Add some logging, add a small integration test

* Minor fixes

* Restart CI

---------

Co-authored-by: Marcin S <marcin@realemail.net>
  • Loading branch information
s0me0ne-unkn0wn and mrcnski authored Mar 29, 2023
1 parent 21298c8 commit 1fe1702
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pub enum Subcommand {
#[derive(Debug, Parser)]
pub struct ValidationWorkerCommand {
/// The path to the validation host's socket.
#[arg(long)]
pub socket_path: String,
/// Calling node implementation version
#[arg(long)]
pub node_impl_version: String,
}

#[allow(missing_docs)]
Expand Down
10 changes: 8 additions & 2 deletions cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand All @@ -513,7 +516,10 @@ pub fn run() -> Result<()> {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(
&cmd.socket_path,
Some(&cmd.node_impl_version),
);
Ok(())
}
},
Expand Down
5 changes: 4 additions & 1 deletion node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ cpu-time = "1.0.0"
futures = "0.3.21"
futures-timer = "3.0.2"
gum = { package = "tracing-gum", path = "../../gum" }
libc = "0.2.139"
pin-project = "1.0.9"
rand = "0.8.5"
rayon = "1.5.1"
Expand All @@ -41,8 +42,10 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch =
sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }

[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }

[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2.139"
tikv-jemalloc-ctl = "0.5.0"

[dev-dependencies]
Expand Down
19 changes: 19 additions & 0 deletions node/core/pvf/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2017-2023 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

fn main() {
substrate_build_script_utils::generate_cargo_keys();
}
34 changes: 26 additions & 8 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ pub async fn spawn(
executor_params: ExecutorParams,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
let (mut idle_worker, worker_handle) =
spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
.await?;
let (mut idle_worker, worker_handle) = spawn_with_program_path(
"execute",
program_path,
&["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await?;
send_handshake(&mut idle_worker.stream, Handshake { executor_params })
.await
.map_err(|error| {
Expand Down Expand Up @@ -260,11 +264,25 @@ impl Response {
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
let handshake = recv_handshake(&mut stream).await?;
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
Expand All @@ -273,7 +291,7 @@ pub fn worker_entrypoint(socket_path: &str) {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"worker: validating artifact {}",
artifact_path.display(),
);
Expand Down Expand Up @@ -307,7 +325,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
%worker_pid,
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
Expand Down
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub use pvf::PvfPrepData;

pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
pub(crate) use worker_common::kill_parent_node_in_emergency;
pub use worker_common::JOB_TIMEOUT_WALL_CLOCK_FACTOR;

pub use execute::worker_entrypoint as execute_worker_entrypoint;
Expand Down
28 changes: 24 additions & 4 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ pub async fn spawn(
program_path: &Path,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
spawn_with_program_path("prepare", program_path, &["prepare-worker"], spawn_timeout).await
spawn_with_program_path(
"prepare",
program_path,
&["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")],
spawn_timeout,
)
.await
}

pub enum Outcome {
Expand Down Expand Up @@ -321,7 +327,9 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
///
/// # Flow
///
Expand All @@ -342,10 +350,22 @@ async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareR
///
/// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
pub fn worker_entrypoint(socket_path: &str) {
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
let worker_pid = std::process::id();
if let Some(version) = node_version {
if version != env!("SUBSTRATE_CLI_IMPL_VERSION") {
gum::error!(
target: LOG_TARGET,
%worker_pid,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
crate::kill_parent_node_in_emergency();
return Err(io::Error::new(io::ErrorKind::Unsupported, "Version mismatch"))
}
}

loop {
let worker_pid = std::process::id();
let (pvf, dest) = recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
Expand Down
22 changes: 17 additions & 5 deletions node/core/pvf/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,34 @@ macro_rules! decl_puppet_worker_main {
$crate::sp_tracing::try_init_simple();

let args = std::env::args().collect::<Vec<_>>();
if args.len() < 2 {
if args.len() < 3 {
panic!("wrong number of arguments");
}

let mut version = None;
let mut socket_path: &str = "";

for i in 2..args.len() {
match args[i].as_ref() {
"--socket-path" => socket_path = args[i + 1].as_str(),
"--node-version" => version = Some(args[i + 1].as_str()),
_ => (),
}
}

let subcommand = &args[1];
match subcommand.as_ref() {
"exit" => {
std::process::exit(1);
},
"sleep" => {
std::thread::sleep(std::time::Duration::from_secs(5));
},
"prepare-worker" => {
let socket_path = &args[2];
$crate::prepare_worker_entrypoint(socket_path);
$crate::prepare_worker_entrypoint(&socket_path, version);
},
"execute-worker" => {
let socket_path = &args[2];
$crate::execute_worker_entrypoint(socket_path);
$crate::execute_worker_entrypoint(&socket_path, version);
},
other => panic!("unknown subcommand: {}", other),
}
Expand Down
43 changes: 41 additions & 2 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,21 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot bind unix socket: {:?}",
err,
);
SpawnErr::Bind
})?;

let handle =
WorkerHandle::spawn(program_path, extra_args, socket_path).map_err(|err| {
WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot spawn a worker: {:?}",
err,
);
Expand All @@ -84,6 +88,8 @@ pub async fn spawn_with_program_path(
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
"cannot accept a worker: {:?}",
err,
);
Expand All @@ -92,6 +98,14 @@ pub async fn spawn_with_program_path(
Ok((IdleWorker { stream, pid: handle.id() }, handle))
}
_ = Delay::new(spawn_timeout).fuse() => {
gum::warn!(
target: LOG_TARGET,
%debug_id,
?program_path,
?extra_args,
?spawn_timeout,
"spawning and connecting to socket timed out",
);
Err(SpawnErr::AcceptTimeout)
}
}
Expand Down Expand Up @@ -162,6 +176,13 @@ where
F: FnMut(Handle, UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"starting pvf worker ({})",
debug_id,
);

let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Expand All @@ -179,7 +200,7 @@ where
gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"pvf worker ({}): {:?}",
"quitting pvf worker ({}): {:?}",
debug_id,
err,
);
Expand Down Expand Up @@ -280,6 +301,7 @@ impl WorkerHandle {
) -> io::Result<Self> {
let mut child = process::Command::new(program.as_ref())
.args(extra_args)
.arg("--socket-path")
.arg(socket_path.as_ref().as_os_str())
.stdout(std::process::Stdio::piped())
.kill_on_drop(true)
Expand Down Expand Up @@ -393,3 +415,20 @@ pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>
r.read_exact(&mut buf).await?;
Ok(buf)
}

/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGKILL`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
/// get closed by the OS and other workers receive error on socket read and also exit. Preparation
/// jobs are written to the temporary files that are renamed to real artifacts on the node side, so
/// no leftover artifacts are possible.
pub(crate) fn kill_parent_node_in_emergency() {
unsafe {
// SAFETY: `getpid()` never fails but may return "no-parent" (0) or "parent-init" (1) in
// some corner cases, which is checked. `kill()` never fails.
let ppid = libc::getppid();
if ppid > 1 {
libc::kill(ppid, libc::SIGKILL);
}
}
}
9 changes: 9 additions & 0 deletions node/core/pvf/tests/it/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ use crate::PUPPET_EXE;
use polkadot_node_core_pvf::testing::worker_common::{spawn_with_program_path, SpawnErr};
use std::time::Duration;

// Test spawning a program that immediately exits with a failure code.
#[tokio::test]
async fn spawn_immediate_exit() {
let result =
spawn_with_program_path("integration-test", PUPPET_EXE, &["exit"], Duration::from_secs(2))
.await;
assert!(matches!(result, Err(SpawnErr::AcceptTimeout)));
}

#[tokio::test]
async fn spawn_timeout() {
let result =
Expand Down
4 changes: 2 additions & 2 deletions node/malus/src/malus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::prepare_worker_entrypoint(&cmd.socket_path, None);
}
},
NemesisVariant::PvfExecuteWorker(cmd) => {
Expand All @@ -108,7 +108,7 @@ impl MalusCli {

#[cfg(not(target_os = "android"))]
{
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path);
polkadot_node_core_pvf::execute_worker_entrypoint(&cmd.socket_path, None);
}
},
}
Expand Down

1 comment on commit 1fe1702

@Polkadot-Forum
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Polkadot Forum. There might be relevant details there:

https://forum.polkadot.network/t/polkadot-release-analysis-v0-9-41-v0-9-42/2828/1

Please sign in to comment.