Skip to content

Commit

Permalink
Merge pull request dragonflyoss#536 from jiangliu/fscache-mt
Browse files Browse the repository at this point in the history
fscache: enable multi-threading to process fscache requests
  • Loading branch information
imeoer authored Jun 28, 2022
2 parents 2012f8b + aef5cef commit 363665f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 12 deletions.
12 changes: 11 additions & 1 deletion src/bin/nydusd/fs_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct FsCacheState {
pub struct FsCacheHandler {
active: AtomicBool,
barrier: Barrier,
threads: usize,
file: File,
state: Arc<Mutex<FsCacheState>>,
poller: Mutex<Poll>,
Expand All @@ -236,6 +237,7 @@ impl FsCacheHandler {
dir: &str,
tag: Option<&str>,
blob_cache_mgr: Arc<BlobCacheMgr>,
threads: usize,
) -> Result<Self> {
info!(
"fscache: create FsCacheHandler with dir {}, tag {}",
Expand Down Expand Up @@ -279,14 +281,20 @@ impl FsCacheHandler {

Ok(FsCacheHandler {
active: AtomicBool::new(true),
barrier: Barrier::new(2),
barrier: Barrier::new(threads + 1),
threads,
file,
state: Arc::new(Mutex::new(state)),
poller: Mutex::new(poller),
waker: Arc::new(waker),
})
}

/// Get number of working threads to service fscache requests.
pub fn working_threads(&self) -> usize {
self.threads
}

/// Stop worker threads for the fscache service.
pub fn stop(&self) {
self.active.store(false, Ordering::Release);
Expand Down Expand Up @@ -327,6 +335,8 @@ impl FsCacheHandler {
&& event.token() == Token(TOKEN_EVENT_WAKER)
&& !self.active.load(Ordering::Acquire)
{
// Notify next worker to exit.
let _ = self.waker.wake();
self.barrier.wait();
return Ok(());
}
Expand Down
20 changes: 20 additions & 0 deletions src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,26 @@ fn append_services_subcmd_options(app: App<'static, 'static>) -> App<'static, 's
.help("Tag to identify the fscache daemon instance")
.takes_value(true)
.requires("fscache"),
)
.arg(
Arg::with_name("fscache-threads")
.long("fscache-threads")
.default_value("1")
.help("Number of working threads to serve fscache requests")
.takes_value(true)
.required(false)
.validator(|v| {
if let Ok(t) = v.parse::<i32>() {
if t > 0 && t <= 1024 {
Ok(())
} else {
Err("Invalid working thread number {}, valid values: [1-1024]"
.to_string())
}
} else {
Err("Input thread number is invalid".to_string())
}
}),
);

app.subcommand(subcmd)
Expand Down
42 changes: 31 additions & 11 deletions src/bin/nydusd/service_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,18 @@ impl ServiceController {
#[cfg(target_os = "linux")]
if self.fscache_enabled.load(Ordering::Acquire) {
if let Some(fscache) = self.fscache.lock().unwrap().clone() {
std::thread::spawn(move || {
if let Err(e) = fscache.run_loop() {
error!("Failed to run fscache service loop, {}", e);
}
// Notify the global service controller that one working thread is exiting.
if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() {
error!("Failed to notify the global service controller, {}", e);
}
});
for _ in 0..fscache.working_threads() {
let fscache2 = fscache.clone();
std::thread::spawn(move || {
if let Err(e) = fscache2.run_loop() {
error!("Failed to run fscache service loop, {}", e);
}
// Notify the global service controller that one working thread is exiting.
if let Err(e) = crate::DAEMON_CONTROLLER.waker.wake() {
error!("Failed to notify the global service controller, {}", e);
}
});
}
}
}

Expand Down Expand Up @@ -117,16 +120,33 @@ impl ServiceController {
};
let tag = subargs.value_of("fscache-tag");

let mut threads = 1usize;
if let Some(threads_value) = subargs.value_of("fscache-threads") {
if let Ok(t) = threads_value.parse::<i32>() {
if t > 0 && t <= 1024 {
threads = t as usize;
} else {
return Err(einval!(
"Invalid working thread number {}, valid values: [1-1024]"
));
}
} else {
return Err(einval!("Input thread number is invalid".to_string()));
}
}

info!(
"Create fscache instance at {} with tag {}",
"Create fscache instance at {} with tag {}, {} working threads",
p,
tag.unwrap_or("<none>")
tag.unwrap_or("<none>"),
threads
);
let fscache = crate::fs_cache::FsCacheHandler::new(
"/dev/cachefiles",
p,
tag,
self.blob_cache_mgr.clone(),
threads,
)?;
*self.fscache.lock().unwrap() = Some(Arc::new(fscache));
self.fscache_enabled.store(true, Ordering::Release);
Expand Down

0 comments on commit 363665f

Please sign in to comment.