Skip to content

Commit

Permalink
feat(debug): new diag added #1
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Savitskiy <dmitry.savitskiy@datacore.com>
  • Loading branch information
dsavitskiy committed Nov 10, 2022
1 parent 7c482f4 commit ddf9e57
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 11 deletions.
40 changes: 38 additions & 2 deletions io-engine/src/bdev/nvmx/controller_inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cell::RefCell,
convert::TryFrom,
ops::{Deref, DerefMut},
os::raw::c_void,
Expand Down Expand Up @@ -75,6 +76,9 @@ pub(crate) struct TimeoutConfig {
reset_attempts: u32,
next_reset_time: Instant,
destroy_in_progress: AtomicCell<bool>,
last_adminq_poll: RefCell<Option<Instant>>,
adminq_poll_cnt: RefCell<u64>,
adminq_interval_warn_threshold: Duration,
}

impl Drop for TimeoutConfig {
Expand All @@ -87,15 +91,27 @@ impl Drop for TimeoutConfig {
/// providing fast and atomic access to it.
impl TimeoutConfig {
pub fn new(ctrlr: &str) -> Self {
Self {
let x = nvme_bdev_running_config().nvme_adminq_poll_period_us * 10;
let s = Self {
name: String::from(ctrlr),
timeout_action: AtomicCell::new(DeviceTimeoutAction::Ignore),
reset_in_progress: AtomicCell::new(false),
ctrlr: SpdkNvmeController(NonNull::dangling()),
reset_attempts: MAX_RESET_ATTEMPTS,
next_reset_time: Instant::now(),
destroy_in_progress: AtomicCell::new(false),
}
last_adminq_poll: RefCell::new(None),
adminq_poll_cnt: RefCell::new(0),
adminq_interval_warn_threshold: Duration::from_micros(x),
};

info!(
"#### new ctrlr: '{}', adminq interval warn treshold: {} us",
s.name,
s.adminq_interval_warn_threshold.as_micros()
);

s
}

fn as_ptr(&mut self) -> *mut c_void {
Expand All @@ -112,6 +128,26 @@ impl TimeoutConfig {
}

pub fn process_adminq(&self) -> i32 {
let n = Instant::now();

if let Some(t) = *self.last_adminq_poll.borrow() {
let delta = n.saturating_duration_since(t);
if delta > self.adminq_interval_warn_threshold {
warn!(
"#### process_adminq on {}: '{}' poll #{}: starvation: \
{} us, warning threshold is {} us",
spdk_rs::Thread::current_info(),
self.name,
self.adminq_poll_cnt.borrow(),
delta.as_micros(),
self.adminq_interval_warn_threshold.as_micros()
);
}
}

*self.last_adminq_poll.borrow_mut() = Some(n);
*self.adminq_poll_cnt.borrow_mut() += 1;

unsafe {
spdk_nvme_ctrlr_process_admin_completions(self.ctrlr.as_ptr())
}
Expand Down
53 changes: 45 additions & 8 deletions io-engine/src/core/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
os::raw::c_void,
pin::Pin,
slice::Iter,
time::Duration,
time::{Duration, Instant},
};

use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -180,10 +180,11 @@ impl Reactors {
if unsafe { spdk_cpuset_get_cpu(mask, r.lcore) } {
let mt = spdk_rs::Thread::from_ptr(thread);
info!(
"scheduled {} {:p} on core:{}",
"scheduled '{}' ({:p}) on core #{} (total incoming spdk threads: {})",
mt.name(),
thread,
r.lcore
r.lcore,
r.incoming.len() + 1,
);
r.incoming.push(mt);
return true;
Expand Down Expand Up @@ -479,13 +480,12 @@ impl Reactor {
self.run_futures();
let threads = self.threads.borrow();
threads.iter().for_each(|t| {
t.poll();
self.checked_poll(t);
});

drop(threads);
while let Some(i) = self.incoming.pop() {
self.threads.borrow_mut().push_back(i);
}

self.add_incoming();
}

/// poll the threads n times but only poll the futures queue once and look
Expand All @@ -497,16 +497,53 @@ impl Reactor {
let threads = self.threads.borrow();
for _ in 0 .. times {
threads.iter().for_each(|t| {
t.poll();
self.checked_poll(t);
});
}

self.receive_futures();
self.run_futures();

drop(threads);

self.add_incoming();
}

fn checked_poll(&self, t: &spdk_rs::Thread) {
let start = Instant::now();

t.poll();

let delta = Instant::now().saturating_duration_since(start);
if delta.as_millis() > 100 {
warn!(
"@@@@ core #{}: spdk_thread_pool() '{}': took too long: {} ms",
self.lcore,
t.name(),
delta.as_millis()
);
}
}

fn add_incoming(&self) {
let mut n = 0;

while let Some(i) = self.incoming.pop() {
info!(
"@@@@ core #{}: adding incoming spdk thread: '{}'",
self.lcore,
i.name()
);
self.threads.borrow_mut().push_back(i);
n += 1;
}

if n > 0 {
info!(
"@@@@ core #{}: new number of spdk threads: {}",
self.lcore,
self.threads.borrow().len()
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion spdk-rs
Submodule spdk-rs updated 2 files
+59 −24 src/poller.rs
+11 −0 src/thread.rs

0 comments on commit ddf9e57

Please sign in to comment.