Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 763787c

Browse files
rphmeier authored and arkpar committed
handle exit and avoid threads hanging (#137)
* barrier on starting network
* handle exit better
* give consensus service its own internal exit signal
* update comment
* remove stop_notifications and fix build
1 parent 98d2777 commit 763787c

File tree

8 files changed

+128
-62
lines changed

8 files changed

+128
-62
lines changed

Cargo.lock

+12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

polkadot/cli/src/lib.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,15 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
6868
T: Into<std::ffi::OsString> + Clone,
6969
{
7070
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
71+
let exit = {
72+
// can't use signal directly here because CtrlC takes only `Fn`.
73+
let (exit_send, exit) = mpsc::channel(1);
74+
ctrlc::CtrlC::set_handler(move || {
75+
exit_send.clone().send(()).wait().expect("Error sending exit notification");
76+
});
77+
78+
exit
79+
};
7180

7281
let yaml = load_yaml!("./cli.yml");
7382
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
@@ -140,11 +149,6 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
140149

141150
informant::start(&service, core.handle());
142151

143-
let (exit_send, exit) = mpsc::channel(1);
144-
ctrlc::CtrlC::set_handler(move || {
145-
exit_send.clone().send(()).wait().expect("Error sending exit notification");
146-
});
147-
148152
let _rpc_servers = {
149153
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
150154
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;

polkadot/consensus/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ tokio-core = "0.1.12"
1010
ed25519 = { path = "../../substrate/ed25519" }
1111
error-chain = "0.11"
1212
log = "0.3"
13+
exit-future = "0.1"
1314
polkadot-api = { path = "../api" }
1415
polkadot-collator = { path = "../collator" }
1516
polkadot-primitives = { path = "../primitives" }

polkadot/consensus/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ extern crate substrate_primitives as primitives;
4444
extern crate substrate_runtime_support as runtime_support;
4545
extern crate substrate_network;
4646

47+
extern crate exit_future;
4748
extern crate tokio_core;
4849
extern crate substrate_keyring;
4950
extern crate substrate_client as client;

polkadot/consensus/src/service.rs

+58-33
Original file line numberDiff line numberDiff line change
@@ -218,11 +218,6 @@ impl<E> Sink for BftSink<E> {
218218
}
219219
}
220220

221-
/// Consensus service. Starts working when created.
222-
pub struct Service {
223-
thread: Option<thread::JoinHandle<()>>,
224-
}
225-
226221
struct Network(Arc<net::ConsensusService>);
227222

228223
fn start_bft<F, C>(
@@ -259,16 +254,24 @@ fn start_bft<F, C>(
259254
}
260255
}
261256

257+
/// Consensus service. Starts working when created.
258+
pub struct Service {
259+
thread: Option<thread::JoinHandle<()>>,
260+
exit_signal: Option<::exit_future::Signal>,
261+
}
262+
262263
impl Service {
263264
/// Create and start a new instance.
264265
pub fn new<C>(
265266
client: Arc<C>,
266267
network: Arc<net::ConsensusService>,
267268
transaction_pool: Arc<Mutex<TransactionPool>>,
268-
key: ed25519::Pair
269+
key: ed25519::Pair,
269270
) -> Service
270-
where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
271+
where
272+
C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static,
271273
{
274+
let (signal, exit) = ::exit_future::signal();
272275
let thread = thread::spawn(move || {
273276
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
274277
let key = Arc::new(key);
@@ -281,52 +284,74 @@ impl Service {
281284
let messages = SharedMessageCollection::new();
282285
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
283286

284-
let handle = core.handle();
285-
let notifications = client.import_notification_stream().for_each(|notification| {
286-
if notification.is_new_best {
287-
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
288-
}
289-
Ok(())
290-
});
287+
let notifications = {
288+
let handle = core.handle();
289+
let network = network.clone();
290+
let client = client.clone();
291+
let bft_service = bft_service.clone();
292+
let messages = messages.clone();
291293

292-
let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
294+
client.import_notification_stream().for_each(move |notification| {
295+
if notification.is_new_best {
296+
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
297+
}
298+
Ok(())
299+
})
300+
};
301+
302+
let interval = reactor::Interval::new_at(
303+
Instant::now() + Duration::from_millis(TIMER_DELAY_MS),
304+
Duration::from_millis(TIMER_INTERVAL_MS),
305+
&core.handle(),
306+
).expect("it is always possible to create an interval with valid params");
293307
let mut prev_best = match client.best_block_header() {
294308
Ok(header) => header.blake2_256(),
295309
Err(e) => {
296310
warn!("Cant's start consensus service. Error reading best block header: {:?}", e);
297311
return;
298312
}
299313
};
300-
let c = client.clone();
301-
let s = bft_service.clone();
302-
let n = network.clone();
303-
let m = messages.clone();
304-
let handle = core.handle();
305-
let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
306-
if let Ok(best_block) = c.best_block_header() {
307-
let hash = best_block.blake2_256();
308-
m.collect_garbage();
309-
if hash == prev_best {
310-
debug!("Starting consensus round after a timeout");
311-
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
314+
315+
let timed = {
316+
let c = client.clone();
317+
let s = bft_service.clone();
318+
let n = network.clone();
319+
let m = messages.clone();
320+
let handle = core.handle();
321+
322+
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
323+
if let Ok(best_block) = c.best_block_header() {
324+
let hash = best_block.blake2_256();
325+
m.collect_garbage();
326+
if hash == prev_best {
327+
debug!("Starting consensus round after a timeout");
328+
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
329+
}
330+
prev_best = hash;
312331
}
313-
prev_best = hash;
314-
}
315-
Ok(())
316-
});
332+
Ok(())
333+
})
334+
};
335+
336+
core.handle().spawn(notifications);
317337
core.handle().spawn(timed);
318-
if let Err(e) = core.run(notifications) {
338+
if let Err(e) = core.run(exit) {
319339
debug!("BFT event loop error {:?}", e);
320340
}
321341
});
322342
Service {
323-
thread: Some(thread)
343+
thread: Some(thread),
344+
exit_signal: Some(signal),
324345
}
325346
}
326347
}
327348

328349
impl Drop for Service {
329350
fn drop(&mut self) {
351+
if let Some(signal) = self.exit_signal.take() {
352+
signal.fire();
353+
}
354+
330355
if let Some(thread) = self.thread.take() {
331356
thread.join().expect("The service thread has panicked");
332357
}

polkadot/service/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ tokio-timer = "0.1.2"
1111
error-chain = "0.11"
1212
log = "0.3"
1313
tokio-core = "0.1.12"
14+
exit-future = "0.1"
1415
ed25519 = { path = "../../substrate/ed25519" }
1516
polkadot-primitives = { path = "../primitives" }
1617
polkadot-runtime = { path = "../runtime" }

polkadot/service/src/lib.rs

+46-19
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@ extern crate polkadot_api;
2828
extern crate polkadot_consensus as consensus;
2929
extern crate polkadot_transaction_pool as transaction_pool;
3030
extern crate polkadot_keystore as keystore;
31+
extern crate substrate_client as client;
3132
extern crate substrate_runtime_io as runtime_io;
3233
extern crate substrate_primitives as primitives;
3334
extern crate substrate_network as network;
3435
extern crate substrate_codec as codec;
3536
extern crate substrate_executor;
3637

38+
extern crate exit_future;
3739
extern crate tokio_core;
38-
extern crate substrate_client as client;
3940

4041
#[macro_use]
4142
extern crate error_chain;
@@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC
6566
use client::{genesis, BlockchainEvents};
6667
use client::in_mem::Backend as InMemory;
6768
use network::ManageNetwork;
69+
use exit_future::Signal;
6870

6971
pub use self::error::{ErrorKind, Error};
7072
pub use config::{Configuration, Role, ChainSpec};
@@ -77,6 +79,7 @@ pub struct Service {
7779
client: Arc<Client>,
7880
network: Arc<network::Service>,
7981
transaction_pool: Arc<Mutex<TransactionPool>>,
82+
signal: Option<Signal>,
8083
_consensus: Option<consensus::Service>,
8184
}
8285

@@ -242,6 +245,10 @@ fn local_testnet_config() -> ChainConfig {
242245
impl Service {
243246
/// Creates the service and registers the protocol with the network service.
244247
pub fn new(mut config: Configuration) -> Result<Service, error::Error> {
248+
use std::sync::Barrier;
249+
250+
let (signal, exit) = ::exit_future::signal();
251+
245252
// Create client
246253
let executor = polkadot_executor::Executor::new();
247254
let mut storage = Default::default();
@@ -284,7 +291,39 @@ impl Service {
284291
chain: client.clone(),
285292
transaction_pool: transaction_pool_adapter,
286293
};
294+
287295
let network = network::Service::new(network_params)?;
296+
let barrier = ::std::sync::Arc::new(Barrier::new(2));
297+
298+
let thread = {
299+
let client = client.clone();
300+
let network = network.clone();
301+
let txpool = transaction_pool.clone();
302+
303+
let thread_barrier = barrier.clone();
304+
thread::spawn(move || {
305+
network.start_network();
306+
307+
thread_barrier.wait();
308+
let mut core = Core::new().expect("tokio::Core could not be created");
309+
let events = client.import_notification_stream().for_each(move |notification| {
310+
network.on_block_imported(notification.hash, &notification.header);
311+
prune_imported(&*client, &*txpool, notification.hash);
312+
313+
Ok(())
314+
});
315+
316+
core.handle().spawn(events);
317+
if let Err(e) = core.run(exit) {
318+
debug!("Polkadot service event loop shutdown with {:?}", e);
319+
}
320+
debug!("Polkadot service shutdown");
321+
})
322+
};
323+
324+
// wait for the network to start up before starting the consensus
325+
// service.
326+
barrier.wait();
288327

289328
// Spin consensus service if configured
290329
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
@@ -296,28 +335,12 @@ impl Service {
296335
None
297336
};
298337

299-
let thread_client = client.clone();
300-
let thread_network = network.clone();
301-
let thread_txpool = transaction_pool.clone();
302-
let thread = thread::spawn(move || {
303-
thread_network.start_network();
304-
let mut core = Core::new().expect("tokio::Core could not be created");
305-
let events = thread_client.import_notification_stream().for_each(|notification| {
306-
thread_network.on_block_imported(notification.hash, &notification.header);
307-
prune_imported(&*thread_client, &*thread_txpool, notification.hash);
308-
309-
Ok(())
310-
});
311-
if let Err(e) = core.run(events) {
312-
debug!("Polkadot service event loop shutdown with {:?}", e);
313-
}
314-
debug!("Polkadot service shutdown");
315-
});
316338
Ok(Service {
317339
thread: Some(thread),
318340
client: client,
319341
network: network,
320342
transaction_pool: transaction_pool,
343+
signal: Some(signal),
321344
_consensus: consensus_service,
322345
})
323346
}
@@ -357,8 +380,12 @@ pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: Head
357380

358381
impl Drop for Service {
359382
fn drop(&mut self) {
360-
self.client.stop_notifications();
361383
self.network.stop_network();
384+
385+
if let Some(signal) = self.signal.take() {
386+
signal.fire();
387+
}
388+
362389
if let Some(thread) = self.thread.take() {
363390
thread.join().expect("The service thread has panicked");
364391
}

substrate/client/src/client.rs

-5
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,6 @@ impl<B, E> Client<B, E> where
212212
self.executor.clone()
213213
}
214214

215-
/// Close notification streams.
216-
pub fn stop_notifications(&self) {
217-
self.import_notification_sinks.lock().clear();
218-
}
219-
220215
/// Get the current set of authorities from storage.
221216
pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> {
222217
let state = self.state_at(id)?;

0 commit comments

Comments
 (0)