Skip to content

Commit

Permalink
wip - device event listener refactoring openebs#1
Browse files Browse the repository at this point in the history
  • Loading branch information
dsavitskiy committed Oct 2, 2021
1 parent bbf0faa commit 7811614
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 85 deletions.
55 changes: 44 additions & 11 deletions mayastor/src/bdev/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,53 @@ impl SpdkBlockDevice {
Ok(Box::new(SpdkBlockDeviceDescriptor::from(descr)))
}

pub fn process_device_event(event: DeviceEventType, device: &str) {
// Keep a separate copy of all registered listeners in order to not
// invoke them with the lock held.
let listeners = {
let map = BDEV_LISTENERS.read().expect("lock poisoned");
match map.get(device) {
Some(listeners) => listeners.clone(),
None => return,
/// Called by spdk when there is an asynchronous bdev event i.e. removal.
pub(crate) fn bdev_event_callback(
event: spdk::BdevEvent,
bdev: spdk::Bdev<()>,
) {
let dev = SpdkBlockDevice::new(Bdev::new(bdev));

// Translate SPDK events into common device events.
let event = match event {
spdk::BdevEvent::Remove => {
info!("Received remove event for Bdev '{}'", dev.device_name());
DeviceEventType::DeviceRemoved
}
spdk::BdevEvent::Resize => {
warn!("Received resize event for Bdev '{}'", dev.device_name());
DeviceEventType::DeviceResized
}
spdk::BdevEvent::MediaManagement => {
warn!(
"Received media management event for Bdev '{}'",
dev.device_name()
);
DeviceEventType::MediaManagement
}
};

// Notify all listeners of this SPDK bdev.
for l in listeners {
(l)(event, device);
// Forward event to high-level handler.
dev.notify_listeners(event);
}

/// Notifies all listeners of this SPDK Bdev.
fn notify_listeners(self, event: DeviceEventType) {
let name = self.device_name();
for l in self.get_listeners() {
l.notify(event, &name);
}
}

/// Returns a list of this Bdev listeners.
///
/// Note: a separate copy of all registered listeners is returned in order
/// to not invoke them with the lock held.
fn get_listeners(&self) -> Vec<DeviceEventListener> {
let map = BDEV_LISTENERS.read().expect("lock poisoned");
match map.get(&self.device_name()) {
Some(listeners) => listeners.clone(),
None => Vec::new(),
}
}
}
Expand Down
30 changes: 17 additions & 13 deletions mayastor/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
Reason,
VerboseError,
},
core::DeviceEventType,
core::{DeviceEventListener, DeviceEventType},
nexus_uri::NexusBdevError,
};

Expand Down Expand Up @@ -228,22 +228,25 @@ impl Nexus {
}
}

/// TODO
fn register_child_event_listener(&self, child: &NexusChild) {
let dev = child
.get_device()
.expect("No block device associated with nexus child");

dev.add_event_listener(Nexus::child_event_listener)
.map_err(|err| {
error!(
?err,
"{}: failed to register event listener for child {}",
self.name,
child.get_name(),
);
err
})
.unwrap();
dev.add_event_listener(DeviceEventListener::new(
Nexus::child_event_listener,
))
.map_err(|err| {
error!(
?err,
"{}: failed to register event listener for child {}",
self.name,
child.get_name(),
);
err
})
.unwrap();
}

/// Destroy child with given uri.
Expand Down Expand Up @@ -526,7 +529,8 @@ impl Nexus {
if self.bdev().alignment() < alignment {
info!("{}: child {} has alignment {}, updating required_alignment from {}", self.name, child.name, alignment, self.bdev().alignment());
unsafe {
(*self.bdev().as_ptr()).required_alignment = alignment as u8;
(*self.bdev().as_ptr()).required_alignment =
alignment as u8;
}
}
}
Expand Down
95 changes: 83 additions & 12 deletions mayastor/src/bdev/nexus/nexus_instances.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
bdev::Nexus,
bdev::{nexus::nexus_module::NexusModule, Nexus},
core::singleton::{Singleton, SingletonCell},
};
use spdk::Thread;
use spdk::{BdevModuleIter, Thread};
use std::ptr::NonNull;

/// TODO
Expand Down Expand Up @@ -66,7 +66,26 @@ impl NexusInstances {

/// Lookups a nexus by its name and returns a mutable reference to it.
pub fn lookup_mut(&mut self, name: &str) -> Option<&mut Nexus> {
self.iter_mut().find(|n| n.name == name)
info!("^^^^ lookup_mut: {}", name);
unsafe {
for (i, n) in self.nexuses.iter().enumerate() {
info!(" -- {} :: {}", i, n.as_ref().name);
}

info!("-- count: {}", self.iter_mut().count());

let r = self.iter_mut().find(|n| {
info!(" -- check {}", n.name);
n.name == name
});

match &r {
Some(_) => info!(" ++ !"),
None => info!(" ++ ?"),
}

r
}
}

/// TODO
Expand All @@ -91,17 +110,26 @@ impl NexusInstances {
}
}

/// Lookup a nexus by its name (currently used only by test functions).
pub fn nexus_lookup(name: &str) -> Option<&mut Nexus> {
NexusInstances::as_mut().lookup_mut(name)
}



//////////////////////////////////////////////////////////////////////////////



/// TODO
pub struct NexusIter {
n: usize,
// iter: BdevIter<()>,
}

