Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Allow respawning a node with a different set of arguments #190

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(dead_code, clippy::expect_fun_call)]

pub mod errors;
mod generators;
pub mod generators;
pub mod network;
mod network_helper;
mod network_spec;
Expand Down Expand Up @@ -252,7 +252,7 @@ where
} else {
node.spec.p2p_port.0
},
node.inner.args().as_ref(),
node.inner.args().await.as_ref(),
&node.spec.p2p_cert_hash,
)?,
);
Expand Down
92 changes: 74 additions & 18 deletions crates/orchestrator/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use support::fs::FileSystem;
use self::{node::NetworkNode, parachain::Parachain, relaychain::Relaychain};
use crate::{
generators::chain_spec::ChainSpec,
network_spec::{self, NetworkSpec},
network_spec::{self, node::NodeSpec, NetworkSpec},
shared::{
macros,
types::{ChainDefaultContext, RegisterParachainOptions},
Expand All @@ -31,7 +31,6 @@ pub struct Network<T: FileSystem> {
relay: Relaychain,
initial_spec: NetworkSpec,
parachains: HashMap<u32, Parachain>,
nodes_by_name: HashMap<String, NetworkNode>,
}

impl<T: FileSystem> std::fmt::Debug for Network<T> {
Expand All @@ -41,7 +40,6 @@ impl<T: FileSystem> std::fmt::Debug for Network<T> {
.field("relay", &self.relay)
.field("initial_spec", &self.initial_spec)
.field("parachains", &self.parachains)
.field("nodes_by_name", &self.nodes_by_name)
.finish()
}
}
Expand All @@ -68,7 +66,6 @@ impl<T: FileSystem> Network<T> {
relay,
initial_spec,
parachains: Default::default(),
nodes_by_name: Default::default(),
}
}

Expand Down Expand Up @@ -122,7 +119,7 @@ impl<T: FileSystem> Network<T> {
let name = name.into();
let relaychain = self.relaychain();

if self.nodes_by_name.contains_key(&name) {
if self.nodes_iter().any(|n| n.name == name) {
return Err(anyhow::anyhow!("Name: {} is already used.", name));
}

Expand Down Expand Up @@ -178,10 +175,42 @@ impl<T: FileSystem> Network<T> {
// // tx_helper::validator_actions::register(vec![&node], &running_node.ws_uri, None).await?;
// }

// Add node to the global hash
// Add node to relay
self.add_running_node(node.clone(), None);
// add node to relay
self.relay.nodes.push(node);

Ok(())
}

/// Replace an already existing but now dead node by spawning a new node with a given spec
/// (which can be obtained from the old node and modified if needed).
///
/// The spec must carry a node name that already exists in the network, otherwise the call
/// fails with an error.
///
/// TODO: It doesn't currently check if the node in question is actually dead. Trying to
/// replace a still-running node is possible UB.
pub async fn replace_node(&mut self, spec: NodeSpec) -> Result<(), anyhow::Error> {
    // Inverse of the `add_node` check: only *existing* names may be replaced.
    if self.nodes_iter().all(|n| n.name != spec.name) {
        return Err(anyhow::anyhow!("Name: {} is not found.", spec.name));
    }

    let relaychain = self.relaychain();
    let base_dir = self.ns.base_dir().to_string_lossy();
    let scoped_fs = ScopedFilesystem::new(&self.filesystem, &base_dir);

    // The node is respawned with the plain relaychain-node role, no parachain scope
    // and no extra bootnodes — NOTE(review): confirm this is intended for collators too.
    let ctx = SpawnNodeCtx {
        chain_id: &relaychain.chain_id,
        parachain_id: None,
        chain: &relaychain.chain,
        role: ZombieRole::Node,
        ns: &self.ns,
        scoped_fs: &scoped_fs,
        parachain: None,
        bootnodes_addr: &vec![],
        wait_ready: true,
    };

    let node = spawner::respawn_node(&spec, &ctx).await?;
    // Swap the old (dead) entry for the freshly spawned one, in place.
    self.replace_running_node(node)?;

    Ok(())
}
Expand Down Expand Up @@ -294,8 +323,6 @@ impl<T: FileSystem> Network<T> {
network_spec::node::NodeSpec::from_ad_hoc(name.into(), options.into(), &chain_context)?;

let node = spawner::spawn_node(&node_spec, global_files_to_inject, &ctx).await?;
let para = self.parachains.get_mut(&para_id).unwrap();
para.collators.push(node.clone());
self.add_running_node(node, None);

Ok(())
Expand Down Expand Up @@ -487,14 +514,14 @@ impl<T: FileSystem> Network<T> {
// remove_parachain()

pub fn get_node(&self, name: impl Into<String>) -> Result<&NetworkNode, anyhow::Error> {
let name = &name.into();
if let Some(node) = self.nodes_by_name.get(name) {
let name = name.into();
if let Some(node) = self.nodes_iter().find(|&n| n.name == name) {
return Ok(node);
}

let list = self
.nodes_by_name
.keys()
.nodes_iter()
.map(|n| &n.name)
.cloned()
.collect::<Vec<_>>()
.join(", ");
Expand All @@ -504,8 +531,18 @@ impl<T: FileSystem> Network<T> {
))
}

/// Get a mutable reference to the node with the given `name`, searching both
/// relaychain nodes and parachain collators.
///
/// # Errors
/// Returns an error when no node with that name exists in the network.
pub fn get_node_mut(
    &mut self,
    name: impl Into<String>,
) -> Result<&mut NetworkNode, anyhow::Error> {
    let name = name.into();
    self.nodes_iter_mut()
        .find(|n| n.name == name)
        // `ok_or_else` defers building (and allocating) the error to the miss path.
        .ok_or_else(|| anyhow::anyhow!("can't find node with name: {name:?}"))
}

pub fn nodes(&self) -> Vec<&NetworkNode> {
self.nodes_by_name.values().collect::<Vec<&NetworkNode>>()
self.nodes_iter().collect()
}

pub async fn detach(&self) {
Expand All @@ -524,9 +561,12 @@ impl<T: FileSystem> Network<T> {
} else {
self.relay.nodes.push(node.clone());
}
// TODO: we should hold a ref to the node in the vec in the future.
let node_name = node.name.clone();
self.nodes_by_name.insert(node_name, node);
}

/// Overwrite the existing node entry matching `node.name` with the freshly
/// spawned `node`. Fails when no node with that name is registered.
pub(crate) fn replace_running_node(&mut self, node: NetworkNode) -> Result<(), anyhow::Error> {
    *self.get_node_mut(&node.name)? = node;
    Ok(())
}

pub(crate) fn add_para(&mut self, para: Parachain) {
Expand All @@ -544,4 +584,20 @@ impl<T: FileSystem> Network<T> {
pub(crate) fn parachains(&self) -> Vec<&Parachain> {
self.parachains.values().collect()
}

/// Iterate over every node in the network: relaychain nodes first, then each
/// parachain's collators.
pub(crate) fn nodes_iter(&self) -> impl Iterator<Item = &NetworkNode> {
    // `values()`/`flat_map` replaces the `iter().map(..).flatten()` chain
    // (clippy: map_flatten / iter_kv_map); the keys were unused.
    self.relay
        .nodes
        .iter()
        .chain(self.parachains.values().flat_map(|p| &p.collators))
}

/// Mutable counterpart of `nodes_iter`: yields `&mut NetworkNode` for every
/// relaychain node and parachain collator.
pub(crate) fn nodes_iter_mut(&mut self) -> impl Iterator<Item = &mut NetworkNode> {
    // `values_mut()`/`flat_map` replaces the `iter_mut().map(..).flatten()` chain
    // (clippy: map_flatten / iter_kv_map); the keys were unused.
    self.relay
        .nodes
        .iter_mut()
        .chain(self.parachains.values_mut().flat_map(|p| &mut p.collators))
}
}
5 changes: 5 additions & 0 deletions crates/orchestrator/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ impl NetworkNode {
Ok(())
}

/// Kill the node's underlying process via the provider, converting the
/// provider error into `anyhow::Error`.
pub async fn kill(&self) -> Result<(), anyhow::Error> {
    Ok(self.inner.kill().await?)
}

/// Restart the node using the same `cmd`, `args` and `env` (and same isolated dir)
pub async fn restart(&self, after: Option<Duration>) -> Result<(), anyhow::Error> {
self.inner.restart(after).await?;
Expand Down
4 changes: 2 additions & 2 deletions crates/orchestrator/src/network_spec/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ pub struct NodeSpec {
pub(crate) name: String,

/// Node key, used for compute the p2p identity.
pub(crate) key: String,
pub key: String,

// libp2p local identity
pub(crate) peer_id: String,
pub peer_id: String,

/// Accounts to be injected in the keystore.
pub(crate) accounts: NodeAccounts,
Expand Down
122 changes: 122 additions & 0 deletions crates/orchestrator/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,125 @@ where
running_node,
))
}

// TODO: Effectively it's a cut down copy-pasted version of `spawn_node`. Code duplication
// should be addressed.
/// Respawn a previously spawned node described by `node`, reusing its reserved
/// ports and base directory.
///
/// Regenerates the launch command from the spec, drops the port "parking"
/// listeners, asks the namespace to respawn the process, then (outside CI)
/// re-creates the rpc/prometheus port-forwards before returning a fresh
/// `NetworkNode` handle.
pub async fn respawn_node<'a, T>(
    node: &NodeSpec,
    ctx: &SpawnNodeCtx<'a, T>,
) -> Result<NetworkNode, anyhow::Error>
where
    T: FileSystem,
{
    let base_dir = format!("{}/{}", ctx.ns.base_dir().to_string_lossy(), &node.name);

    // Providers that don't prefix with the full path get the default dirs;
    // otherwise the node's base dir is prepended to each path.
    let (cfg_path, data_path, relay_data_path) = if !ctx.ns.capabilities().prefix_with_full_path {
        (
            NODE_CONFIG_DIR.into(),
            NODE_DATA_DIR.into(),
            NODE_RELAY_DATA_DIR.into(),
        )
    } else {
        let cfg_path = format!("{}{NODE_CONFIG_DIR}", &base_dir);
        let data_path = format!("{}{NODE_DATA_DIR}", &base_dir);
        let relay_data_path = format!("{}{NODE_RELAY_DATA_DIR}", &base_dir);
        (cfg_path, data_path, relay_data_path)
    };

    let gen_opts = generators::GenCmdOptions {
        relay_chain_name: ctx.chain,
        cfg_path: &cfg_path,               // TODO: get from provider/ns
        data_path: &data_path,             // TODO: get from provider
        relay_data_path: &relay_data_path, // TODO: get from provider
        use_wrapper: false,                // TODO: get from provider
        bootnode_addr: ctx.bootnodes_addr.clone(),
        // If the provider requires an image (e.g. k8s) we should use the default ports in the cmd.
        use_default_ports_in_cmd: ctx.ns.capabilities().use_default_ports_in_cmd,
    };

    // Regenerate the full command line for the node based on its role.
    let (program, args) = match ctx.role {
        // Collator should be a `non-cumulus` one (e.g. adder/undying)
        ZombieRole::Node | ZombieRole::Collator => {
            let maybe_para_id = ctx.parachain.map(|para| para.id);

            generators::generate_node_command(node, gen_opts, maybe_para_id)
        },
        ZombieRole::CumulusCollator => {
            let para = ctx.parachain.expect(&format!(
                "parachain must be part of the context {THIS_IS_A_BUG}"
            ));
            let full_p2p = generators::generate_node_port(None)?;
            generators::generate_node_command_cumulus(node, gen_opts, para.id, full_p2p.0)
        },
        _ => unreachable!(), /* TODO: do we need those?
                              * ZombieRole::Bootnode => todo!(),
                              * ZombieRole::Companion => todo!(), */
    };

    info!(
        "🚀 {}, respawning.... with command: {} {}",
        node.name,
        program,
        args.join(" ")
    );

    // Drops the port parking listeners before spawn, so the respawned process
    // can bind the previously reserved ports.
    node.ws_port.drop_listener();
    node.p2p_port.drop_listener();
    node.rpc_port.drop_listener();
    node.prometheus_port.drop_listener();

    let running_node = ctx
        .ns
        .respawn_node(&node.name, args)
        .await
        .with_context(|| format!("Failed to respawn node: {}", node.name))?;

    let mut ip_to_use = LOCALHOST;

    // Deferred init: both branches below assign exactly once.
    let (rpc_port_external, prometheus_port_external);

    // Create port-forward iff we are not in CI
    if !running_in_ci() {
        let ports = futures::future::try_join_all(vec![
            running_node.create_port_forward(node.rpc_port.0, RPC_PORT),
            running_node.create_port_forward(node.prometheus_port.0, PROMETHEUS_PORT),
        ])
        .await?;

        // Fall back to the spec's reserved ports when no forward was created.
        (rpc_port_external, prometheus_port_external) = (
            ports[0].unwrap_or(node.rpc_port.0),
            ports[1].unwrap_or(node.prometheus_port.0),
        );
    } else {
        // Running in CI requires using the node ip and the default ports.
        (rpc_port_external, prometheus_port_external) = (RPC_PORT, PROMETHEUS_PORT);
        ip_to_use = running_node.ip().await?;
    }

    let ws_uri = format!("ws://{}:{}", ip_to_use, rpc_port_external);
    let prometheus_uri = format!("http://{}:{}/metrics", ip_to_use, prometheus_port_external);
    info!("🚀 {}, should be running now", node.name);
    info!(
        "🚀 {}: direct link https://polkadot.js.org/apps/?rpc={ws_uri}#/explorer",
        node.name
    );
    info!("🚀 {}: metrics link {prometheus_uri}", node.name);
    // TODO: the cmd for the logs should live on the node or ns.
    if ctx.ns.capabilities().requires_image {
        info!(
            "📓 logs cmd: kubectl -n {} logs {}",
            ctx.ns.name(),
            node.name
        );
    } else {
        info!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name);
    }
    Ok(NetworkNode::new(
        node.name.clone(),
        ws_uri,
        prometheus_uri,
        node.clone(),
        running_node,
    ))
}
8 changes: 8 additions & 0 deletions crates/provider/src/kubernetes/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,14 @@ where
Ok(node)
}

/// Respawn the named node with a new set of arguments.
///
/// NOTE(review): not implemented for the Kubernetes provider yet — calling
/// this panics via `todo!()`.
async fn respawn_node(
    &self,
    _name: &str,
    _args: Vec<String>,
) -> Result<DynNode, ProviderError> {
    todo!()
}

async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
debug!("options {:#?}", options);

Expand Down
12 changes: 10 additions & 2 deletions crates/provider/src/kubernetes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,8 @@ where
&self.name
}

fn args(&self) -> Vec<&str> {
self.args.iter().map(|arg| arg.as_str()).collect()
// Returns an owned copy of the node's launch arguments.
async fn args(&self) -> Vec<String> {
    self.args.clone()
}

fn base_dir(&self) -> &PathBuf {
Expand Down Expand Up @@ -656,6 +656,14 @@ where
Ok(())
}

// NOTE(review): `kill` is not implemented for the Kubernetes provider yet —
// calling this panics via `todo!()`.
async fn kill(&self) -> Result<(), ProviderError> {
    todo!()
}

// NOTE(review): `respawn` is not implemented for the Kubernetes provider yet —
// calling this panics via `todo!()`.
async fn respawn(&self) -> Result<(), ProviderError> {
    todo!()
}

async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError> {
if let Some(duration) = after {
sleep(duration).await;
Expand Down
Loading
Loading