Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown tokio runtime gracefully when panic occurs in spawned future #535

Merged
merged 12 commits into from
Dec 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ slog-envlogger = "2.1.0"
slog-term = "2.4.0"
tiny-keccak = "1.0"
tokio = "0.1.11"
tokio-executor = "0.1.5"
tokio-retry = "0.2"
tokio-timer = "0.2.7"
web3 = "0.5.0"
2 changes: 2 additions & 0 deletions graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ extern crate slog_envlogger;
extern crate slog_term;
extern crate tiny_keccak;
pub extern crate tokio;
pub extern crate tokio_executor;
extern crate tokio_retry;
pub extern crate tokio_timer;
pub extern crate web3;

/// Traits and types for all system components.
Expand Down
14 changes: 7 additions & 7 deletions graph/src/util/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ fn run_retry<I, E, F, R>(
condition: RetryIf<I, E>,
log_after: u64,
limit_opt: Option<usize>,
try_it_with_deadline: F,
try_it_with_timeout: F,
) -> impl Future<Item = I, Error = timeout::Error<E>> + Send
where
I: Debug + Send,
Expand All @@ -236,13 +236,13 @@ where

attempt_count += 1;

try_it_with_deadline().then(move |result_with_deadline| {
let is_elapsed = result_with_deadline
try_it_with_timeout().then(move |result_with_timeout| {
let is_elapsed = result_with_timeout
.as_ref()
.err()
.map(|e| e.is_elapsed())
.unwrap_or(false);
let is_timer_err = result_with_deadline
let is_timer_err = result_with_timeout
.as_ref()
.err()
.map(|e| e.is_timer())
Expand All @@ -259,16 +259,16 @@ where
}

// Wrap in Err to force retry
Err(result_with_deadline)
Err(result_with_timeout)
} else if is_timer_err {
// Should never happen
let timer_error = result_with_deadline.unwrap_err().into_timer().unwrap();
let timer_error = result_with_timeout.unwrap_err().into_timer().unwrap();
panic!("tokio timer error: {}", timer_error)
} else {
// Any error must now be an inner error.
// Unwrap the inner error so that the predicate doesn't need to think
// about timeout::Error.
let result = result_with_deadline.map_err(|e| e.into_inner().unwrap());
let result = result_with_timeout.map_err(|e| e.into_inner().unwrap());

// If needs retry
if condition.check(&result) {
Expand Down
27 changes: 24 additions & 3 deletions graph/src/util/log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use backtrace::Backtrace;
use slog::{crit, o, Drain, FilterLevel, Logger};
use futures::sync::oneshot;
use slog::{crit, info, o, Drain, FilterLevel, Logger};
use slog_async;
use slog_envlogger;
use slog_term;
use std::{env, panic};
use std::sync::Mutex;
use std::time::Duration;
use std::{env, panic, process, thread};

pub fn logger(show_debug: bool) -> Logger {
let decorator = slog_term::TermDecorator::new().build();
Expand Down Expand Up @@ -35,7 +38,8 @@ pub fn guarded_logger() -> (Logger, slog_async::AsyncGuard) {
(Logger::root(drain.fuse(), o!()), guard)
}

pub fn register_panic_hook(panic_logger: Logger) {
pub fn register_panic_hook(panic_logger: Logger, shutdown_sender: oneshot::Sender<()>) {
let shutdown_mutex = Mutex::new(Some(shutdown_sender));
panic::set_hook(Box::new(move |panic_info| {
let panic_payload = panic_info
.payload()
Expand Down Expand Up @@ -69,5 +73,22 @@ pub fn register_panic_hook(panic_logger: Logger) {
);
}
};

// Send a shutdown signal to main which will attempt to cleanly shutdown the runtime
// After sending shutdown, the thread sleeps for 3 seconds then forces the process to
// exit because the shutdown is not always able to cleanly exit all workers
match shutdown_mutex.lock().unwrap().take() {
Some(sender) => sender
.send(())
.map(|_| ())
.map_err(|_| {
crit!(panic_logger, "Failed to send shutdown signal");
()
})
.unwrap_or(()),
None => info!(panic_logger, "Shutdown signal already sent"),
}
thread::sleep(Duration::from_millis(3000));
process::exit(1);
}));
}
50 changes: 45 additions & 5 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ extern crate ipfs_api;
extern crate url;

use clap::{App, Arg};
use futures::sync::oneshot;
use ipfs_api::IpfsClient;
use itertools::FoldWhile::{Continue, Done};
use itertools::Itertools;
use std::env;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::time::Duration;

use graph::components::forward;
use graph::prelude::{JsonRpcServer as JsonRpcServerTrait, *};
use graph::tokio_executor;
use graph::tokio_timer;
use graph::tokio_timer::timer::Timer;
use graph::util::log::{guarded_logger, logger, register_panic_hook};
use graph_core::{
ElasticLoggingConfig, SubgraphInstanceManager, SubgraphProvider as IpfsSubgraphProvider,
Expand All @@ -39,14 +44,50 @@ use graph_server_websocket::SubscriptionServer as GraphQLSubscriptionServer;
use graph_store_postgres::{Store as DieselStore, StoreConfig};

fn main() {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
// Register guarded panic logger which ensures logs flush on shutdown
let (panic_logger, _panic_guard) = guarded_logger();
register_panic_hook(panic_logger);
tokio::run(future::lazy(async_main))
register_panic_hook(panic_logger, shutdown_sender);

// Create components for tokio context: multi-threaded runtime,
// executor context on the runtime, and Timer handle.
let runtime = tokio::runtime::Runtime::new().expect("Failed to create runtime");
let mut executor = runtime.executor();
let mut enter = tokio_executor::enter()
.expect("Failed to enter runtime executor, multiple executors at once");
let timer = Timer::default();
let timer_handle = timer.handle();

// Shutdown the runtime after a panic
std::thread::spawn(|| {
let shutdown_logger = logger(false);
shutdown_receiver
.wait()
.map(|_| {
let _ = runtime
.shutdown_now()
.wait()
.expect("Failed to shutdown Tokio Runtime");
leoyvens marked this conversation as resolved.
Show resolved Hide resolved
info!(
shutdown_logger,
"Runtime cleaned up and shutdown successfully"
);
})
.expect("Runtime shutdown process did not finish");
});

// Setup runtime context with defaults and run the main application
tokio_executor::with_default(&mut executor, &mut enter, |enter| {
tokio_timer::with_default(&timer_handle, enter, |enter| {
enter
.block_on(future::lazy(|| async_main()))
.expect("Failed to run main function");
})
});
}

fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
env_logger::init();

// Setup CLI using Clap, provide general info and capture postgres url
let matches = App::new("graph-node")
.version("0.1.0")
Expand Down Expand Up @@ -235,7 +276,6 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
},
));
sentry::integrations::panic::register_panic_handler();

info!(logger, "Starting up");

// Try to create an IPFS client for one of the resolved IPFS addresses
Expand Down Expand Up @@ -467,7 +507,7 @@ fn async_main() -> impl Future<Item = (), Error = ()> + Send + 'static {
.expect("Failed to start GraphQL subscription server"),
);

future::ok(())
future::empty()
}

/// Parses an Ethereum connection string and returns the network name and Ethereum node.
Expand Down