Skip to content

Commit

Permalink
Shm segment cleanup (#1281)
Browse files Browse the repository at this point in the history
* Fix SHM cleanup at exit

* fix tests

* change cleanup to be more robust

* can't use this because of typos checks that fire on external crate imports

* fix docs

* remove unsafe
  • Loading branch information
yellowhatter authored Jul 31, 2024
1 parent b31cc1a commit 5cfcccc
Show file tree
Hide file tree
Showing 20 changed files with 193 additions and 69 deletions.
65 changes: 62 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ serde_cbor = "0.11.2"
serde_json = "1.0.114"
serde-pickle = "1.1.1"
serde_yaml = "0.9.19"
static_init = "1.0.3"
stabby = "5.0.1"
sha3 = "0.10.6"
shared_memory = "0.12.4"
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ zenoh-core = { workspace = true }
zenoh-macros = { workspace = true }
zenoh-buffers = { workspace = true }
rand = { workspace = true }
lazy_static = { workspace = true }
static_init = { workspace = true }
num-traits = { workspace = true }
num_cpus = { workspace = true, optional = true }
thread-priority = { workspace = true }
Expand Down
22 changes: 10 additions & 12 deletions commons/zenoh-shm/src/api/client_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
sync::{Arc, RwLock},
};

use lazy_static::lazy_static;
use static_init::dynamic;
use zenoh_result::{bail, ZResult};

use crate::{
Expand All @@ -31,17 +31,15 @@ use crate::{
reader::{ClientStorage, GlobalDataSegmentID},
};

lazy_static! {
/// A global lazily-initialized SHM client storage.
/// When initialized, contains default client set,
/// see ShmClientStorage::with_default_client_set
#[zenoh_macros::unstable_doc]
pub static ref GLOBAL_CLIENT_STORAGE: Arc<ShmClientStorage> = Arc::new(
ShmClientStorage::builder()
.with_default_client_set()
.build()
);
}
#[dynamic(lazy, drop)]
/// A global lazily-initialized SHM client storage. When initialized,
/// contains default client set, see [with_default_client_set](ShmClientStorage::with_default_client_set)
#[zenoh_macros::unstable_doc]
pub static mut GLOBAL_CLIENT_STORAGE: Arc<ShmClientStorage> = Arc::new(
ShmClientStorage::builder()
.with_default_client_set()
.build(),
);

/// Builder to create new client storages
#[zenoh_macros::unstable_doc]
Expand Down
10 changes: 6 additions & 4 deletions commons/zenoh-shm/src/api/provider/shm_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,13 +904,15 @@ where
ConfirmedDescriptor,
)> {
// allocate shared header
let allocated_header = GLOBAL_HEADER_STORAGE.allocate_header()?;
let allocated_header = GLOBAL_HEADER_STORAGE.read().allocate_header()?;

// allocate watchdog
let allocated_watchdog = GLOBAL_STORAGE.allocate_watchdog()?;
let allocated_watchdog = GLOBAL_STORAGE.read().allocate_watchdog()?;

// add watchdog to confirmator
let confirmed_watchdog = GLOBAL_CONFIRMATOR.add_owned(&allocated_watchdog.descriptor)?;
let confirmed_watchdog = GLOBAL_CONFIRMATOR
.read()
.add_owned(&allocated_watchdog.descriptor)?;

Ok((allocated_header, allocated_watchdog, confirmed_watchdog))
}
Expand All @@ -928,7 +930,7 @@ where

