From 0d0883c21f7b7a638ab1806633dc7efa41d9dfda Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Jan 2022 09:33:16 -0500 Subject: [PATCH] Lazy TempDir creation in DiskManager --- datafusion/src/execution/disk_manager.rs | 99 +++++++++++++++++------- 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/datafusion/src/execution/disk_manager.rs b/datafusion/src/execution/disk_manager.rs index 31565fec130d..4486f53a21b8 100644 --- a/datafusion/src/execution/disk_manager.rs +++ b/datafusion/src/execution/disk_manager.rs @@ -21,8 +21,8 @@ use crate::error::{DataFusionError, Result}; use log::debug; use rand::{thread_rng, Rng}; -use std::path::PathBuf; use std::sync::Arc; +use std::{path::PathBuf, sync::Mutex}; use tempfile::{Builder, NamedTempFile, TempDir}; /// Configuration for temporary disk access @@ -67,7 +67,9 @@ impl DiskManagerConfig { /// while processing dataset larger than available memory. #[derive(Debug)] pub struct DiskManager { - local_dirs: Vec, + /// TempDirs to put temporary files in. A new OS specified + /// temporary directory will be created if this list is empty. + local_dirs: Mutex>, } impl DiskManager { @@ -75,31 +77,39 @@ impl DiskManager { pub fn try_new(config: DiskManagerConfig) -> Result> { match config { DiskManagerConfig::Existing(manager) => Ok(manager), - DiskManagerConfig::NewOs => { - let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?; - - debug!( - "Created directory {:?} as DataFusion working directory", - tempdir - ); - Ok(Arc::new(Self { - local_dirs: vec![tempdir], - })) - } + DiskManagerConfig::NewOs => Ok(Arc::new(Self { + local_dirs: Mutex::new(vec![]), + })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; debug!( "Created local dirs {:?} as DataFusion working directory", local_dirs ); - Ok(Arc::new(Self { local_dirs })) + Ok(Arc::new(Self { + local_dirs: Mutex::new(local_dirs), + })) } } } /// Return a temporary file from a randomized choice in the configured locations pub fn create_tmp_file(&self) -> Result { - create_tmp_file(&self.local_dirs) + let mut local_dirs = self.local_dirs.lock().unwrap(); + + // Create a temporary directory if needed + if local_dirs.is_empty() { + let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?; + + debug!( + "Created directory '{:?}' as DataFusion tempfile directory", + tempdir.path().to_string_lossy() + ); + + local_dirs.push(tempdir); + } + + create_tmp_file(&local_dirs) } } @@ -129,10 +139,42 @@ fn create_tmp_file(local_dirs: &[TempDir]) -> Result { #[cfg(test)] mod tests { + use std::path::Path; + use super::*; use crate::error::Result; use tempfile::TempDir; + #[test] + fn lazy_temp_dir_creation() -> Result<()> { + // A default configuration should not create temp files until requested + let config = DiskManagerConfig::new(); + let dm = DiskManager::try_new(config)?; + + assert_eq!(0, local_dir_snapshot(&dm).len()); + + // can still create a tempfile however: + let actual = dm.create_tmp_file()?; + + // Now the tempdir has been created on demand + assert_eq!(1, local_dir_snapshot(&dm).len()); + + // the returned tempfile file should be in the temp directory + let local_dirs = local_dir_snapshot(&dm); + assert_path_in_dirs(actual.path(), local_dirs.iter().map(|p| p.as_path())); + + Ok(()) + } + + fn local_dir_snapshot(dm: &DiskManager) -> Vec { + dm.local_dirs + .lock() + .unwrap() + .iter() + .map(|p| p.path().into()) + .collect() + } + #[test] fn file_in_right_dir() -> Result<()> { let local_dir1 = TempDir::new()?; @@ -147,19 +189,24 @@ mod tests { let actual = dm.create_tmp_file()?; // the file should be in one of the specified local directories - let found = local_dirs.iter().any(|p| { - actual - .path() + assert_path_in_dirs(actual.path(), local_dirs.into_iter()); + + Ok(()) + } + + /// Asserts that `file_path` is found anywhere in any of `dir` directories + fn assert_path_in_dirs<'a>( + file_path: &'a Path, + dirs: impl Iterator, + ) { + let dirs: Vec<&Path> = dirs.collect(); + + let found = dirs.iter().any(|file_path| { + file_path .ancestors() - .any(|candidate_path| *p == candidate_path) + .any(|candidate_path| *file_path == candidate_path) }); - assert!( - found, - "Can't find {:?} in specified local dirs: {:?}", - actual, local_dirs - ); - - Ok(()) + assert!(found, "Can't find {:?} in dirs: {:?}", file_path, dirs); } }