From a4d6c580e8ae9dd12040b208a4ff5e85e55dd6af Mon Sep 17 00:00:00 2001 From: Guillaume Potier Date: Thu, 23 Feb 2023 08:53:54 +0100 Subject: [PATCH] feat: Shutdown RPC endpoint (#2538) --- CHANGELOG.md | 2 ++ Cargo.lock | 11 ------- documentation/src/js_console.md | 4 +++ forest/cli/src/cli/attach_cmd.rs | 50 +++++++++++++++++++++++++++--- forest/cli/src/cli/mod.rs | 11 +++++-- forest/cli/src/cli/shutdown_cmd.rs | 28 +++++++++++++++++ forest/cli/src/subcommand.rs | 1 + forest/daemon/Cargo.toml | 3 +- forest/daemon/src/cli/mod.rs | 45 +++++++-------------------- forest/daemon/src/daemon.rs | 33 ++++++++++++++------ node/rpc-api/src/data_types.rs | 4 +-- node/rpc-api/src/lib.rs | 5 +++ node/rpc-client/src/common_ops.rs | 21 +++++++++++++ node/rpc-client/src/lib.rs | 4 ++- node/rpc/src/common_api.rs | 9 ++++++ node/rpc/src/lib.rs | 10 ++++-- 16 files changed, 173 insertions(+), 68 deletions(-) create mode 100644 forest/cli/src/cli/shutdown_cmd.rs create mode 100644 node/rpc-client/src/common_ops.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index bba1fe88c0d4..c40fa0b83f39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,13 @@ Notable updates: ### Added * [database] added ParityDb statistics to the stats endpoint. [#2444](https://github.com/ChainSafe/forest/pull/2444) +* [api|cli] Add RPC `Filecoin.Shutdown` endpoint and `forest-cli shutdown` subcommand. [#2538](https://github.com/ChainSafe/forest/pull/2538) * [cli] A JavaScript console to interact with Filecoin API. [#2492](https://github.com/ChainSafe/forest/pull/2492) * [docker] Multi-platform Docker image support. [#2552](https://github.com/ChainSafe/forest/pull/2552) * [forest daemon] Added `--exit-after-init` and `--save-token` flags. [#2577](https://github.com/ChainSafe/forest/pull/2577) ### Changed +* [cli] Remove Forest ctrl-c hard shutdown behavior on subsequent ctrl-c signals. [#2538](https://github.com/ChainSafe/forest/pull/2538) * [libp2p] Use in house bitswap implementation. [#2445](https://github.com/ChainSafe/forest/pull/2445) * [libp2p] Ban peers with duration. Banned peers are automatically unbanned after a period of 1h. [#2396](https://github.com/ChainSafe/forest/pull/2396) * [libp2p] Support multiple listen addr. [#2570](https://github.com/ChainSafe/forest/pull/2570) diff --git a/Cargo.lock b/Cargo.lock index c7620c8dc3c4..d08b6ecdb9df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1842,16 +1842,6 @@ dependencies = [ "cipher 0.3.0", ] -[[package]] -name = "ctrlc" -version = "3.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbcf33c2a618cbe41ee43ae6e9f2e48368cd9f9db2896f10167d8d762679f639" -dependencies = [ - "nix 0.26.2", - "windows-sys 0.45.0", -] - [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -3270,7 +3260,6 @@ dependencies = [ "assert_cmd", "atty", "clap", - "ctrlc", "daemonize-me", "dialoguer", "flume", diff --git a/documentation/src/js_console.md b/documentation/src/js_console.md index 87b8d307bb32..48c8a11e1642 100644 --- a/documentation/src/js_console.md +++ b/documentation/src/js_console.md @@ -76,6 +76,10 @@ Forest console comes with a number of helper functions that make interacting wit - `showSyncStatus()` - `sendFIL(to, attoAmount)` +### Timers + +In addition, to support part of the JavaScript language, the console also provides implementation for `sleep(seconds)` timer and a tipset based timer, `sleepTipsets(epochs)`, which sleeps till the number of new tipsets added is equal to or greater than `epochs`. + ### Modules CommonJS modules is the way to package JavaScript code for Forest console. You can import modules using the `require` function: diff --git a/forest/cli/src/cli/attach_cmd.rs b/forest/cli/src/cli/attach_cmd.rs index 0fc72b15f673..6fc027abbdc8 100644 --- a/forest/cli/src/cli/attach_cmd.rs +++ b/forest/cli/src/cli/attach_cmd.rs @@ -16,15 +16,17 @@ use boa_engine::{ }; use convert_case::{Case, Casing}; use directories::BaseDirs; +use forest_chain_sync::SyncStage; use forest_json::message::json::MessageJson; use forest_rpc_api::mpool_api::MpoolPushMessageResult; use forest_rpc_client::*; use forest_shim::{address::Address, econ::TokenAmount, message::Message_v3}; -use fvm_shared::METHOD_SEND; +use fvm_shared::{clock::ChainEpoch, METHOD_SEND}; use num::BigInt; use rustyline::{config::Config as RustyLineConfig, EditMode, Editor}; use serde::Serialize; use serde_json::Value as JsonValue; +use tokio::time; use super::Config; @@ -113,7 +115,6 @@ fn require( let result = if path.exists() { read_to_string(path.clone()) } else if path == PathBuf::from("prelude.js") { - //println!("load builtin prelude"); Ok(PRELUDE_PATH.into()) } else { return context.throw_error("expecting valid module path"); @@ -235,6 +236,42 @@ async fn send_message( mpool_push_message((json_message, None), auth_token).await } +type SleepParams = (u64,); +type SleepResult = (); + +async fn sleep( + params: SleepParams, + _auth_token: &Option, +) -> Result { + let secs = params.0; + time::sleep(time::Duration::from_secs(secs)).await; + Ok(()) +} + +type SleepTipsetsParams = (ChainEpoch,); +type SleepTipsetsResult = (); + +async fn sleep_tipsets( + params: SleepTipsetsParams, + auth_token: &Option, +) -> Result { + let mut epoch = None; + loop { + let state = sync_status((), auth_token).await?; + if state.active_syncs[0].stage() == SyncStage::Complete { + if let Some(prev) = epoch { + let curr = state.active_syncs[0].epoch(); + if (curr - prev) >= params.0 { + return Ok(()); + } + } else { + epoch = Some(state.active_syncs[0].epoch()); + } + } + time::sleep(time::Duration::from_secs(1)).await; + } +} + impl AttachCommand { fn setup_context(&self, context: &mut Context, token: &Option) { // Disable tracing @@ -283,8 +320,14 @@ impl AttachCommand { // Message Pool API bind_func!(context, token, mpool_push_message); - // Bind send_message + // Common API + bind_func!(context, token, version); + bind_func!(context, token, shutdown); + + // Bind send_message, sleep, sleep_tipsets bind_func!(context, token, send_message); + bind_func!(context, token, sleep); + bind_func!(context, token, sleep_tipsets); } fn import_prelude(&self, context: &mut Context) -> anyhow::Result<()> { @@ -368,7 +411,6 @@ impl AttachCommand { } match context.parse(buffer.trim_end()) { Ok(_v) => { - // println!("Parse tree:\n{:#?}", v); editor.add_history_entry(&buffer); eval(buffer.trim_end(), &mut context); break; diff --git a/forest/cli/src/cli/mod.rs b/forest/cli/src/cli/mod.rs index 8c01afd1da2d..e73d871850d9 100644 --- a/forest/cli/src/cli/mod.rs +++ b/forest/cli/src/cli/mod.rs @@ -15,6 +15,7 @@ mod fetch_params_cmd; mod mpool_cmd; mod net_cmd; mod send_cmd; +mod shutdown_cmd; mod snapshot_cmd; mod state_cmd; mod sync_cmd; @@ -35,8 +36,8 @@ pub(super) use self::{ attach_cmd::AttachCommand, auth_cmd::AuthCommands, chain_cmd::ChainCommands, config_cmd::ConfigCommands, db_cmd::DBCommands, fetch_params_cmd::FetchCommands, mpool_cmd::MpoolCommands, net_cmd::NetCommands, send_cmd::SendCommand, - snapshot_cmd::SnapshotCommands, state_cmd::StateCommands, sync_cmd::SyncCommands, - wallet_cmd::WalletCommands, + shutdown_cmd::ShutdownCommand, snapshot_cmd::SnapshotCommands, state_cmd::StateCommands, + sync_cmd::SyncCommands, wallet_cmd::WalletCommands, }; /// CLI structure generated when interacting with Forest binary @@ -102,6 +103,9 @@ pub enum Subcommand { /// Attach to daemon via a JavaScript console Attach(AttachCommand), + + /// Shutdown Forest + Shutdown(ShutdownCommand), } /// Pretty-print a JSON-RPC error and exit @@ -183,7 +187,8 @@ pub(super) fn print_stdout(out: String) { } fn prompt_confirm() -> bool { - println!("Do you want to continue? [y/n]"); + print!("Do you want to continue? [y/n] "); + std::io::stdout().flush().unwrap(); let mut line = String::new(); std::io::stdin().read_line(&mut line).unwrap(); let line = line.trim().to_lowercase(); diff --git a/forest/cli/src/cli/shutdown_cmd.rs b/forest/cli/src/cli/shutdown_cmd.rs new file mode 100644 index 000000000000..28da7434670d --- /dev/null +++ b/forest/cli/src/cli/shutdown_cmd.rs @@ -0,0 +1,28 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use forest_rpc_client::shutdown; + +use super::{handle_rpc_err, Config}; +use crate::cli::prompt_confirm; + +#[derive(Debug, clap::Args)] +pub struct ShutdownCommand { + /// Assume "yes" as answer to shutdown prompt + #[arg(long)] + force: bool, +} + +impl ShutdownCommand { + pub async fn run(&self, config: Config) -> anyhow::Result<()> { + println!("Shutting down Forest node"); + if !self.force && !prompt_confirm() { + println!("Aborted."); + return Ok(()); + } + shutdown((), &config.client.rpc_token) + .await + .map_err(handle_rpc_err)?; + Ok(()) + } +} diff --git a/forest/cli/src/subcommand.rs b/forest/cli/src/subcommand.rs index 025542026fef..2f88cbf5d720 100644 --- a/forest/cli/src/subcommand.rs +++ b/forest/cli/src/subcommand.rs @@ -20,5 +20,6 @@ pub(super) async fn process(command: Subcommand, config: Config) -> anyhow::Resu Subcommand::DB(cmd) => cmd.run(&config), Subcommand::Snapshot(cmd) => cmd.run(config).await, Subcommand::Attach(cmd) => cmd.run(config), + Subcommand::Shutdown(cmd) => cmd.run(config).await, } } diff --git a/forest/daemon/Cargo.toml b/forest/daemon/Cargo.toml index ee210ea8cdad..e406adf6d8b9 100644 --- a/forest/daemon/Cargo.toml +++ b/forest/daemon/Cargo.toml @@ -16,7 +16,6 @@ anes = "0.1.6" anyhow.workspace = true atty.workspace = true clap.workspace = true -ctrlc = { version = "3.2", features = ["termination"] } daemonize-me = "2.0" dialoguer.workspace = true flume.workspace = true @@ -51,7 +50,7 @@ serde_json.workspace = true shared_memory = "0.12" tempfile.workspace = true time.workspace = true -tokio = { workspace = true, features = ["sync", "macros", "rt"] } +tokio = { workspace = true, features = ["sync", "macros", "rt", "signal"] } [dev-dependencies] assert_cmd.workspace = true diff --git a/forest/daemon/src/cli/mod.rs b/forest/daemon/src/cli/mod.rs index c5939e00bf74..d207a79915ff 100644 --- a/forest/daemon/src/cli/mod.rs +++ b/forest/daemon/src/cli/mod.rs @@ -1,21 +1,12 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{ - cell::RefCell, - io::Write, - process, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; +use std::io::Write; use anes::execute; use clap::Parser; use forest_cli_shared::cli::{CliOpts, FOREST_VERSION_STRING, HELP_MESSAGE}; -use futures::channel::oneshot::Receiver; -use log::{info, warn}; +use tokio::{signal, task}; /// CLI structure generated when interacting with Forest binary #[derive(Parser)] @@ -27,28 +18,14 @@ pub struct Cli { pub cmd: Option, } -pub fn set_sigint_handler() -> Receiver<()> { - let (ctrlc_send, ctrlc_oneshot) = futures::channel::oneshot::channel(); - let ctrlc_send_c = RefCell::new(Some(ctrlc_send)); +pub fn set_sigint_handler() { + task::spawn(async { + let _ = signal::ctrl_c().await; - let running = Arc::new(AtomicUsize::new(0)); - ctrlc::set_handler(move || { - let prev = running.fetch_add(1, Ordering::SeqCst); - if prev == 0 { - warn!("Got interrupt, shutting down..."); - let mut stdout = std::io::stdout(); - #[allow(clippy::question_mark)] - execute!(&mut stdout, anes::ShowCursor).unwrap(); - // Send sig int in channel to blocking task - if let Some(ctrlc_send) = ctrlc_send_c.try_borrow_mut().unwrap().take() { - ctrlc_send.send(()).expect("Error sending ctrl-c message"); - } - } else { - info!("Exiting process"); - process::exit(0); - } - }) - .expect("Error setting Ctrl-C handler"); - - ctrlc_oneshot + // the cursor can go missing if we hit ctrl-c during a prompt, so we always + // restore it + let mut stdout = std::io::stdout(); + #[allow(clippy::question_mark)] + execute!(&mut stdout, anes::ShowCursor).unwrap(); + }); } diff --git a/forest/daemon/src/daemon.rs b/forest/daemon/src/daemon.rs index a8abe6c6c585..9c64e5ef8709 100644 --- a/forest/daemon/src/daemon.rs +++ b/forest/daemon/src/daemon.rs @@ -38,7 +38,11 @@ use fvm_ipld_blockstore::Blockstore; use log::{debug, error, info, warn}; use raw_sync::events::{Event, EventInit, EventState}; use rpassword::read_password; -use tokio::{sync::RwLock, task::JoinSet}; +use tokio::{ + signal::unix::{signal, SignalKind}, + sync::RwLock, + task::JoinSet, +}; use super::cli::set_sigint_handler; @@ -65,7 +69,9 @@ fn unblock_parent_process() -> anyhow::Result<()> { /// Starts daemon process pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { - let mut ctrlc_oneshot = set_sigint_handler(); + set_sigint_handler(); + let (shutdown_send, mut shutdown_recv) = tokio::sync::mpsc::channel(1); + let mut terminate = signal(SignalKind::terminate())?; info!( "Starting Forest daemon, version {}", @@ -291,6 +297,7 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { }), rpc_listen, FOREST_VERSION_STRING.as_str(), + shutdown_send, ) .await .map_err(|err| anyhow::anyhow!("{:?}", serde_json::to_string(&err))) @@ -314,10 +321,17 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { let config = maybe_fetch_snapshot(should_fetch_snapshot, config).await?; - select! { + tokio::select! { () = sync_from_snapshot(&config, &state_manager).fuse() => {}, - _ = ctrlc_oneshot => { - // Cancel all async services + _ = tokio::signal::ctrl_c() => { + services.shutdown().await; + return Ok(db); + }, + _ = terminate.recv() => { + services.shutdown().await; + return Ok(db); + }, + _ = shutdown_recv.recv() => { services.shutdown().await; return Ok(db); }, @@ -334,12 +348,13 @@ pub(super) async fn start(opts: CliOpts, config: Config) -> anyhow::Result { // blocking until any of the services returns an error, // or CTRL-C is pressed - select! { + tokio::select! { err = propagate_error(&mut services).fuse() => error!("services failure: {}", err), - _ = ctrlc_oneshot => {} + _ = tokio::signal::ctrl_c() => {}, + _ = terminate.recv() => {}, + _ = shutdown_recv.recv() => {}, } - // Cancel all async services services.shutdown().await; Ok(db) @@ -428,7 +443,7 @@ async fn prompt_snapshot_or_die( if should_download { Ok(true) } else { - anyhow::bail!("Forest cannot sync without a snapshot. Download a snapshot from a trusted source and import with --import-snapshot=[file] or --download-snapshot to download one automatically"); + anyhow::bail!("Forest cannot sync without a snapshot. Download a snapshot from a trusted source and import with --import-snapshot=[file] or --auto-download-snapshot to download one automatically"); } } diff --git a/node/rpc-api/src/data_types.rs b/node/rpc-api/src/data_types.rs index 483bffcc032a..98bbb126e7ac 100644 --- a/node/rpc-api/src/data_types.rs +++ b/node/rpc-api/src/data_types.rs @@ -105,7 +105,7 @@ pub struct PeerID { } /// Represents the current version of the API. -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct APIVersion { pub version: String, @@ -115,7 +115,7 @@ pub struct APIVersion { /// Integer based value on version information. Highest order bits for Major, /// Mid order for Minor and lowest for Patch. -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct Version(u32); impl Version { diff --git a/node/rpc-api/src/lib.rs b/node/rpc-api/src/lib.rs index d7f43b89dafd..11b6b2af8ef8 100644 --- a/node/rpc-api/src/lib.rs +++ b/node/rpc-api/src/lib.rs @@ -81,6 +81,7 @@ pub static ACCESS_MAP: Lazy> = Lazy::new(|| { // Common API access.insert(common_api::VERSION, Access::Read); + access.insert(common_api::SHUTDOWN, Access::Admin); // Net API access.insert(net_api::NET_ADDRS_LISTEN, Access::Read); @@ -373,6 +374,10 @@ pub mod common_api { pub const VERSION: &str = "Filecoin.Version"; pub type VersionParams = (); pub type VersionResult = APIVersion; + + pub const SHUTDOWN: &str = "Filecoin.Shutdown"; + pub type ShutdownParams = (); + pub type ShutdownResult = (); } /// Net API diff --git a/node/rpc-client/src/common_ops.rs b/node/rpc-client/src/common_ops.rs new file mode 100644 index 000000000000..030ce643e055 --- /dev/null +++ b/node/rpc-client/src/common_ops.rs @@ -0,0 +1,21 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use forest_rpc_api::common_api::*; +use jsonrpc_v2::Error; + +use crate::call; + +pub async fn version( + params: VersionParams, + auth_token: &Option, +) -> Result { + call(VERSION, params, auth_token).await +} + +pub async fn shutdown( + params: ShutdownParams, + auth_token: &Option, +) -> Result { + call(SHUTDOWN, params, auth_token).await +} diff --git a/node/rpc-client/src/lib.rs b/node/rpc-client/src/lib.rs index 08c88293fe65..5addee1f3a3e 100644 --- a/node/rpc-client/src/lib.rs +++ b/node/rpc-client/src/lib.rs @@ -4,6 +4,7 @@ /// Filecoin RPC client interface methods pub mod auth_ops; pub mod chain_ops; +pub mod common_ops; pub mod mpool_ops; pub mod net_ops; pub mod state_ops; @@ -29,7 +30,8 @@ pub const DEFAULT_URL: &str = "http://127.0.0.1:1234/rpc/v0"; pub const RPC_ENDPOINT: &str = "rpc/v0"; pub use self::{ - auth_ops::*, chain_ops::*, mpool_ops::*, net_ops::*, state_ops::*, sync_ops::*, wallet_ops::*, + auth_ops::*, chain_ops::*, common_ops::*, mpool_ops::*, net_ops::*, state_ops::*, sync_ops::*, + wallet_ops::*, }; pub struct ApiInfo { diff --git a/node/rpc/src/common_api.rs b/node/rpc/src/common_api.rs index 000de7fa1fab..78050e1f20c9 100644 --- a/node/rpc/src/common_api.rs +++ b/node/rpc/src/common_api.rs @@ -8,6 +8,7 @@ use forest_rpc_api::{ }; use jsonrpc_v2::Error as JsonRpcError; use semver::Version as SemVer; +use tokio::sync::mpsc::Sender; pub(crate) async fn version( block_delay: u64, @@ -20,3 +21,11 @@ pub(crate) async fn version( block_delay, }) } + +pub(crate) async fn shutdown(shutdown_send: Sender<()>) -> Result { + // Trigger graceful shutdown + if let Err(err) = shutdown_send.send(()).await { + return Err(JsonRpcError::from(err)); + } + Ok(()) +} diff --git a/node/rpc/src/lib.rs b/node/rpc/src/lib.rs index 8bc9b68f74e0..c092c50aa191 100644 --- a/node/rpc/src/lib.rs +++ b/node/rpc/src/lib.rs @@ -28,16 +28,21 @@ use forest_rpc_api::{ use fvm_ipld_blockstore::Blockstore; use jsonrpc_v2::{Data, Error as JSONRPCError, Server}; use log::info; +use tokio::sync::mpsc::Sender; use crate::{ - beacon_api::beacon_get_entry, common_api::version, rpc_http_handler::rpc_http_handler, - rpc_ws_handler::rpc_ws_handler, state_api::*, + beacon_api::beacon_get_entry, + common_api::{shutdown, version}, + rpc_http_handler::rpc_http_handler, + rpc_ws_handler::rpc_ws_handler, + state_api::*, }; pub async fn start_rpc( state: Arc>, rpc_endpoint: TcpListener, forest_version: &'static str, + shutdown_send: Sender<()>, ) -> Result<(), JSONRPCError> where DB: Blockstore + Store + Clone + Send + Sync + 'static, @@ -116,6 +121,7 @@ where .with_method(GAS_ESTIMATE_MESSAGE_GAS, gas_estimate_message_gas::) // Common API .with_method(VERSION, move || version(block_delay, forest_version)) + .with_method(SHUTDOWN, move || shutdown(shutdown_send.clone())) // Net API .with_method(NET_ADDRS_LISTEN, net_api::net_addrs_listen::) .with_method(NET_PEERS, net_api::net_peers::)