diff --git a/Cargo.lock b/Cargo.lock index f0cc4f985a..7f8b2b588d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1628,6 +1628,7 @@ dependencies = [ "crc", "crossbeam", "crossbeam-sync", + "derive_builder", "dns-lookup", "env_logger", "etcd-client", diff --git a/io-engine-tests/src/fio.rs b/io-engine-tests/src/fio.rs index 41951b8c06..4a0bda78fe 100644 --- a/io-engine-tests/src/fio.rs +++ b/io-engine-tests/src/fio.rs @@ -1,36 +1,51 @@ use super::file_io::DataSize; -use std::sync::atomic::{AtomicU32, Ordering}; +use nix::errno::Errno; +use std::{ + path::Path, + sync::atomic::{AtomicU32, Ordering}, + time::{Duration, Instant}, +}; + +/// TODO +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum FioJobResult { + NotRun, + Ok, + Error(Errno), +} /// TODO #[derive(Debug, Clone)] #[allow(dead_code)] pub struct FioJob { /// Job counter. - counter: u32, + pub counter: u32, /// Job name. - name: String, + pub name: String, /// I/O engine to use. Default: spdk. - ioengine: String, + pub ioengine: String, /// Filename. - filename: String, + pub filename: String, /// Type of I/O pattern. - rw: String, + pub rw: String, /// If true, use non-buffered I/O (usually O_DIRECT). Default: true. - direct: bool, + pub direct: bool, /// Block size for I/O units. Default: 4k. - blocksize: Option, + pub blocksize: Option, /// Offset in the file to start I/O. Data before the offset will not be /// touched. - offset: Option, + pub offset: Option, /// Number of I/O units to keep in flight against the file. - iodepth: Option, + pub iodepth: Option, /// Number of clones (processes/threads performing the same workload) of /// this job. Default: 1. - numjobs: u32, + pub numjobs: u32, /// Terminate processing after the specified number of seconds. - runtime: Option, + pub runtime: Option, /// Total size of I/O for this job. - size: Option, + pub size: Option, + /// Run result. 
+ pub result: FioJobResult, } impl Default for FioJob { @@ -58,6 +73,7 @@ impl FioJob { numjobs: 1, runtime: None, size: None, + result: FioJobResult::NotRun, } } @@ -100,6 +116,12 @@ impl FioJob { r } + /// Sets job name. + pub fn with_name(mut self, v: &str) -> Self { + self.name = v.to_string(); + self + } + /// I/O engine to use. Default: spdk. pub fn with_ioengine(mut self, v: &str) -> Self { self.ioengine = v.to_string(); @@ -112,6 +134,12 @@ impl FioJob { self } + /// Filename. + pub fn with_filename_path(mut self, v: impl AsRef) -> Self { + self.filename = v.as_ref().to_str().unwrap().to_string(); + self + } + /// Read-write FIO mode. pub fn with_rw(mut self, rw: &str) -> Self { self.rw = rw.to_string(); @@ -170,6 +198,10 @@ pub struct Fio { pub jobs: Vec, pub verbose: bool, pub verbose_err: bool, + pub script: String, + pub total_time: Duration, + pub exit: i32, + pub err_messages: Vec, } impl Fio { @@ -197,7 +229,7 @@ impl Fio { self } - pub fn run(&self) -> std::io::Result<()> { + pub fn run(mut self) -> Self { let cmd = "sudo -E LD_PRELOAD=$FIO_SPDK fio"; let args = self @@ -207,49 +239,110 @@ impl Fio { .collect::>() .join(" "); - let script = format!("{cmd} {args}"); + self.script = format!("{cmd} --output-format=json {args}"); - if self.verbose { - println!("{script}"); + if self.verbose || self.verbose_err { + println!("{}", self.script); } + let start_time = Instant::now(); let (exit, stdout, stderr) = run_script::run( - &script, + &self.script, &Vec::new(), &run_script::ScriptOptions::new(), ) .unwrap(); - if exit == 0 { - if self.verbose { - println!("FIO:"); - println!("{script}"); - println!("Output:"); - println!("{stdout}"); - } - - Ok(()) - } else { - if self.verbose_err { - println!("Error running FIO:"); - println!("{script}"); - println!("Exit code: {exit}"); - println!("Output:"); - println!("{stdout}"); - println!("Error output:"); - println!("{stderr}"); - } - - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("SPDK FIO 
error: {stderr}"), - )) + self.total_time = start_time.elapsed(); + self.push_err(&stderr); + self.exit = exit; + + if let Err(e) = self.update_result(&stdout) { + self.push_err(&e); + } + + if self.verbose_err { + println!( + "Error(s) running FIO: {s}", + s = self.err_messages.join("\n") + ); + } + + self + } + + /// TODO + fn push_err(&mut self, msg: &str) { + let s = msg.trim_end_matches('\n'); + if !s.is_empty() { + self.err_messages.push(s.to_string()); } } + + /// TODO + fn update_result(&mut self, out: &str) -> Result<(), String> { + // Filter out lines error messages, those starting with "fio: ". + let out = out + .split('\n') + .filter(|s| !s.starts_with("fio: ")) + .collect::>() + .join("\n"); + + serde_json::from_str::(&out) + .map_err(|e| e.to_string())? + .get("jobs") + .ok_or_else(|| format!("No 'jobs' item in output"))? + .as_array() + .ok_or_else(|| format!("'jobs' item in output is not an array"))? + .iter() + .for_each(|j| { + let name = + j.get("jobname").unwrap().as_str().unwrap().to_string(); + let err = j.get("error").unwrap().as_i64().unwrap() as i32; + + if let Some(j) = self.find_job_mut(&name) { + if err == 0 { + j.result = FioJobResult::Ok; + } else { + j.result = FioJobResult::Error(Errno::from_i32(err)); + } + } + }); + + Ok(()) + } + + /// TODO + pub fn find_job(&self, name: &str) -> Option<&FioJob> { + self.jobs.iter().find(|j| j.name == name) + } + + /// TODO + pub fn find_job_mut(&mut self, name: &str) -> Option<&mut FioJob> { + self.jobs.iter_mut().find(|j| j.name == name) + } } -/// TODO -pub async fn run_fio_jobs(fio: &Fio) -> std::io::Result<()> { - let fio = fio.clone(); - tokio::spawn(async move { fio.run() }).await.unwrap() +/// Spawns a tokio task and runs the given FIO on it. Any FIO error is converted +/// into an `std::io::Result`. 
+pub async fn spawn_fio_task(fio: &Fio) -> std::io::Result<()> { + let fio = tokio::spawn({ + let fio = fio.clone(); + async move { fio.run() } + }) + .await + .unwrap(); + + if fio.exit == 0 { + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!( + "SPDK FIO error: {exit} {err_msg}", + exit = fio.exit, + err_msg = fio.err_messages.join("\n") + ), + )) + } } diff --git a/io-engine-tests/src/nexus.rs b/io-engine-tests/src/nexus.rs index 5f06d0ea8f..c2bcb2edaf 100644 --- a/io-engine-tests/src/nexus.rs +++ b/io-engine-tests/src/nexus.rs @@ -11,6 +11,8 @@ use super::{ DestroyNexusRequest, ListNexusOptions, Nexus, + NexusNvmePreemption, + NvmeReservation, PublishNexusRequest, RebuildHistoryRecord, RebuildHistoryRequest, @@ -46,6 +48,7 @@ pub struct NexusBuilder { resv_key: u64, preempt_key: u64, resv_type: Option, + preempt_policy: i32, children: Option>, nexus_info_key: Option, serial: Option, @@ -63,6 +66,7 @@ impl NexusBuilder { resv_key: 1, preempt_key: 0, resv_type: None, + preempt_policy: 0, children: None, nexus_info_key: None, serial: None, @@ -120,6 +124,21 @@ impl NexusBuilder { self.with_bdev(&r.bdev()) } + pub fn with_resv_key(mut self, r: u64) -> Self { + self.resv_key = r; + self + } + + pub fn with_resv_type(mut self, r: NvmeReservation) -> Self { + self.resv_type = Some(r as i32); + self + } + + pub fn with_preempt_policy(mut self, r: NexusNvmePreemption) -> Self { + self.preempt_policy = r as i32; + self + } + fn replica_uri(&self, r: &ReplicaBuilder) -> String { if r.rpc() == self.rpc() { r.bdev() @@ -174,7 +193,7 @@ impl NexusBuilder { children: self.children.as_ref().unwrap().clone(), nexus_info_key: self.nexus_info_key.as_ref().unwrap().clone(), resv_type: self.resv_type, - preempt_policy: 0, + preempt_policy: self.preempt_policy, }) .await .map(|r| r.into_inner().nexus.unwrap()) diff --git a/io-engine-tests/src/nvmf.rs b/io-engine-tests/src/nvmf.rs index 074a537207..6e1f90ac6b 100644 --- 
a/io-engine-tests/src/nvmf.rs +++ b/io-engine-tests/src/nvmf.rs @@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf}; use super::{ file_io::{compare_files, test_write_to_file, DataSize}, - fio::{run_fio_jobs, Fio}, + fio::{spawn_fio_task, Fio}, nexus::{make_nexus_nqn, make_nexus_serial}, nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, }; @@ -89,7 +89,7 @@ pub async fn test_fio_to_nvmf( }) .collect(); - run_fio_jobs(&fio).await + spawn_fio_task(&fio).await } /// TODO @@ -111,5 +111,5 @@ pub async fn test_fio_to_nvmf_aio( }) .collect(); - run_fio_jobs(&fio).await + spawn_fio_task(&fio).await } diff --git a/io-engine-tests/src/pool.rs b/io-engine-tests/src/pool.rs index 3d4e1a4f0d..4d0a0b9e05 100644 --- a/io-engine-tests/src/pool.rs +++ b/io-engine-tests/src/pool.rs @@ -52,6 +52,18 @@ impl PoolBuilder { self.with_bdev(&bdev) } + pub fn with_malloc_blk_size( + self, + bdev_name: &str, + size_mb: u64, + blk_size: u64, + ) -> Self { + let bdev = format!( + "malloc:///{bdev_name}?size_mb={size_mb}&blk_size={blk_size}" + ); + self.with_bdev(&bdev) + } + pub fn rpc(&self) -> SharedRpcHandle { self.rpc.clone() } diff --git a/io-engine/Cargo.toml b/io-engine/Cargo.toml index 9aa2a44cdb..0083f871b7 100644 --- a/io-engine/Cargo.toml +++ b/io-engine/Cargo.toml @@ -56,6 +56,7 @@ colored_json = "3.3.0" crc = "3.0.1" crossbeam = "0.8.2" crossbeam-sync = "0.0.0" +derive_builder = "0.12.0" dns-lookup = "2.0.3" env_logger = "0.10.0" etcd-client = "0.12.1" diff --git a/io-engine/src/bdev/device.rs b/io-engine/src/bdev/device.rs index d217d040ca..45ab56343f 100644 --- a/io-engine/src/bdev/device.rs +++ b/io-engine/src/bdev/device.rs @@ -297,6 +297,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb_arg, #[cfg(feature = "fault-injection")] inj_op: InjectIoCtx::with_iovs( + FaultDomain::BlockDevice, self.get_device(), IoType::Read, offset_blocks, @@ -309,9 +310,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { )?; #[cfg(feature = "fault-injection")] - 
inject_submission_error(FaultDomain::BlockDevice, unsafe { - &(*ctx).inj_op - })?; + inject_submission_error(unsafe { &(*ctx).inj_op })?; let (desc, chan) = self.handle.io_tuple(); let rc = unsafe { @@ -355,6 +354,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb_arg, #[cfg(feature = "fault-injection")] inj_op: InjectIoCtx::with_iovs( + FaultDomain::BlockDevice, self.get_device(), IoType::Write, offset_blocks, @@ -367,9 +367,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { )?; #[cfg(feature = "fault-injection")] - inject_submission_error(FaultDomain::BlockDevice, unsafe { - &(*ctx).inj_op - })?; + inject_submission_error(unsafe { &(*ctx).inj_op })?; let (desc, chan) = self.handle.io_tuple(); let rc = unsafe { @@ -411,7 +409,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb, cb_arg, #[cfg(feature = "fault-injection")] - inj_op: Default::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, @@ -454,7 +452,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb, cb_arg, #[cfg(feature = "fault-injection")] - inj_op: InjectIoCtx::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, 0, 0, @@ -493,7 +491,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb, cb_arg, #[cfg(feature = "fault-injection")] - inj_op: InjectIoCtx::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, @@ -536,7 +534,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb, cb_arg, #[cfg(feature = "fault-injection")] - inj_op: InjectIoCtx::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, @@ -634,7 +632,7 @@ impl BlockDeviceHandle for SpdkBlockDeviceHandle { cb, cb_arg, #[cfg(feature = "fault-injection")] - inj_op: InjectIoCtx::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, 0, 0, @@ -757,8 +755,7 @@ extern "C" fn bdev_io_completion( }; #[cfg(feature = "fault-injection")] - let status = - 
inject_completion_error(FaultDomain::BlockDevice, &bio.inj_op, status); + let status = inject_completion_error(&bio.inj_op, status); (bio.cb)(&bio.device, status, bio.cb_arg); diff --git a/io-engine/src/bdev/nexus/nexus_bdev.rs b/io-engine/src/bdev/nexus/nexus_bdev.rs index 7f92139543..e6a060f70f 100644 --- a/io-engine/src/bdev/nexus/nexus_bdev.rs +++ b/io-engine/src/bdev/nexus/nexus_bdev.rs @@ -61,6 +61,7 @@ use crate::{ subsys::NvmfSubsystem, }; +use crate::core::IoCompletionStatus; use events_api::event::EventAction; use spdk_rs::{ BdevIo, @@ -261,6 +262,8 @@ pub struct Nexus<'n> { pub(super) nexus_target: Option, /// Indicates if the Nexus has an I/O device. pub(super) has_io_device: bool, + /// Initiators. + initiators: parking_lot::Mutex>, /// Information associated with the persisted NexusInfo structure. pub(super) nexus_info: futures::lock::Mutex, /// Nexus I/O subsystem. @@ -271,10 +274,10 @@ pub struct Nexus<'n> { pub(super) rebuild_history: parking_lot::Mutex>, /// Flag to control shutdown from I/O path. pub(crate) shutdown_requested: AtomicCell, + /// Last child I/O error. + pub(super) last_error: IoCompletionStatus, /// Prevent auto-Unpin. _pin: PhantomPinned, - /// Initiators. 
- initiators: parking_lot::Mutex>, } impl<'n> Debug for Nexus<'n> { @@ -379,6 +382,7 @@ impl<'n> Nexus<'n> { event_sink: None, rebuild_history: parking_lot::Mutex::new(Vec::new()), shutdown_requested: AtomicCell::new(false), + last_error: IoCompletionStatus::Success, _pin: Default::default(), }; diff --git a/io-engine/src/bdev/nexus/nexus_io.rs b/io-engine/src/bdev/nexus/nexus_io.rs index 4ade833f03..8caaa9c9a7 100644 --- a/io-engine/src/bdev/nexus/nexus_io.rs +++ b/io-engine/src/bdev/nexus/nexus_io.rs @@ -1,6 +1,7 @@ use std::{ fmt::{Debug, Formatter}, ops::{Deref, DerefMut}, + pin::Pin, }; use libc::c_void; @@ -9,8 +10,10 @@ use nix::errno::Errno; use spdk_rs::{ libspdk::{ spdk_bdev_io, + spdk_bdev_io_complete_nvme_status, spdk_io_channel, SPDK_NVME_SC_ABORTED_SQ_DELETION, + SPDK_NVME_SC_CAPACITY_EXCEEDED, SPDK_NVME_SC_INVALID_OPCODE, SPDK_NVME_SC_RESERVATION_CONFLICT, }, @@ -207,11 +210,19 @@ impl<'n> NexusBio<'n> { } } - /// Obtains the Nexus struct embedded within the bdev. + /// Obtains a reference to the Nexus struct embedded within the bdev. + #[inline(always)] pub(crate) fn nexus(&self) -> &Nexus<'n> { self.bdev_checked(NEXUS_PRODUCT_ID).data() } + /// Obtains a mutable reference to the Nexus struct embedded within the + /// bdev. + #[inline(always)] + fn nexus_mut(&mut self) -> Pin<&mut Nexus<'n>> { + self.bdev_checked(NEXUS_PRODUCT_ID).data_mut() + } + /// Invoked when a nexus IO completes. fn child_completion( device: &dyn BlockDevice, @@ -234,7 +245,7 @@ impl<'n> NexusBio<'n> { self.driver_ctx_mut::() } - /// completion handler for the nexus when a child IO completes + /// Completion handler for the nexus when a child I/O completes. 
fn complete( &mut self, child: &dyn BlockDevice, @@ -270,10 +281,37 @@ impl<'n> NexusBio<'n> { self.resubmit(); } else { error!("{self:?}: failing nexus I/O: all child I/Os failed"); + + unsafe { + self.nexus_mut().get_unchecked_mut().last_error = status; + } + self.fail(); } } + /// Fails the current I/O with a generic internal error. If the nexus + /// already had a last child error, it fails with it. + fn fail(&self) { + match self.nexus().last_error { + IoCompletionStatus::NvmeError(s) => self.fail_nvme_status(s), + IoCompletionStatus::LvolError(LvolFailure::NoSpace) => self + .fail_nvme_status(NvmeStatus::Generic( + SPDK_NVME_SC_CAPACITY_EXCEEDED, + )), + _ => self.0.fail(), + } + } + + /// Completes the I/O with the given `NvmeStatus`. + #[inline(always)] + fn fail_nvme_status(&self, status: NvmeStatus) { + let (sct, sc) = status.as_sct_sc_codes(); + unsafe { + spdk_bdev_io_complete_nvme_status(self.as_ptr(), 0, sct, sc); + } + } + /// Resubmits the I/O. fn resubmit(&mut self) { warn!("{self:?}: resubmitting nexus I/O due to a child I/O failure"); @@ -751,20 +789,18 @@ impl<'n> NexusBio<'n> { ) -> Result<(), CoreError> { use crate::core::fault_injection::{ inject_submission_error, - FaultDomain::Nexus, + FaultDomain::NexusChild, InjectIoCtx, }; - inject_submission_error( - Nexus, - &InjectIoCtx::with_iovs( - hdl.get_device(), - self.io_type(), - self.offset(), - self.num_blocks(), - self.iovs(), - ), - ) + inject_submission_error(&InjectIoCtx::with_iovs( + NexusChild, + hdl.get_device(), + self.io_type(), + self.offset(), + self.num_blocks(), + self.iovs(), + )) } /// Checks if an error is to be injected upon completion. 
@@ -777,13 +813,13 @@ impl<'n> NexusBio<'n> { ) -> IoCompletionStatus { use crate::core::fault_injection::{ inject_completion_error, - FaultDomain::Nexus, + FaultDomain::NexusChild, InjectIoCtx, }; inject_completion_error( - Nexus, &InjectIoCtx::with_iovs( + NexusChild, child, self.io_type(), self.offset(), diff --git a/io-engine/src/bdev/nvmx/handle.rs b/io-engine/src/bdev/nvmx/handle.rs index 72ed477ae7..073196a7ee 100644 --- a/io-engine/src/bdev/nvmx/handle.rs +++ b/io-engine/src/bdev/nvmx/handle.rs @@ -357,11 +357,7 @@ fn complete_nvme_command(ctx: *mut NvmeIoCtx, cpl: *const spdk_nvme_cpl) { }; #[cfg(feature = "fault-injection")] - let status = inject_completion_error( - FaultDomain::BlockDevice, - &io_ctx.inj_op, - status, - ); + let status = inject_completion_error(&io_ctx.inj_op, status); (io_ctx.cb)(&*inner.device, status, io_ctx.cb_arg); @@ -771,6 +767,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { num_blocks, #[cfg(feature = "fault-injection")] inj_op: InjectIoCtx::with_iovs( + FaultDomain::BlockDevice, self.get_device(), IoType::Read, offset_blocks, @@ -783,9 +780,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { )?; #[cfg(feature = "fault-injection")] - inject_submission_error(FaultDomain::BlockDevice, unsafe { - &(*bio).inj_op - })?; + inject_submission_error(unsafe { &(*bio).inj_op })?; let rc = if iovs.len() == 1 { unsafe { @@ -858,6 +853,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { num_blocks, #[cfg(feature = "fault-injection")] inj_op: InjectIoCtx::with_iovs( + FaultDomain::BlockDevice, self.get_device(), IoType::Write, offset_blocks, @@ -870,9 +866,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { )?; #[cfg(feature = "fault-injection")] - inject_submission_error(FaultDomain::BlockDevice, unsafe { - &(*bio).inj_op - })?; + inject_submission_error(unsafe { &(*bio).inj_op })?; let rc = if iovs.len() == 1 { unsafe { @@ -949,7 +943,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { op: IoType::Compare, num_blocks, #[cfg(feature = 
"fault-injection")] - inj_op: Default::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, @@ -1047,7 +1041,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { op: IoType::Flush, num_blocks, #[cfg(feature = "fault-injection")] - inj_op: Default::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, 0, num_blocks, // Flush all device blocks. @@ -1110,7 +1104,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { op: IoType::Unmap, num_blocks, #[cfg(feature = "fault-injection")] - inj_op: Default::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, @@ -1209,7 +1203,7 @@ impl BlockDeviceHandle for NvmeDeviceHandle { op: IoType::WriteZeros, num_blocks, #[cfg(feature = "fault-injection")] - inj_op: Default::default(), + inj_op: InjectIoCtx::new(FaultDomain::BlockDevice), }, offset_blocks, num_blocks, diff --git a/io-engine/src/core/env.rs b/io-engine/src/core/env.rs index 7f13467b28..2f4cbd528c 100644 --- a/io-engine/src/core/env.rs +++ b/io-engine/src/core/env.rs @@ -4,6 +4,7 @@ use std::{ net::Ipv4Addr, os::raw::{c_char, c_void}, pin::Pin, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, @@ -87,6 +88,38 @@ fn parse_ps_timeout(src: &str) -> Result { .map(|d| d.clamp(Duration::from_secs(1), Duration::from_secs(60))) } +/// Parses Command Retry Delay(s): either a single integer or a comma-separated +/// list of three integers. 
+fn parse_crdt(src: &str) -> Result<[u16; 3], String> { + fn parse_val(s: &str) -> Result { + let u = u16::from_str(s).map_err(|e| e.to_string())?; + if u > 100 { + Err("Command Retry Delay value is too big".to_string()) + } else { + Ok(u) + } + } + + let items = src.split(',').collect::>(); + if items.len() == 1 { + let u = parse_val(src)?; + Ok([u, 0, 0]) + } else if items.len() == 3 { + let mut items = items.into_iter(); + + let mut u = [0; 3]; + for i in &mut u { + *i = parse_val(items.next().unwrap())?; + } + + Ok(u) + } else { + Err("Command Retry Delay argument must be a interger or \ + a comma-separated list of three intergers" + .to_string()) + } +} + #[derive(Debug, Clone, StructOpt)] #[structopt( name = package_description!(), @@ -164,9 +197,15 @@ pub struct MayastorCliArgs { #[structopt(short = "T", long = "tgt-iface", env = "NVMF_TGT_IFACE")] /// NVMF target interface (ip, mac, name or subnet). pub nvmf_tgt_interface: Option, - /// NVMF target Command Retry Delay. - #[structopt(long = "tgt-crdt", env = "NVMF_TGT_CRDT", default_value = "0")] - pub nvmf_tgt_crdt: u16, + /// NVMF target Command Retry Delay in x100 ms (single integer or three + /// comma-separated integers). + #[structopt( + long = "tgt-crdt", + env = "NVMF_TGT_CRDT", + default_value = "0", + parse(try_from_str = parse_crdt), + )] + pub nvmf_tgt_crdt: [u16; 3], /// The gRPC api version. 
#[structopt( long, @@ -246,7 +285,7 @@ impl Default for MayastorCliArgs { nvme_ctl_io_ctx_pool_size: 65535, registration_endpoint: None, nvmf_tgt_interface: None, - nvmf_tgt_crdt: 0, + nvmf_tgt_crdt: [0; 3], api_versions: vec![ApiVersion::V0, ApiVersion::V1], diagnose_stack: None, reactor_freeze_detection: false, @@ -348,7 +387,7 @@ pub struct MayastorEnvironment { bdev_io_ctx_pool_size: u64, nvme_ctl_io_ctx_pool_size: u64, nvmf_tgt_interface: Option, - pub nvmf_tgt_crdt: u16, + pub nvmf_tgt_crdt: [u16; 3], api_versions: Vec, skip_sig_handler: bool, enable_io_all_thrd_nexus_channels: bool, @@ -395,7 +434,7 @@ impl Default for MayastorEnvironment { bdev_io_ctx_pool_size: 65535, nvme_ctl_io_ctx_pool_size: 65535, nvmf_tgt_interface: None, - nvmf_tgt_crdt: 0, + nvmf_tgt_crdt: [0; 3], api_versions: vec![ApiVersion::V0, ApiVersion::V1], skip_sig_handler: false, enable_io_all_thrd_nexus_channels: false, diff --git a/io-engine/src/core/fault_injection/bdev_io_injection.rs b/io-engine/src/core/fault_injection/bdev_io_injection.rs new file mode 100644 index 0000000000..ab83e86070 --- /dev/null +++ b/io-engine/src/core/fault_injection/bdev_io_injection.rs @@ -0,0 +1,152 @@ +use once_cell::sync::OnceCell; +use parking_lot::{Mutex, MutexGuard}; +use std::collections::HashMap; + +use spdk_rs::{ + libspdk::{ + spdk_bdev_fn_table, + spdk_bdev_io, + spdk_bdev_io_complete_nvme_status, + spdk_io_channel, + }, + BdevIo, + UntypedBdev, +}; + +use crate::core::IoCompletionStatus; + +use super::{ + FaultDomain, + FaultInjectionError, + FaultIoStage, + FaultMethod, + InjectIoCtx, + Injection, +}; + +/// TODO +struct BdevInfo { + fn_table: *const spdk_bdev_fn_table, + fn_table_orig: *const spdk_bdev_fn_table, + inj: Injection, +} + +unsafe impl Send for BdevInfo {} + +/// TODO +type Bdevs = HashMap; + +/// TODO +fn get_bdevs<'a>() -> MutexGuard<'a, Bdevs> { + static INJECTIONS: OnceCell> = OnceCell::new(); + + INJECTIONS.get_or_init(|| Mutex::new(HashMap::new())).lock() +} + +/// TODO 
+unsafe extern "C" fn inject_submit_request( + chan: *mut spdk_io_channel, + io_ptr: *mut spdk_bdev_io, +) { + let mut g = get_bdevs(); + + let io = BdevIo::<()>::legacy_from_ptr(io_ptr); + let bdev = io.bdev(); + + let hash = (*bdev.unsafe_inner_ptr()).fn_table as usize; + let t = g.get_mut(&hash).expect("Bdev for injection not found"); + assert_eq!(t.fn_table, (*bdev.unsafe_inner_ptr()).fn_table); + + let inj = &mut t.inj; + + let ctx = InjectIoCtx::with_iovs( + FaultDomain::BdevIo, + inj.device_name.as_str(), + io.io_type(), + io.offset(), + io.num_blocks(), + io.iovs(), + ); + + match inj.inject(FaultIoStage::Submission, &ctx) { + Some(s) => { + error!("Injection {inj:?}: failing I/O: {io:?}"); + + match s { + IoCompletionStatus::NvmeError(err) => { + let (sct, sc) = err.as_sct_sc_codes(); + unsafe { + spdk_bdev_io_complete_nvme_status(io_ptr, 0, sct, sc); + } + } + _ => panic!("Non-NVME error is not supported"), + } + } + None => { + ((*t.fn_table_orig).submit_request.unwrap())(chan, io_ptr); + } + } +} + +/// TODO +pub(super) fn add_bdev_io_injection( + inj: &Injection, +) -> Result<(), FaultInjectionError> { + if inj.io_stage != FaultIoStage::Submission { + return Err(FaultInjectionError::InvalidInjection { + name: inj.device_name.clone(), + msg: format!("bdev I/O supports only submission injections"), + }); + } + + if !matches!( + inj.method, + FaultMethod::Status(IoCompletionStatus::NvmeError(_)) + ) { + return Err(FaultInjectionError::InvalidInjection { + name: inj.device_name.clone(), + msg: format!("bdev I/O supports only NVME error injection"), + }); + } + + let Some(mut bdev) = UntypedBdev::lookup_by_name(&inj.device_name) else { + return Err(FaultInjectionError::DeviceNotFound { + name: inj.device_name.clone(), + }); + }; + + let mut g = get_bdevs(); + + // Check for double insertion. 
+ if g.iter().any(|(_, v)| v.inj.device_name == inj.device_name) { + return Err(FaultInjectionError::InvalidInjection { + name: inj.device_name.clone(), + msg: format!( + "bdev I/O does not support multiple injections per bdev" + ), + }); + } + + unsafe { + let fn_table_orig = (*bdev.unsafe_inner_ptr()).fn_table; + + let fn_table = Box::into_raw(Box::new(spdk_bdev_fn_table { + submit_request: Some(inject_submit_request), + ..*fn_table_orig + })); + + let bdev_inj = BdevInfo { + fn_table, + fn_table_orig, + inj: inj.clone(), + }; + + (*bdev.unsafe_inner_mut_ptr()).fn_table = fn_table; + + g.insert(fn_table as usize, bdev_inj); + + info!("Added bdev I/O injection to bdev {bdev:?}: {inj:?}"); + } + + Ok(()) +} diff --git a/io-engine/src/core/fault_injection/fault_method.rs b/io-engine/src/core/fault_injection/fault_method.rs new file mode 100644 index 0000000000..a53719edc5 --- /dev/null +++ b/io-engine/src/core/fault_injection/fault_method.rs @@ -0,0 +1,135 @@ +use rand::RngCore; +use regex::Regex; +use std::fmt::{Debug, Display, Formatter}; + +use spdk_rs::NvmeStatus; + +use crate::core::{IoCompletionStatus, IoSubmissionFailure, LvolFailure}; + +use super::{InjectIoCtx, InjectionState}; + +/// Injection method. +#[derive(Clone, Copy, PartialEq)] +pub enum FaultMethod { + /// Faults I/O by returning the given status for an affected operation. + Status(IoCompletionStatus), + /// Introduces data buffer corruption. 
+ Data, +} + +impl Debug for FaultMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Status(s) => { + write!(f, "Status[{s:?}]") + } + Self::Data => f.write_str("Data"), + } + } +} + +impl Display for FaultMethod { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + use IoCompletionStatus::*; + + match self { + Self::Status(NvmeError(NvmeStatus::DATA_TRANSFER_ERROR)) => { + f.write_str("status") + } + Self::Status(NvmeError(s)) => { + let (sct, sc) = s.as_sct_sc_codes(); + write!(f, "status-nvme-{sct:x}-{sc:x}",) + } + Self::Status(LvolError(s)) => { + write!( + f, + "status-lvol-{s}", + s = format!("{s:?}").to_ascii_lowercase() + ) + } + Self::Status(IoSubmissionError(s)) => { + write!( + f, + "status-submit-{s}", + s = format!("{s:?}").to_ascii_lowercase() + ) + } + Self::Status(AdminCommandError) => { + write!(f, "status-admin") + } + Self::Data => f.write_str("data"), + _ => f.write_str("invalid"), + } + } +} + +impl FaultMethod { + /// A shorthand for a generic data transfer error. + pub const DATA_TRANSFER_ERROR: Self = Self::Status( + IoCompletionStatus::NvmeError(NvmeStatus::DATA_TRANSFER_ERROR), + ); + + /// TODO + pub(super) fn inject( + &self, + state: &mut InjectionState, + ctx: &InjectIoCtx, + ) -> Option { + match self { + FaultMethod::Status(status) => Some(*status), + FaultMethod::Data => { + self.inject_data_errors(state, ctx); + Some(IoCompletionStatus::Success) + } + } + } + + /// TODO + fn inject_data_errors(&self, s: &mut InjectionState, ctx: &InjectIoCtx) { + let Some(iovs) = ctx.iovs_mut() else { + return; + }; + + for iov in iovs { + for i in 0 .. iov.len() { + iov[i] = s.rng.next_u32() as u8; + } + } + } + + /// TODO + pub fn parse(s: &str) -> Option { + lazy_static::lazy_static! 
{ + static ref NVME_RE: Regex = + Regex::new(r"^status-nvme-([0-9a-f.]+)-([0-9a-f.]+)$").unwrap(); + } + + if let Some(cap) = NVME_RE.captures(s) { + let sct = i32::from_str_radix(cap.get(1).unwrap().as_str(), 16); + let sc = i32::from_str_radix(cap.get(2).unwrap().as_str(), 16); + + if let Ok(sct) = sct { + if let Ok(sc) = sc { + return Some(Self::Status(IoCompletionStatus::NvmeError( + NvmeStatus::from((sct, sc)), + ))); + } + } + } + + let r = match s { + "status-lvol-nospace" => { + IoCompletionStatus::LvolError(LvolFailure::NoSpace) + } + "status-submit-read" => { + IoCompletionStatus::IoSubmissionError(IoSubmissionFailure::Read) + } + "status-submit-write" => IoCompletionStatus::IoSubmissionError( + IoSubmissionFailure::Write, + ), + "status-admin" => IoCompletionStatus::AdminCommandError, + _ => return None, + }; + Some(Self::Status(r)) + } +} diff --git a/io-engine/src/core/fault_injection/inject_io_ctx.rs b/io-engine/src/core/fault_injection/inject_io_ctx.rs new file mode 100644 index 0000000000..d1f161b0f9 --- /dev/null +++ b/io-engine/src/core/fault_injection/inject_io_ctx.rs @@ -0,0 +1,131 @@ +use std::{ops::Range, slice::from_raw_parts_mut}; + +use spdk_rs::{IoType, IoVec}; + +use crate::core::BlockDevice; + +use super::{FaultDomain, FaultIoOperation}; + +/// Reference to the injection target device. +#[derive(Debug, Clone)] +pub enum InjectIoDevice { + None, + BlockDevice(*mut dyn BlockDevice), + DeviceName(*const str), +} + +impl From<&dyn BlockDevice> for InjectIoDevice { + fn from(dev: &dyn BlockDevice) -> Self { + Self::BlockDevice(dev as *const _ as *mut dyn BlockDevice) + } +} + +impl From<&str> for InjectIoDevice { + fn from(name: &str) -> Self { + Self::DeviceName(name as *const _) + } +} + +/// Injection I/O context. 
+#[derive(Debug, Clone)] +pub struct InjectIoCtx { + pub(super) domain: FaultDomain, + pub(super) dev: InjectIoDevice, + pub(super) range: Range, + pub(super) io_type: IoType, + pub(super) iovs: *mut IoVec, + pub(super) iovs_len: usize, +} + +impl InjectIoCtx { + /// TODO + pub fn new(domain: FaultDomain) -> Self { + Self { + domain, + dev: InjectIoDevice::None, + range: 0 .. 0, + io_type: IoType::Invalid, + iovs: std::ptr::null_mut(), + iovs_len: 0, + } + } + + /// TODO + #[inline(always)] + pub fn with_iovs>( + domain: FaultDomain, + dev: D, + io_type: IoType, + offset: u64, + num_blocks: u64, + iovs: &[IoVec], + ) -> Self { + Self { + domain, + dev: dev.into(), + range: offset .. offset + num_blocks, + io_type, + iovs: iovs.as_ptr() as *mut _, + iovs_len: iovs.len(), + } + } + + /// TODO + #[inline(always)] + pub fn is_valid(&self) -> bool { + !matches!(self.dev, InjectIoDevice::None) + } + + /// TODO + #[inline(always)] + pub fn domain_ok(&self, domain: FaultDomain) -> bool { + self.domain == domain + } + + /// Tests if the given device name matches the context's device. + #[inline(always)] + pub fn device_name_ok(&self, name: &str) -> bool { + unsafe { + match self.dev { + InjectIoDevice::None => false, + InjectIoDevice::BlockDevice(dev) => { + (*dev).device_name() == name + } + InjectIoDevice::DeviceName(pname) => &*pname == name, + } + } + } + + /// Tests if the given fault operation matches the context's I/O. + pub fn io_type_ok(&self, op: FaultIoOperation) -> bool { + match op { + FaultIoOperation::Read => self.io_type == IoType::Read, + FaultIoOperation::Write => self.io_type == IoType::Write, + FaultIoOperation::ReadWrite => { + self.io_type == IoType::Read || self.io_type == IoType::Write + } + } + } + + /// Tests if the range overlap with the range of the context. 
+ #[inline(always)] + pub fn block_range_ok(&self, r: &Range) -> bool { + self.range.end > r.start && r.end > self.range.start + } + + /// TODO + #[inline(always)] + pub fn iovs_mut(&self) -> Option<&mut [IoVec]> { + unsafe { + if self.iovs.is_null() + || !(*self.iovs).is_initialized() + || (*self.iovs).is_empty() + || self.iovs_len == 0 + { + None + } else { + Some(from_raw_parts_mut(self.iovs, self.iovs_len)) + } + } + } +} diff --git a/io-engine/src/core/fault_injection/injection.rs b/io-engine/src/core/fault_injection/injection.rs index b964e2b717..f45f71ce70 100644 --- a/io-engine/src/core/fault_injection/injection.rs +++ b/io-engine/src/core/fault_injection/injection.rs @@ -1,12 +1,12 @@ #![cfg(feature = "fault-injection")] -use rand::{rngs::StdRng, RngCore, SeedableRng}; +use spdk_rs::NvmeStatus; use std::{ - fmt::{Debug, Display, Formatter}, + cell::RefCell, + fmt::{Debug, Formatter}, ops::Range, - time::{Duration, Instant}, + time::Duration, }; - use url::Url; use crate::core::IoCompletionStatus; @@ -14,33 +14,76 @@ use crate::core::IoCompletionStatus; use super::{ FaultDomain, FaultInjectionError, + FaultIoOperation, FaultIoStage, - FaultIoType, - FaultType, + FaultMethod, InjectIoCtx, + InjectionState, }; /// Fault injection. -#[derive(Debug, Clone)] -pub struct FaultInjection { - pub uri: String, +#[derive(Clone, Builder)] +#[builder(setter(prefix = "with"))] +#[builder(default)] +#[builder(build_fn(validate = "Self::validate"))] +pub struct Injection { + /// URI this injection was created from. + #[builder(setter(skip))] + uri: Option, + /// Fault domain. pub domain: FaultDomain, + /// Target device name. pub device_name: String, - pub fault_io_type: FaultIoType, - pub fault_io_stage: FaultIoStage, - pub fault_type: FaultType, - pub started: Option, - pub begin: Duration, - pub end: Duration, - pub range: Range, - rng: StdRng, + /// I/O operation to which the fault applies. + pub io_operation: FaultIoOperation, + /// I/O stage. 
+ pub io_stage: FaultIoStage, + /// Injection method. + pub method: FaultMethod, + /// Time range. + pub time_range: Range, + /// Block range. + pub block_range: Range, + /// Number of retries. + pub retries: u64, + /// Injection state. + #[builder(setter(skip))] + state: RefCell, +} -impl Display for FaultInjection { +impl InjectionBuilder { + /// TODO + pub fn with_offset(&mut self, offset: u64, num_blocks: u64) -> &mut Self { + self.block_range = Some(offset .. offset + num_blocks); + self + } + + /// TODO + pub fn with_method_nvme_error(&mut self, err: NvmeStatus) -> &mut Self { + self.method = + Some(FaultMethod::Status(IoCompletionStatus::NvmeError(err))); + self + } + + /// TODO + pub fn build_uri(&mut self) -> Result { + self.build().map(|inj| inj.as_uri()) + } + + /// TODO + fn validate(&self) -> Result<(), String> { + match &self.device_name { + Some(s) if !s.is_empty() => Ok(()), + _ => Err("Device not configured".to_string()), + } + } +} + +impl Debug for Injection { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt_duration(u: &Duration) -> String { if *u == Duration::MAX { - "INF".to_string() + "-".to_string() } else { format!("{u:?}") } @@ -48,80 +91,103 @@ impl Display for FaultInjection { fn fmt_u64(u: u64) -> String { if u == u64::MAX { - "INF".to_string() + "MAX".to_string() } else { format!("{u:?}") } } - write!( - f, - "{io}::{stage}::{ft} injection <{d}::{n}> [{b:?} -> \ - {e} ({t:?})] @ {rs}..{re}", - io = self.fault_io_type, - stage = self.fault_io_stage, - ft = self.fault_type, - d = self.domain, - n = self.device_name, - b = self.begin, - e = fmt_duration(&self.end), - t = self.now(), - rs = self.range.start, - re = fmt_u64(self.range.end), - ) - } -} + if f.alternate() { + f.debug_struct("Injection") + .field("uri", &self.uri()) + .field("domain", &self.domain) + .field("device_name", &self.device_name) + .field("io_operation", &self.io_operation) + .field("stage", &self.io_stage) + .field("method", &self.method) + 
.field("begin_at", &fmt_duration(&self.time_range.start)) + .field("end_at", &fmt_duration(&self.time_range.end)) + .field("block_range_start", &fmt_u64(self.block_range.start)) + .field("block_range_end", &fmt_u64(self.block_range.end)) + .field( + "num_blocks", + &fmt_u64(self.block_range.end - self.block_range.start), + ) + .field("retries", &fmt_u64(self.retries)) + .field("hits", &self.state.borrow().hits) + .field("started", &fmt_duration(&self.state.borrow().now())) + .finish() + } else { + let info = format!( + "{d}/{io}/{stage}/{ft}", + d = self.domain, + io = self.io_operation, + stage = self.io_stage, + ft = self.method, + ); + + let timed = if !self.time_range.start.is_zero() + || self.time_range.end != Duration::MAX + { + format!( + " for period {b} -> {e} ({t})", + b = fmt_duration(&self.time_range.start), + e = fmt_duration(&self.time_range.end), + t = fmt_duration(&self.state.borrow().now()), + ) + } else { + String::default() + }; -fn new_rng() -> StdRng { - let seed = [ - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - ]; - StdRng::from_seed(seed) -} + let range = if self.block_range.start != 0 + || self.block_range.end != u64::MAX + { + format!( + " at blocks {rs}..{re}", + rs = self.block_range.start, + re = fmt_u64(self.block_range.end), + ) + } else { + String::default() + }; -impl FaultInjection { - /// Creates a new injection. 
- #[allow(dead_code)] - pub fn new( - domain: FaultDomain, - name: &str, - fault_io_type: FaultIoType, - fault_io_stage: FaultIoStage, - fault_type: FaultType, - begin: Duration, - end: Duration, - range: Range, - ) -> Self { - let opts = vec![ - format!("domain={domain}"), - format!("op={fault_io_type}"), - format!("stage={fault_io_stage}"), - format!("type={fault_type}"), - format!("begin={begin:?}"), - format!("end={end:?}"), - format!("offset={}", range.start), - format!("num_blk={}", range.end), - ] - .join("&"); - - let uri = format!("inject://{name}?{opts}"); + let retries = if self.retries != u64::MAX { + format!( + " | {h}/{n} retries", + h = self.state.borrow().hits, + n = self.retries + ) + } else { + "".to_string() + }; + write!( + f, + "{info} on '{n}'{timed}{range}{retries}", + n = self.device_name, + ) + } + } +} + +impl Default for Injection { + fn default() -> Self { Self { - uri, - domain, - device_name: name.to_owned(), - fault_io_type, - fault_io_stage, - fault_type, - started: None, - begin, - end, - range, - rng: new_rng(), + uri: None, + domain: FaultDomain::BlockDevice, + device_name: Default::default(), + io_operation: FaultIoOperation::ReadWrite, + io_stage: FaultIoStage::Submission, + method: FaultMethod::DATA_TRANSFER_ERROR, + time_range: Duration::ZERO .. Duration::MAX, + block_range: 0 .. u64::MAX, + retries: u64::MAX, + state: Default::default(), } } +} +impl Injection { /// Parses an injection URI and creates injection object. 
pub fn from_uri(uri: &str) -> Result { if !uri.starts_with("inject://") { @@ -137,8 +203,7 @@ impl FaultInjection { })?; let mut r = Self { - uri: uri.to_owned(), - domain: FaultDomain::None, + uri: Some(uri.to_owned()), device_name: format!( "{host}{port}{path}", host = p.host_str().unwrap_or_default(), @@ -149,26 +214,22 @@ impl FaultInjection { }, path = p.path() ), - fault_io_type: FaultIoType::Read, - fault_io_stage: FaultIoStage::Completion, - fault_type: FaultType::status_data_transfer_error(), - started: None, - begin: Duration::ZERO, - end: Duration::MAX, - range: 0 .. u64::MAX, - rng: new_rng(), + ..Default::default() }; for (k, v) in p.query_pairs() { match k.as_ref() { "domain" => r.domain = parse_domain(&k, &v)?, - "op" => r.fault_io_type = parse_fault_io_type(&k, &v)?, - "stage" => r.fault_io_stage = parse_fault_io_stage(&k, &v)?, - "type" => r.fault_type = parse_fault_type(&k, &v)?, - "begin" => r.begin = parse_timer(&k, &v)?, - "end" => r.end = parse_timer(&k, &v)?, - "offset" => r.range.start = parse_num(&k, &v)?, - "num_blk" => r.range.end = parse_num(&k, &v)?, + "op" => r.io_operation = parse_fault_io_type(&k, &v)?, + "stage" => r.io_stage = parse_fault_io_stage(&k, &v)?, + "method" => r.method = parse_method(&k, &v)?, + "begin_at" => r.time_range.start = parse_timer(&k, &v)?, + "end_at" => r.time_range.end = parse_timer(&k, &v)?, + "offset" => r.block_range.start = parse_num(&k, &v)?, + "num_blk" | "num_blocks" => { + r.block_range.end = parse_num(&k, &v)? 
+ } + "retries" => r.retries = parse_num(&k, &v)?, _ => { return Err(FaultInjectionError::UnknownParameter { name: k.to_string(), @@ -178,30 +239,91 @@ impl FaultInjection { }; } - r.range.end = r.range.start.saturating_add(r.range.end); + r.block_range.end = + r.block_range.start.saturating_add(r.block_range.end); - if r.begin > r.end { + if r.time_range.start > r.time_range.end { return Err(FaultInjectionError::BadDurations { name: r.device_name, - begin: r.begin, - end: r.end, + begin: r.time_range.start, + end: r.time_range.end, }); } Ok(r) } - /// Returns current time relative to injection start. - fn now(&self) -> Duration { - self.started.map_or(Duration::MAX, |s| { - Instant::now().saturating_duration_since(s) - }) + /// Returns injection's URI. + pub fn uri(&self) -> String { + match &self.uri { + Some(s) => s.to_owned(), + None => self.as_uri(), + } + } + + /// Builds URI for the injection. + pub fn as_uri(&self) -> String { + let d = Self::default(); + + let mut opts = vec![ + format!("domain={}", self.domain), + format!("op={}", self.io_operation), + format!("stage={}", self.io_stage), + ]; + + if self.method != d.method { + opts.push(format!("method={}", self.method)); + } + + if self.time_range.start != d.time_range.start { + opts.push(format!( + "begin_at={:?}", + self.time_range.start.as_millis() + )); + } + + if self.time_range.end != d.time_range.end { + opts.push(format!("end_at={}", self.time_range.end.as_millis())); + } + + if self.block_range.start != d.block_range.start { + opts.push(format!("offset={}", self.block_range.start)); + } + + if self.block_range.end != d.block_range.end { + opts.push(format!( + "num_blk={}", + self.block_range.end - self.block_range.start + )); + } + + if self.retries != d.retries { + opts.push(format!("retries={}", self.retries)); + } + + format!( + "inject://{name}?{opts}", + name = self.device_name, + opts = opts.join("&") + ) + } + + /// Returns device name. 
+ pub fn device_name(&self) -> &str { + &self.device_name } /// True if the injection is currently active. + #[inline(always)] pub fn is_active(&self) -> bool { - let d = self.now(); - d >= self.begin && d < self.end + let s = self.state.borrow(); + + if s.hits >= self.retries { + return false; + } + + let d = s.now(); + d >= self.time_range.start && d < self.time_range.end } /// Injects an error for the given I/O context. @@ -210,57 +332,37 @@ impl FaultInjection { /// routine. #[inline] pub fn inject( - &mut self, - domain: FaultDomain, - fault_io_type: FaultIoType, - fault_io_stage: FaultIoStage, + &self, + stage: FaultIoStage, ctx: &InjectIoCtx, ) -> Option { - if domain != self.domain - || fault_io_type != self.fault_io_type - || fault_io_stage != self.fault_io_stage - || ctx.device_name() != self.device_name + if !ctx.is_valid() + || !ctx.domain_ok(self.domain) + || stage != self.io_stage + || !ctx.io_type_ok(self.io_operation) + || !ctx.device_name_ok(&self.device_name) + || !ctx.block_range_ok(&self.block_range) { return None; } - - if self.started.is_none() { + if self.state.borrow_mut().tick() { debug!("{self:?}: starting"); - self.started = Some(Instant::now()); } - if !self.is_active() || !is_overlapping(&self.range, &ctx.range) { + if !self.is_active() { return None; } - match self.fault_type { - FaultType::Status(status) => Some(status), - FaultType::Data => { - self.inject_data_errors(ctx); - Some(IoCompletionStatus::Success) - } - } - } - - fn inject_data_errors(&mut self, ctx: &InjectIoCtx) { - let Some(iovs) = ctx.iovs_mut() else { - return; - }; - - for iov in iovs { - for i in 0 .. 
iov.len() { - iov[i] = self.rng.next_u32() as u8; - } - } + self.method.inject(&mut self.state.borrow_mut(), ctx) } } /// TODO fn parse_domain(k: &str, v: &str) -> Result { let r = match v { - "none" => FaultDomain::None, - "nexus" => FaultDomain::Nexus, - "block" | "block_device" => FaultDomain::BlockDevice, + "child" | "nexus_child" | "NexusChild" => FaultDomain::NexusChild, + "block" | "block_device" | "BlockDevice" => FaultDomain::BlockDevice, + "bdev_io" | "BdevIo" => FaultDomain::BdevIo, _ => { return Err(FaultInjectionError::UnknownParameter { name: k.to_string(), @@ -275,10 +377,11 @@ fn parse_domain(k: &str, v: &str) -> Result { fn parse_fault_io_type( k: &str, v: &str, -) -> Result { +) -> Result { let res = match v { - "read" | "r" => FaultIoType::Read, - "write" | "w" => FaultIoType::Write, + "read" | "r" | "Read" => FaultIoOperation::Read, + "write" | "w" | "Write" => FaultIoOperation::Write, + "read_write" | "rw" | "ReadWrite" => FaultIoOperation::ReadWrite, _ => { return Err(FaultInjectionError::UnknownParameter { name: k.to_string(), @@ -295,8 +398,10 @@ fn parse_fault_io_stage( v: &str, ) -> Result { let res = match v { - "submit" | "s" | "submission" => FaultIoStage::Submission, - "compl" | "c" | "completion" => FaultIoStage::Submission, + "submit" | "s" | "submission" | "Submission" => { + FaultIoStage::Submission + } + "compl" | "c" | "completion" | "Completion" => FaultIoStage::Completion, _ => { return Err(FaultInjectionError::UnknownParameter { name: k.to_string(), @@ -308,23 +413,18 @@ fn parse_fault_io_stage( } /// TODO -fn parse_fault_type( - k: &str, - v: &str, -) -> Result { - let res = match v { - // TODO: add more statuses. - "status" => FaultType::status_data_transfer_error(), +fn parse_method(k: &str, v: &str) -> Result { + match v { + "status" | "Status" => Ok(FaultMethod::DATA_TRANSFER_ERROR), // TODO: add data corruption methods. 
- "data" => FaultType::Data, - _ => { - return Err(FaultInjectionError::UnknownParameter { + "data" | "Data" => Ok(FaultMethod::Data), + _ => FaultMethod::parse(v).ok_or_else(|| { + FaultInjectionError::UnknownParameter { name: k.to_string(), value: v.to_string(), - }) - } - }; - Ok(res) + } + }), + } } /// TODO @@ -347,8 +447,3 @@ fn parse_num(k: &str, v: &str) -> Result { value: v.to_string(), }) } - -/// Tests if teo ranges overlap. -fn is_overlapping(a: &Range, b: &Range) -> bool { - a.end > b.start && b.end > a.start -} diff --git a/io-engine/src/core/fault_injection/injections.rs b/io-engine/src/core/fault_injection/injection_api.rs similarity index 67% rename from io-engine/src/core/fault_injection/injections.rs rename to io-engine/src/core/fault_injection/injection_api.rs index db6925562c..2a6786b5b0 100644 --- a/io-engine/src/core/fault_injection/injections.rs +++ b/io-engine/src/core/fault_injection/injection_api.rs @@ -2,24 +2,22 @@ use nix::errno::Errno; use once_cell::sync::OnceCell; -use std::{ - convert::TryInto, - sync::atomic::{AtomicBool, Ordering}, -}; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::core::{CoreError, IoCompletionStatus}; use super::{ + add_bdev_io_injection, FaultDomain, - FaultInjection, + FaultInjectionError, FaultIoStage, - FaultIoType, InjectIoCtx, + Injection, }; /// A list of fault injections. struct Injections { - items: Vec, + items: Vec, } static INJECTIONS: OnceCell> = OnceCell::new(); @@ -39,34 +37,37 @@ impl Injections { } /// Adds an injection. - pub fn add(&mut self, inj: FaultInjection) { - info!("Adding injected fault: '{uri}'", uri = inj.uri); + pub fn add(&mut self, inj: Injection) -> Result<(), FaultInjectionError> { + if inj.domain == FaultDomain::BdevIo { + add_bdev_io_injection(&inj)?; + } + + info!("Adding injected fault: '{inj:?}'"); self.items.push(inj); + + Ok(()) } /// Removes all injections matching the URI. 
- pub fn remove(&mut self, uri: &str) { + pub fn remove(&mut self, uri: &str) -> Result<(), FaultInjectionError> { info!("Removing injected fault: '{uri}'"); - self.items.retain(|inj| inj.uri != uri); + self.items.retain(|inj| inj.uri() != uri); + Ok(()) } /// Returns a copy of the injection list. - pub fn list(&self) -> Vec { + pub fn list(&self) -> Vec { self.items.clone() } /// TODO #[inline(always)] fn inject( - &mut self, - domain: FaultDomain, - fault_io_type: FaultIoType, - fault_io_stage: FaultIoStage, + &self, + stage: FaultIoStage, op: &InjectIoCtx, ) -> Option { - self.items.iter_mut().find_map(|inj| { - inj.inject(domain, fault_io_type, fault_io_stage, op) - }) + self.items.iter().find_map(|inj| inj.inject(stage, op)) } } @@ -92,18 +93,18 @@ fn enable_fault_injections() { } } /// Adds an fault injection. -pub fn add_fault_injection(inj: FaultInjection) { +pub fn add_fault_injection(inj: Injection) -> Result<(), FaultInjectionError> { enable_fault_injections(); - Injections::get().add(inj); + Injections::get().add(inj) } /// Removes all injections matching the URI. -pub fn remove_fault_injection(uri: &str) { - Injections::get().remove(uri); +pub fn remove_fault_injection(uri: &str) -> Result<(), FaultInjectionError> { + Injections::get().remove(uri) } /// Lists fault injections. A clone of current state of injection is returned. -pub fn list_fault_injections() -> Vec { +pub fn list_fault_injections() -> Vec { Injections::get().list() } @@ -111,24 +112,12 @@ pub fn list_fault_injections() -> Vec { /// stage. In the case a fault is injected, returns the corresponding /// `CoreError`. 
#[inline] -pub fn inject_submission_error( - domain: FaultDomain, - ctx: &InjectIoCtx, -) -> Result<(), CoreError> { +pub fn inject_submission_error(ctx: &InjectIoCtx) -> Result<(), CoreError> { if !injections_enabled() || !ctx.is_valid() { return Ok(()); } - let Ok(fault_io_type) = ctx.io_type.try_into() else { - return Ok(()); - }; - - match Injections::get().inject( - domain, - fault_io_type, - FaultIoStage::Submission, - ctx, - ) { + match Injections::get().inject(FaultIoStage::Submission, ctx) { None => Ok(()), Some(IoCompletionStatus::Success) => Ok(()), Some(_) => Err(crate::bdev::device::io_type_to_err( @@ -146,7 +135,6 @@ pub fn inject_submission_error( /// `IoCompletionStatus`. #[inline] pub fn inject_completion_error( - domain: FaultDomain, ctx: &InjectIoCtx, status: IoCompletionStatus, ) -> IoCompletionStatus { @@ -157,16 +145,7 @@ pub fn inject_completion_error( return status; } - let Ok(fault_io_type) = ctx.io_type.try_into() else { - return status; - }; - - match Injections::get().inject( - domain, - fault_io_type, - FaultIoStage::Completion, - ctx, - ) { + match Injections::get().inject(FaultIoStage::Completion, ctx) { Some(inj) => inj, None => IoCompletionStatus::Success, } diff --git a/io-engine/src/core/fault_injection/injection_state.rs b/io-engine/src/core/fault_injection/injection_state.rs new file mode 100644 index 0000000000..83ff902209 --- /dev/null +++ b/io-engine/src/core/fault_injection/injection_state.rs @@ -0,0 +1,50 @@ +use rand::{rngs::StdRng, SeedableRng}; +use std::time::{Duration, Instant}; + +/// Injection random number generator. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct InjectionState { + /// Instance at which the injection started to be active. + pub(super) started: Option, + /// Number of injection hits. + pub(super) hits: u64, + /// Random number generator for data corruption injection. 
+ pub(super) rng: StdRng, +} + +impl Default for InjectionState { + fn default() -> Self { + let seed = [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; + Self { + started: None, + hits: 0, + rng: StdRng::from_seed(seed), + } + } +} + +impl InjectionState { + /// TODO + #[inline(always)] + pub(super) fn tick(&mut self) -> bool { + if self.started.is_none() { + self.started = Some(Instant::now()); + self.hits = 1; + true + } else { + self.hits += 1; + false + } + } + + /// Returns the current time relative to the injection start. + #[inline(always)] + pub(super) fn now(&self) -> Duration { + self.started.map_or(Duration::MAX, |s| { + Instant::now().saturating_duration_since(s) + }) + } +} diff --git a/io-engine/src/core/fault_injection/mod.rs b/io-engine/src/core/fault_injection/mod.rs index 253c9f161c..d3d3698b70 100644 --- a/io-engine/src/core/fault_injection/mod.rs +++ b/io-engine/src/core/fault_injection/mod.rs @@ -2,84 +2,66 @@ use snafu::Snafu; use std::{ - convert::TryFrom, - fmt::{Display, Formatter}, - ops::Range, - slice::from_raw_parts_mut, + fmt::{Debug, Display, Formatter}, time::Duration, }; use url::ParseError; +mod bdev_io_injection; +mod fault_method; +mod inject_io_ctx; mod injection; -mod injections; - -use crate::core::{BlockDevice, IoCompletionStatus}; -pub use injection::FaultInjection; -pub use injections::{ +mod injection_api; +mod injection_state; + +use bdev_io_injection::add_bdev_io_injection; +pub use fault_method::FaultMethod; +pub use inject_io_ctx::{InjectIoCtx, InjectIoDevice}; +pub use injection::{Injection, InjectionBuilder, InjectionBuilderError}; +pub use injection_api::{ add_fault_injection, inject_completion_error, inject_submission_error, list_fault_injections, remove_fault_injection, }; -use spdk_rs::{IoType, IoVec}; +pub use injection_state::InjectionState; /// Fault domain. 
#[derive(Debug, Clone, Copy, PartialEq)] pub enum FaultDomain { - None, - Nexus, + /// Fault injection on nexus child I/O level. + NexusChild, + /// Fault injection on block device abstraction level. BlockDevice, + /// + BdevIo, } impl Display for FaultDomain { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - Self::None => "none", - Self::Nexus => "nexus", - Self::BlockDevice => "block_device", - }) - } -} - -/// Data fault mode. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum DataFaultMode { - Rand, -} - -impl Display for DataFaultMode { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - Self::Rand => "rand", - }) + match self { + FaultDomain::NexusChild => f.write_str("child"), + FaultDomain::BlockDevice => f.write_str("block"), + FaultDomain::BdevIo => f.write_str("bdev_io"), + } } } -/// Fault I/O type. +/// I/O operation to which the fault applies. #[derive(Debug, Clone, Copy, PartialEq)] -pub enum FaultIoType { +pub enum FaultIoOperation { Read, Write, + ReadWrite, } -impl Display for FaultIoType { +impl Display for FaultIoOperation { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - Self::Read => f.write_str("read"), - Self::Write => f.write_str("write"), - } - } -} - -impl TryFrom for FaultIoType { - type Error = (); - - fn try_from(value: IoType) -> Result { - match value { - IoType::Read => Ok(Self::Read), - IoType::Write => Ok(Self::Write), - _ => Err(()), + FaultIoOperation::Read => f.write_str("r"), + FaultIoOperation::Write => f.write_str("w"), + FaultIoOperation::ReadWrite => f.write_str("rw"), } } } @@ -94,110 +76,8 @@ pub enum FaultIoStage { impl Display for FaultIoStage { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - Self::Submission => f.write_str("submit"), - Self::Completion => f.write_str("compl"), - } - } -} - -/// Fault type. 
-#[derive(Debug, Clone, Copy, PartialEq)] -pub enum FaultType { - Status(IoCompletionStatus), - Data, -} - -impl Display for FaultType { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Status(_) => f.write_str("status"), - Self::Data => f.write_str("data"), - } - } -} - -impl FaultType { - pub fn status_data_transfer_error() -> Self { - use spdk_rs::{libspdk::SPDK_NVME_SC_DATA_TRANSFER_ERROR, NvmeStatus}; - - Self::Status(IoCompletionStatus::NvmeError(NvmeStatus::Generic( - SPDK_NVME_SC_DATA_TRANSFER_ERROR, - ))) - } -} - -/// Injection I/O. -#[derive(Debug, Clone)] -pub struct InjectIoCtx { - dev: Option<*mut dyn BlockDevice>, - io_type: IoType, - range: Range, - iovs: *mut IoVec, - iovs_len: usize, -} - -impl Default for InjectIoCtx { - fn default() -> Self { - Self { - dev: None, - io_type: IoType::Invalid, - range: 0 .. 0, - iovs: std::ptr::null_mut(), - iovs_len: 0, - } - } -} - -impl InjectIoCtx { - /// TODO - #[inline(always)] - pub fn with_iovs( - dev: &dyn BlockDevice, - io_type: IoType, - offset: u64, - num_blocks: u64, - iovs: &[IoVec], - ) -> Self { - Self { - dev: Some(dev as *const _ as *mut dyn BlockDevice), - io_type, - range: offset .. 
offset + num_blocks, - iovs: iovs.as_ptr() as *mut _, - iovs_len: iovs.len(), - } - } - - /// TODO - #[inline(always)] - pub fn is_valid(&self) -> bool { - self.dev.is_some() - } - - /// TODO - #[inline(always)] - pub fn device(&self) -> &dyn BlockDevice { - unsafe { &*self.dev.unwrap() } - } - - /// TODO - #[inline(always)] - pub fn device_name(&self) -> String { - self.device().device_name() - } - - /// TODO - #[inline(always)] - pub fn iovs_mut(&self) -> Option<&mut [IoVec]> { - unsafe { - if self.iovs.is_null() - || !(*self.iovs).is_initialized() - || (*self.iovs).is_empty() - || self.iovs_len == 0 - { - None - } else { - Some(from_raw_parts_mut(self.iovs, self.iovs_len)) - } + FaultIoStage::Submission => f.write_str("submit"), + FaultIoStage::Completion => f.write_str("compl"), } } } @@ -226,4 +106,8 @@ pub enum FaultInjectionError { begin: Duration, end: Duration, }, + #[snafu(display("Injection device not found: '{name}'"))] + DeviceNotFound { name: String }, + #[snafu(display("Injection is invalid for '{name}': {msg}"))] + InvalidInjection { name: String, msg: String }, } diff --git a/io-engine/src/core/mod.rs b/io-engine/src/core/mod.rs index 38fca91f9f..92994d3882 100644 --- a/io-engine/src/core/mod.rs +++ b/io-engine/src/core/mod.rs @@ -78,6 +78,8 @@ pub use snapshot::{ SnapshotXattrs, }; +use spdk_rs::libspdk::SPDK_NVME_SC_CAPACITY_EXCEEDED; + mod bdev; mod block_device; mod descriptor; @@ -496,10 +498,13 @@ impl Debug for IoCompletionStatus { impl From for IoCompletionStatus { fn from(s: NvmeStatus) -> Self { - if s == NvmeStatus::VendorSpecific(libc::ENOSPC) { - IoCompletionStatus::LvolError(LvolFailure::NoSpace) - } else { - IoCompletionStatus::NvmeError(s) + match s { + NvmeStatus::NO_SPACE + | NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => { + IoCompletionStatus::LvolError(LvolFailure::NoSpace) + } + + _ => IoCompletionStatus::NvmeError(s), } } } diff --git a/io-engine/src/grpc/v1/test.rs b/io-engine/src/grpc/v1/test.rs index 
525a0890e8..2b05887a35 100644 --- a/io-engine/src/grpc/v1/test.rs +++ b/io-engine/src/grpc/v1/test.rs @@ -30,8 +30,8 @@ use crate::core::fault_injection::{ add_fault_injection, list_fault_injections, remove_fault_injection, - FaultInjection, FaultInjectionError, + Injection, }; #[derive(Debug, Clone)] @@ -131,8 +131,8 @@ impl TestRpc for TestService { { let rx = rpc_submit::<_, _, FaultInjectionError>(async move { let uri = args.uri.clone(); - let inj = FaultInjection::from_uri(&uri)?; - add_fault_injection(inj); + let inj = Injection::from_uri(&uri)?; + add_fault_injection(inj)?; Ok(()) })?; @@ -161,9 +161,9 @@ impl TestRpc for TestService { let uri = args.uri.clone(); // Validate injection URI by trying to parse it. - FaultInjection::from_uri(&uri)?; + Injection::from_uri(&uri)?; - remove_fault_injection(&uri); + remove_fault_injection(&uri)?; Ok(()) })?; @@ -372,11 +372,11 @@ impl From for tonic::Status { } #[cfg(feature = "fault-injection")] -impl From for v1::test::FaultInjection { - fn from(src: FaultInjection) -> Self { +impl From for v1::test::FaultInjection { + fn from(src: Injection) -> Self { let is_active = src.is_active(); Self { - uri: src.uri, + uri: src.uri(), device_name: src.device_name, is_active, } diff --git a/io-engine/src/lib.rs b/io-engine/src/lib.rs index 66f0c0f7a2..7e6719f5ca 100644 --- a/io-engine/src/lib.rs +++ b/io-engine/src/lib.rs @@ -10,6 +10,10 @@ extern crate serde_json; extern crate snafu; extern crate spdk_rs; +#[allow(unused_imports)] +#[macro_use] +extern crate derive_builder; + #[macro_use] pub mod core; pub mod bdev; diff --git a/io-engine/src/subsys/config/opts.rs b/io-engine/src/subsys/config/opts.rs index 68d4eca3a6..819a94662e 100644 --- a/io-engine/src/subsys/config/opts.rs +++ b/io-engine/src/subsys/config/opts.rs @@ -84,7 +84,7 @@ pub struct NvmfTgtConfig { /// the max number of namespaces this target should allow for pub max_namespaces: u32, /// Command Retry Delay. 
- pub crdt: u16, + pub crdt: [u16; 3], /// TCP transport options pub opts: NvmfTcpTransportOpts, } @@ -94,7 +94,7 @@ impl From for Box { let mut out = Self::default(); copy_str_with_null(&o.name, &mut out.name); out.max_subsystems = o.max_namespaces; - out.crdt[0] = o.crdt; + out.crdt = o.crdt; out } } diff --git a/io-engine/src/subsys/nvmf/subsystem.rs b/io-engine/src/subsys/nvmf/subsystem.rs index 58ca2ac437..1899cb30de 100644 --- a/io-engine/src/subsys/nvmf/subsystem.rs +++ b/io-engine/src/subsys/nvmf/subsystem.rs @@ -14,8 +14,10 @@ use spdk_rs::{ nvmf_subsystem_set_ana_state, nvmf_subsystem_set_cntlid_range, spdk_bdev_nvme_opts, + spdk_nvmf_ctrlr_set_cpl_error_cb, spdk_nvmf_ns_get_bdev, spdk_nvmf_ns_opts, + spdk_nvmf_request, spdk_nvmf_subsystem, spdk_nvmf_subsystem_add_host, spdk_nvmf_subsystem_add_listener, @@ -46,9 +48,14 @@ use spdk_rs::{ spdk_nvmf_subsystem_state_change_done, spdk_nvmf_subsystem_stop, spdk_nvmf_tgt, + SPDK_NVME_SCT_GENERIC, + SPDK_NVME_SC_CAPACITY_EXCEEDED, + SPDK_NVME_SC_RESERVATION_CONFLICT, SPDK_NVMF_SUBTYPE_DISCOVERY, SPDK_NVMF_SUBTYPE_NVME, }, + NvmeStatus, + NvmfController, NvmfSubsystemEvent, }; @@ -64,6 +71,7 @@ use crate::{ }, }; +/// TODO #[derive(Debug, PartialOrd, PartialEq)] pub enum SubType { Nvme, @@ -235,20 +243,41 @@ impl NvmfSubsystem { let nexus = nexus_lookup_nqn(&nqn); let event = NvmfSubsystemEvent::from_cb_args(event, ctx); - debug!("NVMF subsystem event '{nqn}': {event:?}"); + debug!("NVMF subsystem event {subsystem:?}: {event:?}"); match event { NvmfSubsystemEvent::HostConnect(ctrlr) => { + info!( + "Subsystem '{nqn}': host connected: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { nex.add_initiator(&ctrlr.hostnqn()); + subsystem.host_connect_nexus(ctrlr); + } else { + subsystem.host_connect_replica(ctrlr); } } NvmfSubsystemEvent::HostDisconnect(ctrlr) => { + info!( + "Subsystem '{nqn}': host disconnected: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { 
nex.rm_initiator(&ctrlr.hostnqn()); + subsystem.host_disconnect_nexus(ctrlr); + } else { + subsystem.host_disconnect_replica(ctrlr); } } NvmfSubsystemEvent::HostKeepAliveTimeout(ctrlr) => { + warn!( + "Subsystem '{nqn}': host keep alive timeout: '{host}'", + host = ctrlr.hostnqn() + ); + if let Some(nex) = nexus { nex.initiator_keep_alive_timeout(&ctrlr.hostnqn()); } @@ -259,6 +288,99 @@ impl NvmfSubsystem { } } + /// Completion error callback for nexuses. + unsafe extern "C" fn nexus_cpl_error_cb( + req: *mut spdk_nvmf_request, + _cb_arg: *mut ::std::os::raw::c_void, + ) { + let req = &mut *req; + let cpl = req.nvme_cpl_mut(); + let mut status = cpl.status(); + + if status.crd() == 0 { + return; + } + + // Use CRD #2 for certain errors. + match status.status() { + NvmeStatus::Generic(SPDK_NVME_SC_RESERVATION_CONFLICT) + | NvmeStatus::Generic(SPDK_NVME_SC_CAPACITY_EXCEEDED) => { + status.set_crd(2); + } + _ => {} + } + + cpl.set_status(status); + } + + /// Called upon a host connection to a nexus. + fn host_connect_nexus(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + Some(Self::nexus_cpl_error_cb), + std::ptr::null_mut(), + ); + } + } + + /// Called upon a host disconnection from a nexus. + fn host_disconnect_nexus(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + None, + std::ptr::null_mut(), + ); + } + } + + /// Completion error callback for replicas. + unsafe extern "C" fn replica_cpl_error_cb( + req: *mut spdk_nvmf_request, + _cb_arg: *mut ::std::os::raw::c_void, + ) { + let req = &mut *req; + let cpl = req.nvme_cpl_mut(); + + let mut status = cpl.status(); + + // Change CRD for replica to 3. + if status.crd() == 1 { + status.set_crd(3); + } + + // Correct vendor-specific ENOSPC error. 
+ if status.status().is_no_space() { + status.set_sct(SPDK_NVME_SCT_GENERIC as u16); + status.set_sc(SPDK_NVME_SC_CAPACITY_EXCEEDED as u16); + } + + cpl.set_status(status); + } + + /// Called upon a host connection to a replica. + fn host_connect_replica(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + Some(Self::replica_cpl_error_cb), + std::ptr::null_mut(), + ); + } + } + + /// Called upon a host disconnection from a replica. + fn host_disconnect_replica(&self, ctrlr: NvmfController) { + unsafe { + spdk_nvmf_ctrlr_set_cpl_error_cb( + ctrlr.0.as_ptr(), + None, + std::ptr::null_mut(), + ); + } + } + /// create a new subsystem where the NQN is based on the UUID pub fn new(uuid: &str) -> Result { let nqn = make_nqn(uuid).into_cstring(); diff --git a/io-engine/tests/nexus_child_retire.rs b/io-engine/tests/nexus_child_retire.rs index 44aa43e99e..5f75072984 100644 --- a/io-engine/tests/nexus_child_retire.rs +++ b/io-engine/tests/nexus_child_retire.rs @@ -43,10 +43,10 @@ use io_engine::{ fault_injection::{ add_fault_injection, FaultDomain, - FaultInjection, + FaultIoOperation, FaultIoStage, - FaultIoType, - FaultType, + FaultMethod, + InjectionBuilder, }, CoreError, IoCompletionStatus, @@ -245,7 +245,10 @@ async fn nexus_child_retire_persist_unresponsive_with_fio() { nex_0 .add_injection_at_replica( &repl_0, - &format!("domain=nexus&op=write&offset={offset}", offset = 10), + &format!( + "domain=child&op=write&stage=completion&offset={offset}", + offset = 10 + ), ) .await .unwrap(); @@ -362,16 +365,18 @@ async fn nexus_child_retire_persist_unresponsive_with_bdev_io() { let inj_device = nex.child_at(0).get_device_name().unwrap(); - add_fault_injection(FaultInjection::new( - FaultDomain::Nexus, - &inj_device, - FaultIoType::Write, - FaultIoStage::Completion, - FaultType::status_data_transfer_error(), - Duration::ZERO, - Duration::MAX, - 0 .. 
1, - )); + add_fault_injection( + InjectionBuilder::default() + .with_domain(FaultDomain::NexusChild) + .with_device_name(inj_device) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Completion) + .with_method(FaultMethod::DATA_TRANSFER_ERROR) + .with_block_range(0 .. 1) + .build() + .unwrap(), + ) + .unwrap(); // Pause etcd. test.pause("etcd").await.unwrap(); @@ -441,16 +446,18 @@ async fn nexus_child_retire_persist_failure_with_bdev_io() { let inj_device = nex.child_at(0).get_device_name().unwrap(); - add_fault_injection(FaultInjection::new( - FaultDomain::Nexus, - &inj_device, - FaultIoType::Write, - FaultIoStage::Completion, - FaultType::status_data_transfer_error(), - Duration::ZERO, - Duration::MAX, - 0 .. 1, - )); + add_fault_injection( + InjectionBuilder::default() + .with_domain(FaultDomain::NexusChild) + .with_device_name(inj_device) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Completion) + .with_method(FaultMethod::DATA_TRANSFER_ERROR) + .with_block_range(0 .. 1) + .build() + .unwrap(), + ) + .unwrap(); // Pause etcd. 
test.pause("etcd").await.unwrap(); diff --git a/io-engine/tests/nexus_crd.rs b/io-engine/tests/nexus_crd.rs new file mode 100644 index 0000000000..b71a78c222 --- /dev/null +++ b/io-engine/tests/nexus_crd.rs @@ -0,0 +1,411 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use std::{sync::Arc, time::Duration}; +use tokio::sync::{ + oneshot, + oneshot::{Receiver, Sender}, +}; + +use common::{ + compose::{ + rpc::v1::{ + nexus::{NexusNvmePreemption, NvmeReservation}, + GrpcConnect, + SharedRpcHandle, + }, + Binary, + Builder, + }, + file_io::DataSize, + fio::{spawn_fio_task, Fio, FioJob, FioJobResult}, + nexus::NexusBuilder, + nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, + nvmf::NvmfLocation, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::{add_fault_injection, remove_fault_injection}, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + InjectionBuilder, +}; + +const POOL_SIZE: u64 = 500; +const REPL_SIZE: u64 = 450; +const REPL_UUID: &str = "65acdaac-14c4-41d8-a55e-d03bfd7185a4"; +const NEXUS_NAME: &str = "nexus_0"; +const NEXUS_SIZE: u64 = REPL_SIZE; +const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760"; + +/// Tests that without CRD enabled, initiator would eventually fail I/Os. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn nexus_fail_no_crd() { + test_nexus_fail("0") + .await + .expect_err("I/O expected to fail"); +} + +/// Tests that CRD properly delays I/O retries on initiator, while the target +/// has a chance to replace a failed nexus. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn nexus_fail_crd() { + test_nexus_fail("20") + .await + .expect("I/O expected to succeed"); +} + +async fn test_nexus_fail(crdt: &str) -> std::io::Result<()> { + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]), + ) + .add_container_bin( + "ms_nex", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1,2,3,4", + "--tgt-crdt", + crdt, + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let test = Arc::new(test); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(false); + + pool_0.create().await.unwrap(); + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); + + let children = nex_0.get_nexus().await.unwrap().children; + let dev_name = children[0].device_name.as_ref().unwrap(); + + let inj_w = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Write) + .build_uri() + .unwrap(); + + let inj_r = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Read) + .build_uri() + .unwrap(); + + let cfg = NexusManageTask { + 
ms_nex: ms_nex.clone(), + nex_0: nex_0.clone(), + repl_0: repl_0.clone(), + inj_w, + inj_r, + }; + + // Run two tasks in parallel, I/O and nexus management. + let (s, r) = oneshot::channel(); + + let j0 = tokio::spawn({ + let nvmf = nex_0.nvmf_location(); + async move { run_io_task(s, &nvmf, 10, 20).await } + }); + tokio::pin!(j0); + + let j1 = tokio::spawn({ + let cfg = cfg.clone(); + async move { + run_nexus_manage_task(r, cfg).await; + } + }); + tokio::pin!(j1); + + let (io_res, _) = tokio::join!(j0, j1); + io_res.unwrap() +} + +#[derive(Clone)] +struct NexusManageTask { + ms_nex: SharedRpcHandle, + nex_0: NexusBuilder, + repl_0: ReplicaBuilder, + inj_w: String, + inj_r: String, +} + +/// Runs multiple FIO I/O jobs. +async fn run_io_task( + s: Sender<()>, + nvmf: &NvmfLocation, + cnt: u32, + rt: u32, +) -> std::io::Result<()> { + let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn); + let path = find_mayastor_nvme_device_path(&nvmf.serial) + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let jobs = (0 .. cnt).map(|_| { + FioJob::new() + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(128) + .with_filename(&path) + .with_runtime(rt) + .with_rw("randwrite") + }); + + let fio = Fio::new().with_jobs(jobs); + + // Notify the nexus management task that connection is complete and I/O + // starts. + s.send(()).unwrap(); + + // Start FIO. + spawn_fio_task(&fio).await +} + +/// Manages the nexus in parallel to I/O task. +/// [1] Nexus is failed by injecting a fault. +/// [2] I/O running in parallel should freeze or fail, depending on how target's +/// configured. +/// [3] Nexus is recreated. +async fn run_nexus_manage_task(r: Receiver<()>, cfg: NexusManageTask) { + let NexusManageTask { + ms_nex, + inj_w, + inj_r, + mut nex_0, + repl_0, + .. + } = cfg; + + // Wait until I/O task connects and signals it is ready. + r.await.unwrap(); + + // Allow I/O to run for some time. 
+ tokio::time::sleep(Duration::from_secs(2)).await; + + // Inject fault, so the nexus would fail. + add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap(); + add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap(); + + // When nexus fails, I/O should be freezing due to CRD (if enabled). + tokio::time::sleep(Duration::from_secs(2)).await; + + // Destroy the nexus, remove injectios and re-create and re-publish the + // nexus with the same ID. + // I/O would eventually retry and the new nexus would run I/O. + nex_0.shutdown().await.unwrap(); + nex_0.destroy().await.unwrap(); + + remove_fault_injection(ms_nex.clone(), &inj_w) + .await + .unwrap(); + remove_fault_injection(ms_nex.clone(), &inj_r) + .await + .unwrap(); + + let mut nex_0 = NexusBuilder::new(ms_nex.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0); + + nex_0.create().await.unwrap(); + nex_0.publish().await.unwrap(); +} + +#[tokio::test] +async fn nexus_crd_resv() { + common::composer_init(); + + const HOSTID_0: &str = "53b35ce9-8e71-49a9-ab9b-cba7c5670fad"; + const HOSTID_1: &str = "c1affd2d-ef79-4ba4-b5cf-8eb48f9c07d0"; + const HOSTID_2: &str = "3f264cc3-1c95-44ca-bc1f-ed7fb68f3894"; + const PTPL_CONTAINER_DIR: &str = "/host/tmp/ptpl"; + const RESV_KEY_1: u64 = 0xabcd_ef00_1234_5678; + const RESV_KEY_2: u64 = 0xfeed_f00d_bead_5678; + + // Set 1st, 3nd CRD to non-zero value and 2nd to zero. + // Nexus reservation must select the second one (zero). 
+ const CRDT: &str = "0,15,0"; + const TOTAL_DELAY: u64 = 15 * 5 * 100; + + let ptpl_dir = |ms| format!("{PTPL_CONTAINER_DIR}/{ms}"); + + let test = Builder::new() + .name("nexus_crd_resv_test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_0) + .with_args(vec![ + "-l", + "1", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_0").as_str(), + ]) + .with_bind("/tmp", "/host/tmp"), + ) + .add_container_bin( + "ms_1", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_1) + .with_args(vec![ + "-l", + "2", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_1").as_str(), + ]) + .with_bind("/tmp", "/host/tmp"), + ) + .add_container_bin( + "ms_2", + Binary::from_dbg("io-engine") + .with_env("NEXUS_NVMF_RESV_ENABLE", "1") + .with_env("MAYASTOR_NVMF_HOSTID", HOSTID_2) + .with_args(vec![ + "-l", + "3", + "-F", + "compact,color,host", + "--tgt-crdt", + CRDT, + "--ptpl-dir", + ptpl_dir("ms_2").as_str(), + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + + let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); + let ms_1 = conn.grpc_handle_shared("ms_1").await.unwrap(); + let ms_2 = conn.grpc_handle_shared("ms_2").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(ms_0.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_uuid(REPL_UUID) + .with_size_mb(REPL_SIZE) + .with_thin(false); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + // Create nexus #1. 
+ let mut nex_1 = NexusBuilder::new(ms_1.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_resv_key(RESV_KEY_1) + .with_resv_type(NvmeReservation::ExclusiveAccess) + .with_preempt_policy(NexusNvmePreemption::Holder); + + nex_1.create().await.unwrap(); + nex_1.publish().await.unwrap(); + + // Create nexus #2. + let mut nex_2 = NexusBuilder::new(ms_2.clone()) + .with_name(NEXUS_NAME) + .with_uuid(NEXUS_UUID) + .with_size_mb(NEXUS_SIZE) + .with_replica(&repl_0) + .with_resv_key(RESV_KEY_2) + .with_resv_type(NvmeReservation::ExclusiveAccess) + .with_preempt_policy(NexusNvmePreemption::Holder); + + nex_2.create().await.unwrap(); + nex_2.publish().await.unwrap(); + + // Run I/O on the first nexus, causing SPDK_NVME_SC_RESERVATION_CONFLICT. + // io-engine must select 2nd CRD, which is configured to be zero. + let fio_res = { + let (_cg, path) = nex_1.nvmf_location().open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j0") + .with_filename_path(path) + .with_ioengine("libaio") + .with_iodepth(1) + .with_direct(true) + .with_rw("write") + .with_size(DataSize::from_kb(4)), + ); + + tokio::spawn(async { fio.run() }).await.unwrap() + }; + assert!(fio_res.total_time < Duration::from_millis(TOTAL_DELAY)); + + // The required errno (EBADE) exists on Linux-like targets only. On other + // platforms like macos, an IDE would highlight it as an error. 
+ #[cfg(target_os = "linux")] + assert_eq!( + fio_res.find_job("j0").unwrap().result, + FioJobResult::Error(Errno::EBADE) + ); +} diff --git a/io-engine/tests/nexus_fail_crd.rs b/io-engine/tests/nexus_fail_crd.rs deleted file mode 100644 index b964e3f08e..0000000000 --- a/io-engine/tests/nexus_fail_crd.rs +++ /dev/null @@ -1,229 +0,0 @@ -pub mod common; - -use std::{sync::Arc, time::Duration}; -use tokio::sync::{ - oneshot, - oneshot::{Receiver, Sender}, -}; - -use common::{ - compose::{ - rpc::v1::{GrpcConnect, SharedRpcHandle}, - Binary, - Builder, - }, - fio::{Fio, FioJob}, - nexus::NexusBuilder, - nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, - pool::PoolBuilder, - replica::ReplicaBuilder, - test::{add_fault_injection, remove_fault_injection}, -}; - -const POOL_SIZE: u64 = 500; -const REPL_SIZE: u64 = 450; -const NEXUS_NAME: &str = "nexus_0"; -const NEXUS_SIZE: u64 = REPL_SIZE; -const NEXUS_UUID: &str = "bbe6cbb6-c508-443a-877a-af5fa690c760"; - -/// Tests that without CRD enabled, initiator would eventually fail I/Os. -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn nexus_fail_no_crd() { - test_nexus_fail("0") - .await - .expect_err("I/O expected to fail"); -} - -/// Tests that CRD properly delays I/O retries on initiator, while the target -/// has a chance to replace a failed nexus. 
-#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn nexus_fail_crd() { - test_nexus_fail("20") - .await - .expect("I/O expected to succeed"); -} - -async fn test_nexus_fail(crdt: &str) -> std::io::Result<()> { - common::composer_init(); - - let test = Builder::new() - .name("cargo-test") - .network("10.1.0.0/16") - .unwrap() - .add_container_bin( - "ms_0", - Binary::from_dbg("io-engine").with_args(vec!["-l", "5"]), - ) - .add_container_bin( - "ms_nex", - Binary::from_dbg("io-engine").with_args(vec![ - "-l", - "1,2,3,4", - "--tgt-crdt", - crdt, - ]), - ) - .with_clean(true) - .build() - .await - .unwrap(); - - let test = Arc::new(test); - - let conn = GrpcConnect::new(&test); - - let ms_0 = conn.grpc_handle_shared("ms_0").await.unwrap(); - let ms_nex = conn.grpc_handle_shared("ms_nex").await.unwrap(); - - let mut pool_0 = PoolBuilder::new(ms_0.clone()) - .with_name("pool0") - .with_new_uuid() - .with_malloc("mem0", POOL_SIZE); - - let mut repl_0 = ReplicaBuilder::new(ms_0.clone()) - .with_pool(&pool_0) - .with_name("r0") - .with_new_uuid() - .with_size_mb(REPL_SIZE) - .with_thin(false); - - pool_0.create().await.unwrap(); - repl_0.create().await.unwrap(); - repl_0.share().await.unwrap(); - - let mut nex_0 = NexusBuilder::new(ms_nex.clone()) - .with_name(NEXUS_NAME) - .with_uuid(NEXUS_UUID) - .with_size_mb(NEXUS_SIZE) - .with_replica(&repl_0); - - nex_0.create().await.unwrap(); - nex_0.publish().await.unwrap(); - - let children = nex_0.get_nexus().await.unwrap().children; - let dev_name = children[0].device_name.as_ref().unwrap(); - - let inj = "domain=nexus&op=write&offset=0"; - let inj_w = format!("inject://{dev_name}?{inj}"); - - let inj = "domain=nexus&op=read&offset=0"; - let inj_r = format!("inject://{dev_name}?{inj}"); - - let cfg = JobCfg { - ms_nex: ms_nex.clone(), - nex_0: nex_0.clone(), - repl_0: repl_0.clone(), - inj_w: inj_w.clone(), - inj_r: inj_r.clone(), - }; - - // Run two tasks in parallel, I/O and nexus management. 
- let (s, r) = oneshot::channel(); - - let j0 = tokio::spawn({ - let cfg = cfg.clone(); - async move { run_io_task(s, cfg).await } - }); - tokio::pin!(j0); - - let j1 = tokio::spawn({ - let cfg = cfg.clone(); - async move { - run_manage_task(r, cfg).await; - } - }); - tokio::pin!(j1); - - let (io_res, _) = tokio::join!(j0, j1); - io_res.unwrap() -} - -#[derive(Clone)] -struct JobCfg { - ms_nex: SharedRpcHandle, - nex_0: NexusBuilder, - repl_0: ReplicaBuilder, - inj_w: String, - inj_r: String, -} - -/// Runs multiple FIO I/O jobs. -async fn run_io_task(s: Sender<()>, cfg: JobCfg) -> std::io::Result<()> { - let nvmf = cfg.nex_0.nvmf_location(); - let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn); - let path = find_mayastor_nvme_device_path(&nvmf.serial) - .unwrap() - .to_str() - .unwrap() - .to_string(); - - let jobs = (0 .. 10).map(|_| { - FioJob::new() - .with_direct(true) - .with_ioengine("libaio") - .with_iodepth(128) - .with_filename(&path) - .with_runtime(20) - .with_rw("randwrite") - }); - - let fio = Fio::new().with_jobs(jobs); - - // Notify the nexus management task that connection is complete and I/O - // starts. - s.send(()).unwrap(); - - // Start FIO. - tokio::spawn(async move { fio.run() }).await.unwrap() -} - -/// Manages the nexus in parallel to I/O task. -/// [1] Nexus is failed by injecting a fault. -/// [2] I/O running in parallel should freeze or fail, depending on how target's -/// configured. -/// [3] Nexus is recreated. -async fn run_manage_task(r: Receiver<()>, cfg: JobCfg) { - let JobCfg { - ms_nex, - inj_w, - inj_r, - mut nex_0, - repl_0, - .. - } = cfg; - - // Wait until I/O task connects and signals it is ready. - r.await.unwrap(); - - // Allow I/O to run for some time. - tokio::time::sleep(Duration::from_secs(2)).await; - - // Inject fault, so the nexus would fail. 
- add_fault_injection(ms_nex.clone(), &inj_w).await.unwrap(); - add_fault_injection(ms_nex.clone(), &inj_r).await.unwrap(); - - // When nexus fails, I/O should be freezing due to CRD (if enabled). - tokio::time::sleep(Duration::from_secs(2)).await; - - // Destroy the nexus, remove injectios and re-create and re-publish the - // nexus with the same ID. - // I/O would eventually retry and the new nexus would run I/O. - nex_0.shutdown().await.unwrap(); - nex_0.destroy().await.unwrap(); - - remove_fault_injection(ms_nex.clone(), &inj_w) - .await - .unwrap(); - remove_fault_injection(ms_nex.clone(), &inj_r) - .await - .unwrap(); - - let mut nex_0 = NexusBuilder::new(ms_nex.clone()) - .with_name(NEXUS_NAME) - .with_uuid(NEXUS_UUID) - .with_size_mb(NEXUS_SIZE) - .with_replica(&repl_0); - - nex_0.create().await.unwrap(); - nex_0.publish().await.unwrap(); -} diff --git a/io-engine/tests/nexus_fault_injection.rs b/io-engine/tests/nexus_fault_injection.rs index a342e8aed9..daf96b7183 100644 --- a/io-engine/tests/nexus_fault_injection.rs +++ b/io-engine/tests/nexus_fault_injection.rs @@ -4,6 +4,18 @@ pub mod common; use std::time::Duration; +use io_engine::core::{ + fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + FaultMethod, + Injection, + InjectionBuilder, + }, + IoCompletionStatus, +}; + use common::{ compose::{ rpc::v1::{ @@ -15,12 +27,16 @@ use common::{ ComposeTest, }, file_io::DataSize, + fio::{spawn_fio_task, Fio, FioJob}, nexus::{test_write_to_nexus, NexusBuilder}, + nvme::{find_mayastor_nvme_device_path, NmveConnectGuard}, pool::PoolBuilder, replica::ReplicaBuilder, test::{add_fault_injection, list_fault_injections}, }; +use spdk_rs::NvmeStatus; + static POOL_SIZE: u64 = 60; static REPL_SIZE: u64 = 50; @@ -165,22 +181,22 @@ async fn test_injection_uri(inj_part: &str) { #[tokio::test] async fn nexus_fault_injection_write_submission() { - test_injection_uri("domain=nexus&op=write&stage=submit&offset=64").await; + 
test_injection_uri("domain=child&op=write&stage=submit&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_write() { - test_injection_uri("domain=nexus&op=write&offset=64").await; + test_injection_uri("domain=child&op=write&stage=compl&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_read_submission() { - test_injection_uri("domain=nexus&op=read&stage=submit&offset=64").await; + test_injection_uri("domain=child&op=read&stage=submit&offset=64").await; } #[tokio::test] async fn nexus_fault_injection_read() { - test_injection_uri("domain=nexus&op=read&offset=64").await; + test_injection_uri("domain=child&op=read&stage=compl&offset=64").await; } #[tokio::test] @@ -202,7 +218,8 @@ async fn nexus_fault_injection_time_based() { // Create an injection that will start in 1 sec after first I/O // to the device, and end after 5s. - let inj_part = "domain=nexus&op=write&begin=1000&end=5000"; + let inj_part = + "domain=child&op=write&stage=compl&begin_at=1000&end_at=5000"; let inj_uri = format!("inject://{dev_name}?{inj_part}"); add_fault_injection(nex_0.rpc(), &inj_uri).await.unwrap(); @@ -280,8 +297,14 @@ async fn nexus_fault_injection_range_based() { // Create injection that will fail at offset of 128 blocks, for a span // of 16 blocks. - let inj_part = "domain=nexus&op=write&offset=128&num_blk=16"; - let inj_uri = format!("inject://{dev_name}?{inj_part}"); + let inj_uri = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Completion) + .with_offset(128, 16) + .build_uri() + .unwrap(); add_fault_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // List injected fault. 
@@ -349,3 +372,135 @@ async fn nexus_fault_injection_range_based() { let children = nex_0.get_nexus().await.unwrap().children; assert_eq!(children[0].state, ChildState::Faulted as i32); } + +#[tokio::test] +async fn injection_uri_creation() { + let src = InjectionBuilder::default() + .with_domain(FaultDomain::BlockDevice) + .with_device_name("dev0".to_string()) + .with_method(FaultMethod::Status(IoCompletionStatus::NvmeError( + NvmeStatus::NO_SPACE, + ))) + .with_io_operation(FaultIoOperation::Read) + .with_io_stage(FaultIoStage::Completion) + .with_block_range(123 .. 456) + .with_time_range(Duration::from_secs(5) .. Duration::from_secs(10)) + .with_retries(789) + .build() + .unwrap(); + + // Test that debug output works. + println!("{src:?}"); + println!("{src:#?}"); + + let uri = src.as_uri(); + let res = Injection::from_uri(&uri).unwrap(); + + assert_eq!(src.uri(), uri); + assert_eq!(src.uri(), res.uri()); + assert_eq!(src.domain, res.domain); + assert_eq!(src.device_name, res.device_name); + assert_eq!(src.method, res.method); + assert_eq!(src.io_operation, res.io_operation); + assert_eq!(src.io_stage, res.io_stage); + assert_eq!(src.time_range, res.time_range); + assert_eq!(src.block_range, res.block_range); + assert_eq!(src.retries, res.retries); +} + +#[tokio::test] +async fn replica_bdev_io_injection() { + common::composer_init(); + + const BLK_SIZE: u64 = 512; + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc_blk_size("mem0", 100, BLK_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + 
.with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(80) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let inj_uri = InjectionBuilder::default() + .with_device_name("r0".to_string()) + .with_domain(FaultDomain::BdevIo) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Submission) + .with_method(FaultMethod::Status(IoCompletionStatus::NvmeError( + NvmeStatus::DATA_TRANSFER_ERROR, + ))) + .with_offset(20, 1) + .build_uri() + .unwrap(); + + add_fault_injection(rpc.clone(), &inj_uri).await.unwrap(); + + let nvmf = repl_0.nvmf_location(); + let _cg = NmveConnectGuard::connect_addr(&nvmf.addr, &nvmf.nqn); + let path = find_mayastor_nvme_device_path(&nvmf.serial) + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // With offset of 30 blocks, the job mustn't hit the injected fault, which + // is set on block #20. + let fio_ok = Fio::new().with_job( + FioJob::new() + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename(&path) + .with_offset(DataSize::from_blocks(30, BLK_SIZE)) + .with_rw("write"), + ); + + // With the entire device, the job must hit the injected fault. 
+ let fio_fail = Fio::new().with_job( + FioJob::new() + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename(&path) + .with_rw("write"), + ); + + spawn_fio_task(&fio_ok) + .await + .expect("This FIO job must succeed"); + + let r = spawn_fio_task(&fio_fail) + .await + .expect_err("This FIO job must fail"); + + assert_eq!(r.kind(), std::io::ErrorKind::Other); +} diff --git a/io-engine/tests/nexus_rebuild_partial.rs b/io-engine/tests/nexus_rebuild_partial.rs index 3388fec4fb..c2b33ab194 100644 --- a/io-engine/tests/nexus_rebuild_partial.rs +++ b/io-engine/tests/nexus_rebuild_partial.rs @@ -22,6 +22,14 @@ use io_engine_tests::{ nexus::test_fio_to_nexus, }; +#[cfg(feature = "fault-injection")] +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + InjectionBuilder, +}; + #[cfg(feature = "fault-injection")] use common::compose::rpc::v1::nexus::RebuildJobState; @@ -188,11 +196,14 @@ async fn nexus_partial_rebuild_io_fault() { // Inject a child failure. // All write operations starting of segment #7 will fail. let dev_name_1 = children[1].device_name.as_ref().unwrap(); - let inj_uri = format!( - "inject://{dev_name_1}?domain=nexus&op=write&offset={offset}", - offset = 7 * SEG_BLK - ); - + let inj_uri = InjectionBuilder::default() + .with_device_name(dev_name_1.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Completion) + .with_block_range(7 * SEG_BLK .. u64::MAX) + .build_uri() + .unwrap(); add_fault_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // This write must be okay as the injection is not triggered yet. @@ -486,11 +497,14 @@ async fn nexus_partial_rebuild_double_fault() { .unwrap(); // Inject a failure at FAULT_POS. 
- let inj_uri = format!( - "inject://{child_0_dev_name}?\ - domain=nexus&op=write&offset={offset}&num_blk=1", - offset = FAULT_POS * 1024 * 1024 / BLK_SIZE - ); + let inj_uri = InjectionBuilder::default() + .with_device_name(child_0_dev_name.clone()) + .with_domain(FaultDomain::NexusChild) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Completion) + .with_offset(FAULT_POS * 1024 * 1024 / BLK_SIZE, 1) + .build_uri() + .unwrap(); add_fault_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // Online the replica, triggering the rebuild. diff --git a/io-engine/tests/nexus_rebuild_verify.rs b/io-engine/tests/nexus_rebuild_verify.rs index d9b4429d01..83ae8d3ac5 100644 --- a/io-engine/tests/nexus_rebuild_verify.rs +++ b/io-engine/tests/nexus_rebuild_verify.rs @@ -18,6 +18,13 @@ use common::{ test::add_fault_injection, }; +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + FaultMethod, + InjectionBuilder, +}; use std::time::Duration; const POOL_SIZE: u64 = 80; @@ -50,9 +57,15 @@ async fn test_rebuild_verify( .unwrap(); // Add an injection as block device level. - let inj_part = "domain=block&op=write&stage=submission&type=data\ - &offset=10240&num_blk=1"; - let inj_uri = format!("inject://{dev_name}?{inj_part}"); + let inj_uri = InjectionBuilder::default() + .with_device_name(dev_name.clone()) + .with_domain(FaultDomain::BlockDevice) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Submission) + .with_method(FaultMethod::Data) + .with_offset(10240, 1) + .build_uri() + .unwrap(); add_fault_injection(nex_0.rpc(), &inj_uri).await.unwrap(); // Online the replica. 
Rebuild must fail at some point because of injected diff --git a/io-engine/tests/replica_crd.rs b/io-engine/tests/replica_crd.rs new file mode 100644 index 0000000000..d9e7509678 --- /dev/null +++ b/io-engine/tests/replica_crd.rs @@ -0,0 +1,108 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use nix::errno::Errno; +use std::time::Duration; + +use common::{ + compose::{rpc::v1::GrpcConnect, Binary, Builder}, + fio::{Fio, FioJob, FioJobResult}, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::add_fault_injection, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoOperation, + FaultIoStage, + FaultMethod, + InjectionBuilder, +}; + +// Test that the third CRD value is used for a replica target. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn replica_no_fail_crd() { + const POOL_SIZE: u64 = 100; + const REPL_SIZE: u64 = 80; + const REPL_NAME: &str = "r0"; + + // Set 1st, 2nd CRD to non-zero value and 3rd to zero. + // Replica must select the third one (zero). + const CRDT: &str = "15,15,0"; + + const TOTAL_DELAY: u64 = 15 * 5 * 100; + + common::composer_init(); + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + "--tgt-crdt", + CRDT, + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc("mem0", POOL_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name(REPL_NAME) + .with_new_uuid() + .with_size_mb(REPL_SIZE) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + // Injection. 
+ let inj_uri = InjectionBuilder::default() + .with_device_name(REPL_NAME.to_string()) + .with_domain(FaultDomain::BdevIo) + .with_io_operation(FaultIoOperation::Write) + .with_io_stage(FaultIoStage::Submission) + .with_method(FaultMethod::DATA_TRANSFER_ERROR) + .with_offset(1000, 1) + .build_uri() + .unwrap(); + + add_fault_injection(rpc.clone(), &inj_uri).await.unwrap(); + + let (_cg, path) = repl_0.nvmf_location().open().unwrap(); + + // FIO jobs. + let fio = Fio::new().with_job( + FioJob::new() + .with_name("job0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let fio_res = tokio::spawn(async { fio.run() }).await.unwrap(); + let job_res = fio_res.find_job("job0").unwrap(); + + assert_eq!(job_res.result, FioJobResult::Error(Errno::EIO)); + assert!(fio_res.total_time < Duration::from_millis(TOTAL_DELAY)); +} diff --git a/io-engine/tests/replica_thin_no_space.rs b/io-engine/tests/replica_thin_no_space.rs new file mode 100644 index 0000000000..d4a7ae8c01 --- /dev/null +++ b/io-engine/tests/replica_thin_no_space.rs @@ -0,0 +1,166 @@ +#![cfg(feature = "fault-injection")] + +pub mod common; + +use nix::errno::Errno; + +use common::{ + compose::{rpc::v1::GrpcConnect, Binary, Builder}, + fio::{Fio, FioJob, FioJobResult}, + pool::PoolBuilder, + replica::ReplicaBuilder, + test::add_fault_injection, +}; + +use io_engine::core::fault_injection::{ + FaultDomain, + FaultIoStage, + InjectionBuilder, +}; + +use spdk_rs::NvmeStatus; + +#[tokio::test] +async fn replica_thin_nospc() { + common::composer_init(); + + const BLK_SIZE: u64 = 512; + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = 
conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc_blk_size("mem0", 100, BLK_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(80) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let mut repl_1 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r1") + .with_new_uuid() + .with_size_mb(80) + .with_thin(false); + + repl_1.create().await.unwrap(); + + let nvmf = repl_0.nvmf_location(); + let (_nvmf_conn, path) = nvmf.open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j-0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let res = tokio::spawn(async move { fio.run() }).await.unwrap(); + + assert!(matches!( + res.find_job("j-0").unwrap().result, + FioJobResult::Error(Errno::ENOSPC) + )); +} + +#[tokio::test] +async fn replica_nospc_inject() { + common::composer_init(); + + const BLK_SIZE: u64 = 512; + + let test = Builder::new() + .name("cargo-test") + .network("10.1.0.0/16") + .unwrap() + .add_container_bin( + "ms_0", + Binary::from_dbg("io-engine").with_args(vec![ + "-l", + "1", + "-Fcompact,color,nodate", + ]), + ) + .with_clean(true) + .build() + .await + .unwrap(); + + let conn = GrpcConnect::new(&test); + let rpc = conn.grpc_handle_shared("ms_0").await.unwrap(); + + let mut pool_0 = PoolBuilder::new(rpc.clone()) + .with_name("pool0") + .with_new_uuid() + .with_malloc_blk_size("mem0", 100, BLK_SIZE); + pool_0.create().await.unwrap(); + + let mut repl_0 = ReplicaBuilder::new(rpc.clone()) + .with_pool(&pool_0) + .with_name("r0") + .with_new_uuid() + .with_size_mb(80) + .with_thin(true); + + repl_0.create().await.unwrap(); + repl_0.share().await.unwrap(); + + let inj_uri = 
InjectionBuilder::default() + .with_device_name("r0".to_string()) + .with_domain(FaultDomain::BdevIo) + .with_io_stage(FaultIoStage::Submission) + .with_method_nvme_error(NvmeStatus::NO_SPACE) + .build_uri() + .unwrap(); + + add_fault_injection(rpc.clone(), &inj_uri).await.unwrap(); + + let nvmf = repl_0.nvmf_location(); + let (_nvmf_conn, path) = nvmf.open().unwrap(); + + let fio = Fio::new().with_job( + FioJob::new() + .with_name("j-0") + .with_direct(true) + .with_ioengine("libaio") + .with_iodepth(1) + .with_filename_path(&path) + .with_rw("write"), + ); + + let res = tokio::spawn(async move { fio.run() }).await.unwrap(); + + assert!(matches!( + res.find_job("j-0").unwrap().result, + FioJobResult::Error(Errno::ENOSPC) + )); +} diff --git a/nix/pkgs/libspdk/default.nix b/nix/pkgs/libspdk/default.nix index adb49101e0..f12a6ba36c 100644 --- a/nix/pkgs/libspdk/default.nix +++ b/nix/pkgs/libspdk/default.nix @@ -56,13 +56,13 @@ let # 7. Copy SHA256 from 'got' of the error message to 'sha256' field. # 8. 'nix-shell' build must now succeed. drvAttrs = rec { - version = "23.05-b1f0b4e"; + version = "23.05-baffd90"; src = fetchFromGitHub { owner = "openebs"; repo = "spdk"; - rev = "b1f0b4ea640441e4cde551aae2a3368fe811366e"; - sha256 = "sha256-F3wwEUnf1o30oMHAt3CRZpTOoeaqZwo7hRpPlr4S+Vg="; + rev = "baffd90809bdd0b113b76fc7c9d7663b69d26752"; + sha256 = "sha256-tyxtXh7RpU6VtBlEjZ5MotnKQ4uZbbLD5sV+ndkuHhc="; fetchSubmodules = true; };