Skip to content

Commit

Permalink
feat(deployer): add idle io-engines
Browse files Browse the repository at this point in the history
These are useful to sort of simulate upgrades.
This is required because there's no easy way of updating a container image tag.
A better alternative to add in the future would be to extend composer to add
containers to existing cluster and extending cluster to allow for this via its
components as well.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Jun 12, 2024
1 parent 91cd3da commit 366b497
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 15 deletions.
30 changes: 22 additions & 8 deletions deployer/src/infra/io_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,47 @@ use utils::DEFAULT_GRPC_CLIENT_ADDR;
impl ComponentAction for IoEngine {
fn configure(&self, options: &StartOptions, cfg: Builder) -> Result<Builder, Error> {
let mut cfg = cfg;
for i in 0 .. options.io_engines {
for i in 0 .. options.io_engines + options.idle_io_engines {
let io_engine_socket =
format!("{}:10124", cfg.next_ip_for_name(&Self::name(i, options))?);
let name = Self::name(i, options);
let reg_name;
let reg_name = if options.idle_io_engines == 0 {
&name
} else {
reg_name = Self::name(i % (options.idle_io_engines + 1), options);
&reg_name
};
let ptpl_dir = format!("{}/{}", Self::ptpl().1, name);

let bin = utils::DATA_PLANE_BINARY;
let binary = options.io_engine_bin.clone().or_else(|| Self::binary(bin));
let binary = if i < options.io_engines {
options.io_engine_bin.clone()
} else {
options.idle_io_engine_bin.clone()
}
.or_else(|| Self::binary(bin));

let mut spec = if let Some(binary) = binary {
ContainerSpec::from_binary(&name, Binary::from_path(&binary))
.with_bind_binary_dir(true)
} else {
ContainerSpec::from_image(&name, &options.io_engine_image)
let image = if i < options.io_engines {
&options.io_engine_image
} else {
&options.idle_io_engine_image
};
ContainerSpec::from_image(&name, image)
.with_pull_policy(options.image_pull_policy.clone())
}
.with_args(vec!["-N", &name])
.with_args(vec!["-N", reg_name])
.with_args(vec!["-g", &io_engine_socket])
.with_args(vec!["-R", DEFAULT_GRPC_CLIENT_ADDR])
.with_args(vec![
"--api-versions".to_string(),
IoEngineApiVersion::vec_to_str(options.io_engine_api_versions.clone()),
])
.with_args(vec![
"-r",
format!("/host/tmp/{}.sock", Self::name(i, options)).as_str(),
])
.with_args(vec!["-r", format!("/host/tmp/{}.sock", reg_name).as_str()])
.with_args(vec!["--ptpl-dir", &ptpl_dir])
.with_env("MAYASTOR_NVMF_HOSTID", Uuid::new_v4().to_string().as_str())
.with_env("NEXUS_NVMF_RESV_ENABLE", "1")
Expand Down
41 changes: 39 additions & 2 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct ListOptions {
pub no_docker: bool,

/// Format the docker output
#[clap(short, long, conflicts_with = "no-docker")]
#[clap(short, long, conflicts_with = "no_docker")]
pub format: Option<String>,

/// Label for the cluster
Expand Down Expand Up @@ -140,20 +140,42 @@ pub struct StartOptions {
#[clap(long, default_value = "ifnotpresent")]
pub image_pull_policy: composer::ImagePullPolicy,

/// Use `N` io_engine instances
/// Use `N` io_engine instances.
/// Note: the io_engine containers have the host's /tmp directory mapped into the container
/// as /host/tmp. This is useful to create pool's from file images.
#[clap(short, long, default_value = "1")]
pub io_engines: u32,

/// Use `N` idle io_engine instances, to replace `N` `io_engines`, in order!
/// These can be used to replace the `io_engines` which can be useful to simulate upgrades
/// for example.
/// # Warnings:
/// These engines share a lot of configuration with the regular `io_engines` (as intended),
/// and as such please ensure these are not active at the same time as its corresponding
/// `io_engine` container.
#[clap(long, default_value = "0")]
pub idle_io_engines: u32,

/// Use the following docker image for the io_engine instances.
#[clap(long, env = "IO_ENGINE_IMAGE", default_value = utils::io_engine_image())]
pub io_engine_image: String,

/// Use the following docker image for the io_engine instances.
#[clap(long, env = "IDLE_IO_ENGINE_IMAGE", default_value = utils::io_engine_image())]
pub idle_io_engine_image: String,

/// Use the following runnable binary for the io_engine instances.
#[clap(long, env = "IO_ENGINE_BIN", conflicts_with = "io_engine_image")]
pub io_engine_bin: Option<String>,

/// Use the following runnable binary for the io_engine instances.
#[clap(
long,
env = "IDLE_IO_ENGINE_BIN",
conflicts_with = "idle_io_engine_image"
)]
pub idle_io_engine_bin: Option<String>,

/// Add host block devices to the io_engine containers as a docker bind mount
/// A raw block device: --io_engine-devices /dev/sda /dev/sdb
/// An lvm volume group: --io_engine-devices /dev/sdavg
Expand Down Expand Up @@ -448,6 +470,11 @@ impl StartOptions {
self
}
#[must_use]
pub fn with_idle_io_engines(mut self, idle_io_engines: u32) -> Self {
self.idle_io_engines = idle_io_engines;
self
}
#[must_use]
pub fn with_pull_policy(mut self, policy: composer::ImagePullPolicy) -> Self {
self.image_pull_policy = policy;
self
Expand Down Expand Up @@ -487,11 +514,21 @@ impl StartOptions {
self
}
#[must_use]
pub fn with_io_engine_tag(mut self, tag: &str) -> Self {
self.io_engine_image = utils::io_engine_image_tagged(tag.into());
self
}
#[must_use]
pub fn with_io_engine_img(mut self, image: &str) -> Self {
self.io_engine_image = image.to_string();
self
}
#[must_use]
pub fn with_idle_io_engine_bin(mut self, bin: &str) -> Self {
self.idle_io_engine_bin = Some(bin.to_string());
self
}
#[must_use]
pub fn with_io_engine_devices(mut self, devices: Vec<&str>) -> Self {
self.io_engine_devices = devices.into_iter().map(Into::into).collect();
self
Expand Down
48 changes: 47 additions & 1 deletion utils/deployer-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use stor_port::{
definitions::ObjectKey,
registry::{ControlPlaneService, StoreLeaseLockKey},
},
transport::CreatePool,
transport::{CreatePool, Filter, NodeId, NodeStatus, PoolId, PoolStatus},
},
};
use tokio::{net::UnixStream, time::sleep};
Expand Down Expand Up @@ -253,6 +253,52 @@ impl Cluster {
))
}

/// Wait till the node is in the given status.
pub async fn wait_node_status(&self, node_id: NodeId, status: NodeStatus) -> Result<(), ()> {
let timeout = Duration::from_secs(2);
let node_cli = self.grpc_client().node();
let start = std::time::Instant::now();
loop {
let node = node_cli
.get(Filter::Node(node_id.clone()), true, None)
.await
.expect("Cant get node object");
if let Some(node) = node.0.get(0) {
if node.state().map(|n| &n.status) == Some(&status) {
return Ok(());
}
}
if std::time::Instant::now() > (start + timeout) {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(())
}
/// Wait till the node is in the given status.
pub async fn wait_pool_online(&self, pool_id: PoolId) -> Result<(), ()> {
let timeout = Duration::from_secs(2);
let start = std::time::Instant::now();
loop {
let filter = Filter::Pool(pool_id.clone());
if let Ok(pools) = self.grpc_client().pool().get(filter, None).await {
if pools
.into_inner()
.first()
.and_then(|p| p.state().map(|s| s.status == PoolStatus::Online))
== Some(true)
{
return Ok(());
}
}
if std::time::Instant::now() > (start + timeout) {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(())
}

/// return grpc handle to the container
pub async fn grpc_handle(&self, name: &str) -> Result<RpcHandle, String> {
match self.composer.containers().iter().find(|&c| c.0 == name) {
Expand Down
9 changes: 5 additions & 4 deletions utils/utils-lib/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ pub fn fio_spdk_image() -> String {

/// Io-Engine container image used for testing.
pub fn io_engine_image() -> String {
format!(
"{TARGET_REGISTRY}/{PRODUCT_NAME}-io-engine:{}",
target_tag()
)
io_engine_image_tagged(target_tag())
}
/// Io-Engine container image used for testing, but with a custom tag.
pub fn io_engine_image_tagged(tag: String) -> String {
format!("{TARGET_REGISTRY}/{PRODUCT_NAME}-io-engine:{tag}")
}

/// Environment variable that points to an io-engine binary.
Expand Down

0 comments on commit 366b497

Please sign in to comment.