// add watchdog to validator
let c_header = header.clone();
GLOBAL_VALIDATOR.add(
GLOBAL_VALIDATOR.read().add(
allocated_watchdog.descriptor.clone(),
Box::new(move || {
c_header
Expand Down
47 changes: 47 additions & 0 deletions commons/zenoh-shm/src/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use static_init::dynamic;

/// A global cleanup, that is guaranteed to be dropped at normal program exit and that will
/// execute all registered cleanup routines at this moment
#[dynamic(lazy, drop)]
pub(crate) static mut CLEANUP: Cleanup = Cleanup::new();

/// An RAII object that calls all registered routines upon destruction
pub(crate) struct Cleanup {
cleanups: lockfree::queue::Queue<Option<Box<dyn FnOnce() + Send>>>,
}

impl Cleanup {
fn new() -> Self {
Self {
cleanups: Default::default(),
}
}

pub(crate) fn register_cleanup(&self, cleanup_fn: Box<dyn FnOnce() + Send>) {
self.cleanups.push(Some(cleanup_fn));
}
}

impl Drop for Cleanup {
fn drop(&mut self) {
while let Some(cleanup) = self.cleanups.pop() {
if let Some(f) = cleanup {
f();
}
}
}
}
4 changes: 3 additions & 1 deletion commons/zenoh-shm/src/header/allocated_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct AllocatedHeaderDescriptor {

impl Drop for AllocatedHeaderDescriptor {
fn drop(&mut self) {
GLOBAL_HEADER_STORAGE.reclaim_header(self.descriptor.clone());
GLOBAL_HEADER_STORAGE
.read()
.reclaim_header(self.descriptor.clone());
}
}
7 changes: 3 additions & 4 deletions commons/zenoh-shm/src/header/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
sync::{Arc, Mutex},
};

use lazy_static::lazy_static;
use static_init::dynamic;
use zenoh_result::{zerror, ZResult};

use super::{
Expand All @@ -25,9 +25,8 @@ use super::{
segment::HeaderSegment,
};

lazy_static! {
pub static ref GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap();
}
#[dynamic(lazy,drop)]
pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap();

pub struct HeaderStorage {
available: Arc<Mutex<LinkedList<OwnedHeaderDescriptor>>>,
Expand Down
8 changes: 4 additions & 4 deletions commons/zenoh-shm/src/header/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ use std::{
sync::{Arc, Mutex},
};

use lazy_static::lazy_static;
use static_init::dynamic;
use zenoh_result::{zerror, ZResult};

use super::{
descriptor::{HeaderDescriptor, HeaderSegmentID, OwnedHeaderDescriptor},
segment::HeaderSegment,
};

lazy_static! {
pub static ref GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();
}
#[dynamic(lazy,drop)]
pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();


pub struct Subscription {
linked_table: Mutex<BTreeMap<HeaderSegmentID, Arc<HeaderSegment>>>,
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-shm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ macro_rules! tested_crate_module {
}

pub mod api;
mod cleanup;
pub mod header;
pub mod posix_shm;
pub mod reader;
Expand Down
18 changes: 13 additions & 5 deletions commons/zenoh-shm/src/posix_shm/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use rand::Rng;
use shared_memory::{Shmem, ShmemConf, ShmemError};
use zenoh_result::{bail, zerror, ZResult};

use crate::cleanup::CLEANUP;

const SEGMENT_DEDICATE_TRIES: usize = 100;
const ECMA: crc::Crc<u64> = crc::Crc::<u64>::new(&crc::CRC_64_ECMA_182);

Expand Down Expand Up @@ -55,15 +57,21 @@ where
for _ in 0..SEGMENT_DEDICATE_TRIES {
// Generate random id
let id: ID = rand::thread_rng().gen();
let os_id = Self::os_id(id.clone(), id_prefix);

// Register cleanup routine to make sure Segment will be unlinked on exit
let c_os_id = os_id.clone();
CLEANUP.read().register_cleanup(Box::new(move || {
if let Ok(mut shmem) = ShmemConf::new().os_id(c_os_id).open() {
shmem.set_owner(true);
drop(shmem);
}
}));

// Try to create a new segment identified by prefix and generated id.
// If creation fails because segment already exists for this id,
// the creation attempt will be repeated with another id
match ShmemConf::new()
.size(alloc_size)
.os_id(Self::os_id(id.clone(), id_prefix))
.create()
{
match ShmemConf::new().size(alloc_size).os_id(os_id).create() {
Ok(shmem) => {
tracing::debug!(
"Created SHM segment, size: {alloc_size}, prefix: {id_prefix}, id: {id}"
Expand Down
6 changes: 4 additions & 2 deletions commons/zenoh-shm/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ impl ShmReader {
// that the sender of this buffer has incremented it for us.

// attach to the watchdog before doing other things
let watchdog = Arc::new(GLOBAL_CONFIRMATOR.add(&info.watchdog_descriptor)?);
let watchdog = Arc::new(GLOBAL_CONFIRMATOR.read().add(&info.watchdog_descriptor)?);

let segment = self.ensure_segment(info)?;
let shmb = ShmBufInner {
header: GLOBAL_HEADER_SUBSCRIPTION.link(&info.header_descriptor)?,
header: GLOBAL_HEADER_SUBSCRIPTION
.read()
.link(&info.header_descriptor)?,
buf: segment.map(info.data_descriptor.chunk)?,
info: info.clone(),
watchdog,
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-shm/src/watchdog/allocated_watchdog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl AllocatedWatchdog {

impl Drop for AllocatedWatchdog {
fn drop(&mut self) {
GLOBAL_VALIDATOR.remove(self.descriptor.clone());
GLOBAL_STORAGE.free_watchdog(self.descriptor.clone());
GLOBAL_VALIDATOR.read().remove(self.descriptor.clone());
GLOBAL_STORAGE.read().free_watchdog(self.descriptor.clone());
}
}
8 changes: 4 additions & 4 deletions commons/zenoh-shm/src/watchdog/confirmator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
time::Duration,
};

use lazy_static::lazy_static;
use static_init::dynamic;
use zenoh_result::{zerror, ZResult};

use super::{
Expand All @@ -27,10 +27,10 @@ use super::{
segment::Segment,
};

lazy_static! {
pub static ref GLOBAL_CONFIRMATOR: WatchdogConfirmator =
#[dynamic(lazy,drop)]
pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator =
WatchdogConfirmator::new(Duration::from_millis(50));
}


pub struct ConfirmedDescriptor {
pub owned: OwnedDescriptor,
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/watchdog/periodic_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct PeriodicTask {

impl Drop for PeriodicTask {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed)
self.running.store(false, Ordering::Relaxed);
}
}

Expand Down
Loading

0 comments on commit 5cfcccc

Please sign in to comment.