Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stuck rebuilds and stuck nexus subsystems #1720

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions io-engine-tests/src/compose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ use std::future::Future;
use tokio::sync::oneshot::channel;

use crate::mayastor_test_init_ex;
use io_engine::core::{
mayastor_env_stop,
MayastorCliArgs,
MayastorEnvironment,
Reactor,
Reactors,
GLOBAL_RC,
use io_engine::{
core::{
device_monitor_loop,
mayastor_env_stop,
runtime,
MayastorCliArgs,
MayastorEnvironment,
ProtectedSubsystems,
Reactor,
Reactors,
ResourceLockManager,
ResourceLockManagerConfig,
GLOBAL_RC,
},
grpc,
};
use std::time::Duration;

Expand Down Expand Up @@ -99,6 +107,34 @@ impl<'a> MayastorTest<'a> {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}

/// Starts the device monitor loop which is required to fully
/// remove devices when they are not in use.
pub fn start_device_monitor(&self) {
runtime::spawn(device_monitor_loop());
}

/// Start the gRPC server which can be useful to debug tests.
pub fn start_grpc(&self) {
let cfg = ResourceLockManagerConfig::default()
.with_subsystem(ProtectedSubsystems::POOL, 32)
.with_subsystem(ProtectedSubsystems::NEXUS, 512)
.with_subsystem(ProtectedSubsystems::REPLICA, 1024);
ResourceLockManager::initialize(cfg);

let env = MayastorEnvironment::global_or_default();
runtime::spawn(async {
grpc::MayastorGrpcServer::run(
&env.node_name,
&env.node_nqn,
env.grpc_endpoint.unwrap(),
env.rpc_addr,
env.api_versions,
)
.await
.ok();
});
}
}

impl<'a> Drop for MayastorTest<'a> {
Expand Down
34 changes: 32 additions & 2 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::{
Nexus,
NexusChild,
NexusOperation,
NexusPauseState,
NexusState,
NexusStatus,
PersistOp,
Expand Down Expand Up @@ -787,6 +788,13 @@ impl<'n> DeviceEventListener for Nexus<'n> {
false,
);
}
DeviceEventType::AdminQNoticeCtrlFailed => {
Reactors::master().send_future(Nexus::disconnect_failed_child(
self.name.clone(),
dev_name.to_owned(),
));
}

_ => {
warn!(
"{:?}: ignoring event '{:?}' for device '{}'",
Expand Down Expand Up @@ -917,6 +925,28 @@ impl<'n> Nexus<'n> {
}
}

/// Disconnect a failed child from the given nexus.
async fn disconnect_failed_child(nexus_name: String, dev: String) {
let Some(nex) = nexus_lookup_mut(&nexus_name) else {
warn!(
"Nexus '{nexus_name}': retiring failed device '{dev}': \
nexus already gone"
);
return;
};

info!("Nexus '{nexus_name}': disconnect handlers for controller failed device: '{dev}'");

if nex.io_subsystem_state() == Some(NexusPauseState::Pausing) {
nex.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| {
h.get_device().device_name() == dev && h.is_ctrlr_failed()
});
})
.await;
}
}