impl Iterator for NexusIter {
type Item = &'static Nexus;

fn next(&mut self) -> Option<&'static Nexus> {
// self.iter.next().map(|b| b.legacy_ctxt::<Nexus>())
let inst = NexusInstances::get_or_init();
if self.n < inst.nexuses.len() {
let i = self.n;
Expand All @@ -115,24 +143,22 @@ impl Iterator for NexusIter {

impl NexusIter {
fn new() -> Self {
info!("^^^^ new iter");
Self {
n: 0,
// iter: module().iter_bdevs()
}
}
}

/// TODO
pub struct NexusIterMut {
n: usize,
// iter: BdevIter<()>,
}

impl Iterator for NexusIterMut {
type Item = &'static mut Nexus;

fn next(&mut self) -> Option<&'static mut Nexus> {
// self.iter.next().map(|b| b.legacy_ctxt_mut::<Nexus>())
let inst = NexusInstances::get_or_init();
if self.n < inst.nexuses.len() {
let i = self.n;
Expand All @@ -146,14 +172,59 @@ impl Iterator for NexusIterMut {

impl NexusIterMut {
fn new() -> Self {
info!("^^^^ new iter mut");
Self {
n: 0,
// iter: module().iter_bdevs()
}
}
}

/// Lookup a nexus by its name (currently used only by test functions).
pub fn nexus_lookup(name: &str) -> Option<&mut Nexus> {
NexusInstances::as_mut().lookup_mut(name)
//------------------------------------------------------------------------------

/*
/// TODO
pub struct NexusIter {
iter: BdevModuleIter<Nexus>,
}
impl Iterator for NexusIter {
type Item = &'static Nexus;
fn next(&mut self) -> Option<&'static Nexus> {
self.iter.next().map(|b| b.data())
}
}
impl NexusIter {
fn new() -> Self {
info!("^^^^ new iter");
Self {
iter: NexusModule::current().iter_bdevs(),
}
}
}
/// TODO
pub struct NexusIterMut {
iter: BdevModuleIter<Nexus>,
}
impl Iterator for NexusIterMut {
type Item = &'static mut Nexus;
fn next(&mut self) -> Option<&'static mut Nexus> {
self.iter.next().map(|mut b| b.data_mut())
}
}
impl NexusIterMut {
fn new() -> Self {
info!("^^^^ new iter mut");
Self {
iter: NexusModule::current().iter_bdevs(),
}
}
}
*/
31 changes: 16 additions & 15 deletions mayastor/src/bdev/nvmx/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::{
BlockDeviceIoStats,
CoreError,
DeviceEventListener,
DeviceEventListeners,
DeviceEventType,
IoDevice,
OpCompletionCallback,
Expand Down Expand Up @@ -115,18 +116,15 @@ pub struct NvmeControllerInner<'a> {
io_device: Arc<IoDevice>,
}

type EventCallbackList = Vec<DeviceEventListener>;

/*
* NVME controller implementation.
*/
/// NVME controller implementation.
/// TODO
pub struct NvmeController<'a> {
pub(crate) name: String,
id: u64,
prchk_flags: u32,
inner: Option<NvmeControllerInner<'a>>,
state_machine: ControllerStateMachine,
event_listeners: Mutex<EventCallbackList>,
event_listeners: Mutex<DeviceEventListeners>,
/// Timeout config is accessed by SPDK-driven timeout callback handlers,
/// so it needs to be a raw pointer. Mutable members are made atomic to
/// eliminate lock contention between API path and callback path.
Expand Down Expand Up @@ -155,7 +153,7 @@ impl<'a> NvmeController<'a> {
prchk_flags,
state_machine: ControllerStateMachine::new(name),
inner: None,
event_listeners: Mutex::new(Vec::<fn(DeviceEventType, &str)>::new()),
event_listeners: Mutex::new(DeviceEventListeners::new()),
timeout_config: NonNull::new(Box::into_raw(Box::new(
TimeoutConfig::new(name),
)))
Expand Down Expand Up @@ -270,7 +268,7 @@ impl<'a> NvmeController<'a> {

// Notify listeners in case of namespace removal.
if notify_listeners {
self.notify_event(DeviceEventType::DeviceRemoved);
self.notify_listeners(DeviceEventType::DeviceRemoved);
}

ns_active
Expand Down Expand Up @@ -699,9 +697,11 @@ impl<'a> NvmeController<'a> {
NvmeController::_complete_reset(reset_ctx, status);
}

fn notify_event(&self, event: DeviceEventType) -> usize {
// Keep a separate copy of all registered listeners in order to not
// invoke them with the lock held.
/// Notifies all listeners of this controller.
///
/// Note: Keep a separate copy of all registered listeners in order to not
/// invoke them with the lock held.
fn notify_listeners(&self, event: DeviceEventType) -> usize {
let listeners = {
let listeners = self
.event_listeners
Expand All @@ -711,15 +711,15 @@ impl<'a> NvmeController<'a> {
};

for l in listeners.iter() {
(*l)(event, &self.name);
l.notify(event, &self.name);
}
listeners.len()
}

/// Register listener to monitor device events related to this controller.
pub fn add_event_listener(
pub fn register_device_listener(
&self,
listener: fn(DeviceEventType, &str),
listener: DeviceEventListener,
) -> Result<(), CoreError> {
let mut listeners = self
.event_listeners
Expand Down Expand Up @@ -921,7 +921,8 @@ pub(crate) async fn destroy_device(name: String) -> Result<(), NexusBdevError> {
// Notify the listeners.
debug!(?name, "notifying listeners about device removal");
let controller = carc.lock();
let num_listeners = controller.notify_event(DeviceEventType::DeviceRemoved);
let num_listeners =
controller.notify_listeners(DeviceEventType::DeviceRemoved);
debug!(
?name,
?num_listeners,
Expand Down
Loading

0 comments on commit 7811614

Please sign in to comment.