diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs index 8b1565e1f6..2ae347dba8 100644 --- a/io-engine-tests/src/nexus.rs +++ b/io-engine-tests/src/nexus.rs @@ -17,6 +17,7 @@ use super::{ RebuildHistoryRecord, RebuildHistoryRequest, RemoveChildNexusRequest, + ResizeNexusRequest, ShutdownNexusRequest, }, snapshot::SnapshotInfo, @@ -250,6 +251,19 @@ impl NexusBuilder { .map(|r| r.into_inner().nexus.unwrap()) } + pub async fn resize(&self, req_size: u64) -> Result { + self.rpc() + .lock() + .await + .nexus + .resize_nexus(ResizeNexusRequest { + uuid: self.uuid(), + requested_size: req_size, + }) + .await + .map(|r| r.into_inner().nexus.unwrap()) + } + pub async fn add_child( &self, bdev: &str, diff --git a/io-engine-tests/src/replica.rs b/io-engine-tests/src/replica.rs index ae39ae38e6..837594ec1e 100644 --- a/io-engine-tests/src/replica.rs +++ b/io-engine-tests/src/replica.rs @@ -11,6 +11,7 @@ use io_engine_api::v1::replica::{ DestroyReplicaRequest, ListReplicaOptions, Replica, + ResizeReplicaRequest, ShareReplicaRequest, }; @@ -184,6 +185,22 @@ impl ReplicaBuilder { Ok(r) } + pub async fn resize(&mut self, req_size: u64) -> Result { + let r = self + .rpc() + .lock() + .await + .replica + .resize_replica(ResizeReplicaRequest { + uuid: self.uuid(), + requested_size: req_size, + }) + .await + .map(|r| r.into_inner())?; + self.size = Some(r.size); + Ok(r) + } + pub async fn get_replica(&self) -> Result { let uuid = self.uuid(); list_replicas(self.rpc()) diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index e2bb204866..c9b2da5ca7 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -18,6 +18,7 @@ use std::{ use crossbeam::atomic::AtomicCell; use futures::channel::oneshot; +use nix::errno::Errno; use serde::Serialize; use snafu::ResultExt; use uuid::Uuid; @@ -69,6 +70,7 @@ use crate::{ use crate::core::{BlockDeviceIoStats, CoreError, IoCompletionStatus}; use events_api::event::EventAction; use spdk_rs::{ + libspdk::spdk_bdev_notify_blockcnt_change, BdevIo, BdevOps, ChannelTraverseStatus, @@ -98,6 +100,7 @@ pub enum NexusOperation { ReplicaRemove, ReplicaOnline, ReplicaFault, + NexusResize, NexusSnapshot, } @@ -681,7 +684,10 @@ impl<'n> Nexus<'n> { } /// Configure nexus's block device to match parameters of the child devices. - async fn setup_nexus_bdev(mut self: Pin<&mut Self>) -> Result<(), Error> { + async fn setup_nexus_bdev( + mut self: Pin<&mut Self>, + resizing: bool, + ) -> Result<(), Error> { let name = self.name.clone(); if self.children().is_empty() { @@ -725,7 +731,23 @@ impl<'n> Nexus<'n> { } match partition::calc_data_partition(self.req_size(), nb, bs) { - Some((start, end)) => { + Some((start, end, req_blocks)) => { + // During expansion - if the requested number of blocks + // aren't available on any child device, + // then the operation need to fail in entirety. Hence make + // sure that the `end` block number + // returned is greater than the current end block number. + // XXX: A shrink operation has to be taken care of later, if + // needed. + if resizing && (end <= (start + self.num_blocks())) { + return Err(Error::ChildTooSmall { + child: child.uri().to_owned(), + name, + num_blocks: nb, + block_size: bs, + req_blocks, + }); + } if start_blk == 0 { start_blk = start; end_blk = end; @@ -746,6 +768,7 @@ impl<'n> Nexus<'n> { name, num_blocks: nb, block_size: bs, + req_blocks: self.req_size() / bs, }) } } @@ -754,15 +777,33 @@ impl<'n> Nexus<'n> { unsafe { self.as_mut().set_data_ent_offset(start_blk); self.as_mut().set_block_len(blk_size as u32); - self.as_mut().set_num_blocks(end_blk - start_blk); + let nbdev = self.as_mut().bdev_mut().unsafe_inner_mut_ptr(); + if !resizing { + self.as_mut().set_num_blocks(end_blk - start_blk); + } else { + let rc = spdk_bdev_notify_blockcnt_change( + nbdev, + end_blk - start_blk, + ); + if rc != 0 { + error!( + "{self:?}: failed to notify block cnt change on nexus" + ); + return Err(Error::NexusResize { + source: Errno::from_i32(rc), + name, + }); + } + } } info!( - "{self:?}: nexus device initialized: \ + "{self:?}: nexus device {action}: \ requested={req_blk} blocks ({req} bytes) \ start block={start_blk}, end block={end_blk}, \ block size={blk_size}, \ smallest devices size={min_dev_size} blocks", + action = if resizing { "resized" } else { "initialized" }, req_blk = self.req_size() / blk_size, req = self.req_size(), ); @@ -781,7 +822,7 @@ impl<'n> Nexus<'n> { info!("{:?}: registering nexus bdev...", nex); - nex.as_mut().setup_nexus_bdev().await?; + nex.as_mut().setup_nexus_bdev(false).await?; // Register the bdev with SPDK and set the callbacks for io channel // creation. @@ -893,6 +934,34 @@ impl<'n> Nexus<'n> { } } + /// Resize the nexus as part of volume resize workflow. The underlying + /// replicas are already resized before nexus resize is called. + pub async fn resize( + mut self: Pin<&mut Self>, + resize_to: u64, + ) -> Result<(), Error> { + // XXX: This check is likely relevant for resize as well to + // avoid unforeseen complications. + self.check_nexus_operation(NexusOperation::NexusResize)?; + + let current_size = self.req_size(); + if current_size == resize_to { + return Ok(()); + } + info!( + "Resizing nexus {} from {current_size} to {resize_to}", + self.uuid() + ); + unsafe { self.as_mut().set_req_size(resize_to) }; + let ret = self.as_mut().setup_nexus_bdev(true).await; + if ret.is_err() { + // Reset the req_size back to original in case of failure. + unsafe { self.as_mut().set_req_size(current_size) }; + } + + ret + } + /// Returns a mutable reference to Nexus I/O. fn io_subsystem_mut(self: Pin<&mut Self>) -> &mut NexusIoSubsystem<'n> { unsafe { self.get_unchecked_mut().io_subsystem.as_mut().unwrap() } @@ -1256,6 +1325,11 @@ impl<'n> Nexus<'n> { pub(crate) unsafe fn set_data_ent_offset(self: Pin<&mut Self>, val: u64) { self.get_unchecked_mut().data_ent_offset = val; } + + /// Sets the requested nexus size. + pub(crate) unsafe fn set_req_size(self: Pin<&mut Self>, val: u64) { + self.get_unchecked_mut().req_size = val; + } } impl Drop for Nexus<'_> { diff --git a/io-engine/src/bdev/nexus/nexus_bdev_error.rs b/io-engine/src/bdev/nexus/nexus_bdev_error.rs index 630246cfc3..8c9ea155a4 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev_error.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev_error.rs @@ -67,10 +67,12 @@ pub enum Error { ))] NexusIncomplete { name: String, reason: String }, #[snafu(display( - "Child {} of nexus {} is too small: size = {} x {}", + "Child {} of nexus {} is too small: size = {} x {}, required = {} x {}", child, name, num_blocks, + block_size, + req_blocks, block_size ))] ChildTooSmall { @@ -78,6 +80,7 @@ pub enum Error { name: String, num_blocks: u64, block_size: u64, + req_blocks: u64, }, #[snafu(display("Children of nexus {} have mixed block sizes", name))] MixedBlockSizes { name: String }, @@ -183,6 +186,8 @@ pub enum Error { NexusCreate { name: String, reason: String }, #[snafu(display("Failed to destroy nexus {}", name))] NexusDestroy { name: String }, + #[snafu(display("Failed to resize nexus {}", name))] + NexusResize { source: Errno, name: String }, #[snafu(display( "Child {} of nexus {} is not degraded but {}", child, @@ -281,6 +286,12 @@ impl From for tonic::Status { Error::RebuildJobNotFound { .. } => Status::not_found(e.to_string()), + Error::NexusIncomplete { + .. + } => Status::failed_precondition(e.to_string()), + Error::NexusResize { + .. + } => Status::failed_precondition(e.to_string()), Error::NexusNotFound { .. } => Status::not_found(e.to_string()), diff --git a/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs b/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs index 3468db07bb..3d723c7566 100644 --- a/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs +++ b/io-engine/src/bin/io-engine-client/v1/nexus_cli.rs @@ -196,6 +196,16 @@ pub fn subcommands() -> Command { .help("uuid of nexus"), ); + let resize = Command::new("resize") + .about("Resize nexus") + .arg(Arg::new("uuid").required(true).index(1).help("Nexus uuid")) + .arg( + Arg::new("size") + .required(true) + .index(2) + .help("Requested new size of the nexus"), + ); + Command::new("nexus") .subcommand_required(true) .arg_required_else_help(true) @@ -210,6 +220,7 @@ pub fn subcommands() -> Command { .subcommand(ana_state) .subcommand(list) .subcommand(children) + .subcommand(resize) .subcommand(nexus_child_cli::subcommands()) } @@ -220,6 +231,7 @@ pub async fn handler(ctx: Context, matches: &ArgMatches) -> crate::Result<()> { ("shutdown", args) => nexus_shutdown(ctx, args).await, ("list", args) => nexus_list(ctx, args).await, ("children", args) => nexus_children_2(ctx, args).await, + ("resize", args) => nexus_resize(ctx, args).await, ("publish", args) => nexus_publish(ctx, args).await, ("unpublish", args) => nexus_unpublish(ctx, args).await, ("ana_state", args) => nexus_nvme_ana_state(ctx, args).await, @@ -564,6 +576,54 @@ async fn nexus_children_2( Ok(()) } +async fn nexus_resize( + mut ctx: Context, + matches: &ArgMatches, +) -> crate::Result<()> { + let uuid = matches + .get_one::("uuid") + .ok_or_else(|| ClientError::MissingValue { + field: "uuid".to_string(), + })? + .to_owned(); + + let requested_size = + parse_size(matches.get_one::("size").ok_or_else(|| { + ClientError::MissingValue { + field: "size".to_string(), + } + })?) + .map_err(|s| Status::invalid_argument(format!("Bad size '{s}'"))) + .context(GrpcStatus)?; + + let response = ctx + .v1 + .nexus + .resize_nexus(v1::nexus::ResizeNexusRequest { + uuid: uuid.clone(), + requested_size: requested_size.get_bytes() as u64, + }) + .await + .context(GrpcStatus)?; + + match ctx.output { + OutputFormat::Json => { + println!( + "{}", + serde_json::to_string_pretty(&response.get_ref()) + .unwrap() + .to_colored_json_auto() + .unwrap() + ); + } + OutputFormat::Default => { + println!("{}", &uuid,); + } + }; + + Ok(()) +} + async fn nexus_publish( mut ctx: Context, matches: &ArgMatches, diff --git a/io-engine/src/core/partition.rs b/io-engine/src/core/partition.rs index b5ac46ee6e..98337ca654 100644 --- a/io-engine/src/core/partition.rs +++ b/io-engine/src/core/partition.rs @@ -50,7 +50,7 @@ pub fn calc_data_partition( req_size: u64, num_blocks: u64, block_size: u64, -) -> Option<(u64, u64)> { +) -> Option<(u64, u64, u64)> { // Number of blocks occupied by GPT tables. let gpt_blocks = bytes_to_alinged_blocks(GPT_TABLE_SIZE, block_size); @@ -78,7 +78,7 @@ pub fn calc_data_partition( // Last data block. let data_end = min(data_start + req_blocks - 1, lba_end); - Some((data_start, data_end)) + Some((data_start, data_end, req_blocks)) } /// Converts an offset in bytes into offset in number of aligned blocks for the diff --git a/io-engine/src/grpc/v1/nexus.rs b/io-engine/src/grpc/v1/nexus.rs index fae55a3436..4f0b694d7f 100644 --- a/io-engine/src/grpc/v1/nexus.rs +++ b/io-engine/src/grpc/v1/nexus.rs @@ -484,6 +484,34 @@ impl NexusRpc for NexusService { .await } + #[named] + async fn resize_nexus( + &self, + request: Request, + ) -> GrpcResult { + let ctx = GrpcClientContext::new(&request, function_name!()); + let args = request.into_inner(); + + self.serialized(ctx, args.uuid.clone(), true, async move { + let rx = rpc_submit::<_, _, nexus::Error>(async move { + info!("{args:?}"); + nexus_lookup(&args.uuid)? + .resize(args.requested_size) + .await?; + info!("Nexus {} resized to {}", args.uuid, args.requested_size); + Ok(ResizeNexusResponse { + nexus: Some(nexus_lookup(&args.uuid)?.into_grpc().await), + }) + })?; + + rx.await + .map_err(|_| Status::cancelled("cancelled"))? + .map_err(Status::from) + .map(Response::new) + }) + .await + } + #[named] async fn shutdown_nexus( &self, diff --git a/io-engine/tests/nexus_replica_resize.rs b/io-engine/tests/nexus_replica_resize.rs new file mode 100644 index 0000000000..4bb9350b02 --- /dev/null +++ b/io-engine/tests/nexus_replica_resize.rs @@ -0,0 +1,321 @@ +pub mod common; + +use common::{ + compose::{ + rpc::v1::{ + nexus::{ChildState, NexusState}, + GrpcConnect, + SharedRpcHandle, + }, + Binary, + Builder, + }, + fio::{Fio, FioJob}, + nexus::NexusBuilder, + pool::PoolBuilder, + replica::ReplicaBuilder, +}; +use std::time::Duration; + +use async_trait::async_trait; + +const POOL_SIZE: u64 = 500; // 500MiB +const REPL_SIZE: u64 = 400; // 400MiB +const EXPANDED_SIZE: u64 = 471859200; //450 MiB +const DEFAULT_REPLICA_CNT: usize = 3; + +async fn compose_ms_nodes() -> io_engine_tests::compose::ComposeTest { + common::composer_init(); + + Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_nex_0", + Binary::from_dbg("io-engine").with_args(vec!["-l", "1,2"]), + ) + .add_container_bin( + "ms_rep_1", + Binary::from_dbg("io-engine").with_args(vec!["-l", "3,4"]), + ) + .add_container_bin( + "ms_rep_2", + Binary::from_dbg("io-engine").with_args(vec!["-l", "5,6"]), + ) + .with_clean(true) + .build() + .await + .unwrap() +} + +struct StorConfig { + ms_nex_0: SharedRpcHandle, + ms_rep_1: SharedRpcHandle, + ms_rep_2: SharedRpcHandle, +} + +// Define an enum to represent the functions +enum ResizeTest { + WithoutReplicaResize, + AfterReplicaResize, + WithRebuildingReplica, +} + +// Define a trait for the test functions +#[async_trait(?Send)] +trait ResizeTestTrait { + async fn call( + &self, + nexus: &NexusBuilder, + replicas: Vec<&mut ReplicaBuilder>, + ); +} + +// Implement the trait for the functions +#[async_trait(?Send)] +impl ResizeTestTrait for ResizeTest { + async fn call( + &self, + nexus: &NexusBuilder, + replicas: Vec<&mut ReplicaBuilder>, + ) { + match self { + ResizeTest::WithoutReplicaResize => { + do_resize_without_replica_resize(nexus, replicas).await + } + ResizeTest::AfterReplicaResize => { + do_resize_after_replica_resize(nexus, replicas).await + } + ResizeTest::WithRebuildingReplica => { + do_resize_with_rebuilding_replica(nexus, replicas).await + } + } + } +} + +async fn do_resize_without_replica_resize( + nexus: &NexusBuilder, + replicas: Vec<&mut ReplicaBuilder>, +) { + let _ = nexus + .resize(EXPANDED_SIZE) + .await + .expect_err("Resize of nexus without resizing replicas must fail"); + + // And even if a replica is resized and others are not - then also operation + // must fail. + assert!(replicas.len() == DEFAULT_REPLICA_CNT); + let mut resize_repl = replicas[0].clone(); + let ret = &mut resize_repl.resize(EXPANDED_SIZE).await.unwrap(); + assert!(ret.size >= EXPANDED_SIZE); + let _ = nexus + .resize(EXPANDED_SIZE) + .await + .expect_err("Resize of nexus without resizing ALL replicas must fail"); +} + +async fn do_resize_after_replica_resize( + nexus: &NexusBuilder, + replicas: Vec<&mut ReplicaBuilder>, +) { + for replica in replicas { + let ret = replica.resize(EXPANDED_SIZE).await.unwrap(); + assert!(ret.size >= EXPANDED_SIZE); + } + + let nexus_obj = nexus + .resize(EXPANDED_SIZE) + .await + .expect("Resize of nexus after resizing replicas failed"); + assert!(nexus_obj.size == EXPANDED_SIZE); +} + +async fn do_resize_with_rebuilding_replica( + nexus: &NexusBuilder, + replicas: Vec<&mut ReplicaBuilder>, +) { + assert!(replicas.len() == DEFAULT_REPLICA_CNT); + // Last one is the chosen one! + let rebuild_replica = &replicas[replicas.len() - 1]; + + // Scale down and then scale up to initiate a rebuild. + nexus.remove_child_replica(rebuild_replica).await.unwrap(); + let _ = nexus + .wait_replica_state( + rebuild_replica, + ChildState::Degraded, + None, + Duration::from_secs(1), + ) + .await; + nexus.add_replica(rebuild_replica, false).await.unwrap(); + + // Make sure nexus is Degraded i.e. a rebuild is ongoing before we attempt + // volume resize. + assert_eq!( + nexus.get_nexus().await.unwrap().state, + NexusState::NexusDegraded as i32 + ); + do_resize_after_replica_resize(nexus, replicas).await +} + +/// Creates a nexus of 3 replicas and resize the replicas and nexus bdev while +/// fio is running. +async fn setup_cluster_and_run(cfg: StorConfig, test: ResizeTest) { + let StorConfig { + ms_nex_0, + ms_rep_1, + ms_rep_2, + } = cfg; + + // + let mut pool_0 = PoolBuilder::new(ms_nex_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_0 = ReplicaBuilder::new(ms_nex_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(REPL_SIZE); + + pool_0.create().await.unwrap(); + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + // + let mut pool_1 = PoolBuilder::new(ms_rep_1.clone()) + .with_name("pool1") + .with_new_uuid() + .with_malloc("mem1", POOL_SIZE); + + let mut repl_1 = ReplicaBuilder::new(ms_rep_1.clone()) + .with_pool(&pool_1) + .with_name("r1") + .with_new_uuid() + .with_size_mb(REPL_SIZE); + + pool_1.create().await.unwrap(); + repl_1.create().await.unwrap(); + repl_1.share().await.unwrap(); + + // + let mut pool_2 = PoolBuilder::new(ms_rep_2.clone()) + .with_name("pool2") + .with_new_uuid() + .with_malloc("mem2", POOL_SIZE); + + let mut repl_2 = ReplicaBuilder::new(ms_rep_2.clone()) + .with_pool(&pool_2) + .with_name("r2") + .with_new_uuid() + .with_size_mb(REPL_SIZE); + + pool_2.create().await.unwrap(); + repl_2.create().await.unwrap(); + repl_2.share().await.unwrap(); + + // + let mut nex_0 = NexusBuilder::new(ms_nex_0.clone()) + .with_name("nexus_rsz0") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_replica(&repl_0) + .with_replica(&repl_1) + .with_replica(&repl_2); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); + + // Run I/O on the nexus in a thread, and resize the underlying replicas + // and the nexus's size. + let _ = { + let (_cg, path) = nex_0.nvmf_location().open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("fio_vol_resize") + .with_filename_path(path) + .with_ioengine("libaio") + .with_iodepth(8) + .with_numjobs(4) + .with_direct(true) + .with_rw("write") + .with_runtime(20), + ); + + tokio::spawn(async { fio.run() }).await.unwrap() + }; + + // Wait a few secs for fio to have started. + // XXX: See if can check process status deterministically. + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + test.call(&nex_0, vec![&mut repl_0, &mut repl_1, &mut repl_2]) + .await; +} + +#[tokio::test] +async fn resize_without_replica_resize() { + let test = compose_ms_nodes().await; + + let conn = GrpcConnect::new(&test); + + let ms_nex_0 = conn.grpc_handle_shared("ms_nex_0").await.unwrap(); + let ms_rep_1 = conn.grpc_handle_shared("ms_rep_1").await.unwrap(); + let ms_rep_2 = conn.grpc_handle_shared("ms_rep_2").await.unwrap(); + + setup_cluster_and_run( + StorConfig { + ms_nex_0, + ms_rep_1, + ms_rep_2, + }, + ResizeTest::WithoutReplicaResize, + ) + .await +} + +#[tokio::test] +async fn resize_after_replica_resize() { + let test = compose_ms_nodes().await; + + let conn = GrpcConnect::new(&test); + + let ms_nex_0 = conn.grpc_handle_shared("ms_nex_0").await.unwrap(); + let ms_rep_1 = conn.grpc_handle_shared("ms_rep_1").await.unwrap(); + let ms_rep_2 = conn.grpc_handle_shared("ms_rep_2").await.unwrap(); + + setup_cluster_and_run( + StorConfig { + ms_nex_0, + ms_rep_1, + ms_rep_2, + }, + ResizeTest::AfterReplicaResize, + ) + .await +} + +#[tokio::test] +async fn resize_with_rebuilding_replica() { + let test = compose_ms_nodes().await; + + let conn = GrpcConnect::new(&test); + + let ms_nex_0 = conn.grpc_handle_shared("ms_nex_0").await.unwrap(); + let ms_rep_1 = conn.grpc_handle_shared("ms_rep_1").await.unwrap(); + let ms_rep_2 = conn.grpc_handle_shared("ms_rep_2").await.unwrap(); + + setup_cluster_and_run( + StorConfig { + ms_nex_0, + ms_rep_1, + ms_rep_2, + }, + ResizeTest::WithRebuildingReplica, + ) + .await +}