Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Collator node workflow #280

Merged
merged 11 commits into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
error-chain = "0.12"
polkadot-cli = { path = "polkadot/cli" }
futures = "0.1"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }

[workspace]
members = [
Expand Down
1 change: 0 additions & 1 deletion polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2"
tokio-core = "0.1.12"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
serde_json = "1.0"
Expand Down
91 changes: 60 additions & 31 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ extern crate regex;
extern crate time;
extern crate futures;
extern crate tokio_core;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
Expand Down Expand Up @@ -65,6 +64,11 @@ mod informant;
mod chain_spec;

pub use chain_spec::ChainSpec;
pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};

use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
Expand All @@ -76,8 +80,7 @@ use codec::Slicable;
use client::BlockOrigin;
use runtime_primitives::generic::SignedBlock;

use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use futures::prelude::*;
use tokio_core::reactor;
use service::PruningMode;

Expand Down Expand Up @@ -117,6 +120,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
.unwrap_or_else(default_base_path)
}

/// Additional worker making use of the node, to run asynchronously before shutdown.
///
/// This will be invoked with the service and spawn a future that resolves
/// when complete.
pub trait Worker {
/// A future that resolves when the work is done.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;

/// A future that resolves when the node should exit.
type Exit: Future<Item=(),Error=()> + Send + 'static;

/// Create exit future.
fn exit(&mut self) -> Self::Exit;

/// Do work.
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
}

/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
Expand All @@ -125,9 +148,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T, W>(args: I, mut worker: W) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
W: Worker,
{
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
Expand All @@ -154,11 +178,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
}

if let Some(matches) = matches.subcommand_matches("export-blocks") {
return export_blocks(matches);
return export_blocks(matches, worker.exit());
}

if let Some(matches) = matches.subcommand_matches("import-blocks") {
return import_blocks(matches);
return import_blocks(matches, worker.exit());
}

let spec = load_spec(&matches)?;
Expand Down Expand Up @@ -252,8 +276,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where

let core = reactor::Core::new().expect("tokio::Core could not be created");
match role == service::Role::LIGHT {
true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf),
false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf),
true => run_until_exit(core, service::new_light(config)?, &matches, sys_conf, worker),
false => run_until_exit(core, service::new_full(config)?, &matches, sys_conf, worker),
}
}

Expand All @@ -265,16 +289,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let base_path = base_path(matches);
let spec = load_spec(&matches)?;
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
info!("DB path: {}", config.database_path);
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
info!("Exporting blocks");
let mut block: u32 = match matches.value_of("from") {
Expand Down Expand Up @@ -303,7 +330,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
}

loop {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match client.block(&BlockId::number(block as u64))? {
Expand All @@ -327,15 +354,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let spec = load_spec(&matches)?;
let base_path = base_path(matches);
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();

::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});

let mut file: Box<Read> = match matches.value_of("INPUT") {
Expand All @@ -347,7 +378,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
let mut block = 0;
for _ in 0 .. count {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match SignedBlock::decode(&mut file) {
Expand All @@ -370,21 +401,18 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn run_until_exit<C>(mut core: reactor::Core, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C, W>(
mut core: reactor::Core,
service: service::Service<C>,
matches: &clap::ArgMatches,
sys_conf: SystemConfiguration,
mut worker: W,
) -> error::Result<()>
where
C: service::Components,
W: Worker,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.
let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});

exit
};

informant::start(&service, core.handle());

let _rpc_servers = {
Expand All @@ -407,8 +435,9 @@ fn run_until_exit<C>(mut core: reactor::Core, service: service::Service<C>, matc
)
};

core.run(exit.into_future()).expect("Error running informant event loop");
Ok(())
let exit = worker.exit();
let until_exit = worker.work(&service).select(exit).then(|_| Ok(()));
core.run(until_exit)
}

fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
Expand Down
7 changes: 6 additions & 1 deletion polkadot/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
name = "polkadot-collator"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Abstract collation logic"
description = "Collator node implementation"

[dependencies]
futures = "0.1.17"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
polkadot-api = { path = "../api" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
Loading