/// Retires a child device for the given nexus.
async fn child_retire_routine(
nexus_name: String,
Expand Down Expand Up @@ -981,12 +1011,12 @@ impl<'n> Nexus<'n> {
// channels, and all I/Os failing due to this device will eventually
// resubmit and succeeded (if any healthy children are left).
//
// Device disconnection is done in two steps (detach, than disconnect)
// Device disconnection is done in two steps (detach, then disconnect)
// in order to prevent an I/O race when retiring a device.
self.detach_device(&dev).await;

// Disconnect the devices with failed controllers _before_ pause,
// otherwise pause would stuck. Keep all controoled that are _not_
// otherwise pause would get stuck. Keep all controllers which are _not_
// failed (e.g., in the case I/O failed due to ENOSPC).
self.traverse_io_channels_async((), |channel, _| {
channel.disconnect_detached_devices(|h| h.is_ctrlr_failed());
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ impl<'n> Debug for NexusChannel<'n> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} L:{l} C:{c}]",
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
io = if self.is_io_chan { "I/O" } else { "Aux" },
nex = self.nexus.nexus_name(),
core = self.core,
cur = Cores::current(),
r = self.readers.len(),
w = self.writers.len(),
d = self.detached.len(),
l = self.io_logs.len(),
c = self.nexus.child_count(),
)
Expand Down
12 changes: 9 additions & 3 deletions io-engine/src/bdev/nvmx/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,13 +844,14 @@ pub extern "C" fn nvme_poll_adminq(ctx: *mut c_void) -> i32 {
if result < 0 {
if context.start_device_destroy() {
error!(
"process adminq: {}: {}",
"process adminq: {}: ctrl failed: {}, error: {}",
context.name,
context.is_failed(),
Errno::from_i32(result.abs())
);
info!("dispatching nexus fault and retire: {}", context.name);
let dev_name = context.name.to_string();
let carc = NVME_CONTROLLERS.lookup_by_name(&dev_name).unwrap();
let dev_name = context.name.as_str();
let carc = NVME_CONTROLLERS.lookup_by_name(dev_name).unwrap();
debug!(
?dev_name,
"notifying listeners of admin command completion failure"
Expand All @@ -864,6 +865,11 @@ pub extern "C" fn nvme_poll_adminq(ctx: *mut c_void) -> i32 {
?num_listeners,
"listeners notified of admin command completion failure"
);
} else if context.report_failed() {
if let Some(carc) = NVME_CONTROLLERS.lookup_by_name(&context.name) {
carc.lock()
.notify_listeners(DeviceEventType::AdminQNoticeCtrlFailed);
}
}
return 1;
}
Expand Down
15 changes: 15 additions & 0 deletions io-engine/src/bdev/nvmx/controller_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub(crate) struct TimeoutConfig {
reset_attempts: u32,
next_reset_time: Instant,
destroy_in_progress: AtomicCell<bool>,
report_failed: AtomicCell<bool>,
}

impl Drop for TimeoutConfig {
Expand All @@ -94,6 +95,7 @@ impl TimeoutConfig {
reset_attempts: MAX_RESET_ATTEMPTS,
next_reset_time: Instant::now(),
destroy_in_progress: AtomicCell::new(false),
report_failed: AtomicCell::new(true),
}
}

Expand All @@ -116,6 +118,19 @@ impl TimeoutConfig {
}
}

/// Check if the SPDK's nvme controller is failed.
pub fn is_failed(&self) -> bool {
self.ctrlr.is_failed
}
/// Check if we need to report the controller failure.
/// We only report this failure once.
pub fn report_failed(&mut self) -> bool {
if !self.is_failed() {
return false;
}
self.report_failed.compare_exchange(true, false).is_ok()
}

fn reset_cb(success: bool, ctx: *mut c_void) {
let timeout_ctx = TimeoutConfig::from_ptr(ctx as *mut TimeoutConfig);

Expand Down
8 changes: 7 additions & 1 deletion io-engine/src/core/device_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ pub enum DeviceEventType {
DeviceResized,
/// TODO
MediaManagement,
/// TODO
/// Sent when admin q polling fails for the first time.
AdminCommandCompletionFailed,
/// When the adminq poll fails the first time, the controller may not yet
/// be failed.
/// Next time the admin q poll fails, if the controller is noticed as
/// failed for the first time, this event is sent, allowing further
/// clean up to be performed.
AdminQNoticeCtrlFailed,
}

/// TODO
Expand Down
4 changes: 2 additions & 2 deletions io-engine/src/core/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ type Result<T, E = EnvError> = std::result::Result<T, E>;
#[allow(dead_code)]
pub struct MayastorEnvironment {
pub node_name: String,
node_nqn: Option<String>,
pub node_nqn: Option<String>,
pub grpc_endpoint: Option<std::net::SocketAddr>,
pub registration_endpoint: Option<Uri>,
ps_endpoint: Option<String>,
Expand Down Expand Up @@ -420,7 +420,7 @@ pub struct MayastorEnvironment {
nvmf_tgt_interface: Option<String>,
/// NVMF target Command Retry Delay in x100 ms.
pub nvmf_tgt_crdt: [u16; TARGET_CRDT_LEN],
api_versions: Vec<ApiVersion>,
pub api_versions: Vec<ApiVersion>,
skip_sig_handler: bool,
enable_io_all_thrd_nexus_channels: bool,
developer_delay: bool,
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RebuildJob {
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
struct RebuildFBendChan {
sender: async_channel::Sender<RebuildJobRequest>,
}
Expand Down
21 changes: 19 additions & 2 deletions io-engine/src/rebuild/rebuild_job_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl RebuildJobBackendManager {
}
}

/// Reply back to the requester with the generic rebuild stats.
/// Reply to the requester with the generic rebuild stats.
async fn reply_stats(
&mut self,
requester: oneshot::Sender<RebuildStats>,
Expand Down Expand Up @@ -488,10 +488,27 @@ impl RebuildJobBackendManager {
}

impl Drop for RebuildJobBackendManager {
/// Close and drain comms channel allowing sender to see the cancellation
/// error, should it attempt to communicate.
/// This is required because it seems if a message was already sent then it
/// will not get dropped until both the receivers and the senders are
/// dropped.
fn drop(&mut self) {
// set final stats now so failed stats requesters can still get stats.
let stats = self.stats();
info!("{self}: backend dropped; final stats: {stats:?}");
self.states.write().set_final_stats(stats);
self.states.write().set_final_stats(stats.clone());

// we close before draining, ensuring no new messages can be sent
self.info_chan.receiver.close();
// now we can drain, and we could just ignore, but let's try to
// reply to any stats requests
while let Ok(message) = self.info_chan.receiver.try_recv() {
if let RebuildJobRequest::GetStats(reply) = message {
reply.send(stats.clone()).ok();
}
}

for sender in self.complete_chan.lock().drain(..) {
sender.send(self.state()).ok();
}
Expand Down
2 changes: 1 addition & 1 deletion io-engine/src/subsys/config/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl Default for NvmeBdevOpts {
nvme_adminq_poll_period_us: time_try_from_env(
"NVME_ADMINQ_POLL_PERIOD",
1_000,
TimeUnit::MilliSeconds,
TimeUnit::MicroSeconds,
),
nvme_ioq_poll_period_us: time_try_from_env(
"NVME_IOQ_POLL_PERIOD",
Expand Down
3 changes: 3 additions & 0 deletions io-engine/tests/block_device_nvmf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,9 @@ async fn nvmf_device_hot_remove() {

impl DeviceEventListener for TestEventListener {
fn handle_device_event(&self, event: DeviceEventType, device: &str) {
if event == DeviceEventType::AdminQNoticeCtrlFailed {
return; // Not interested in this one
}
// Check event type and device name.
assert_eq!(event, DeviceEventType::DeviceRemoved);
assert_eq!(
Expand Down