-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to disable DiskManager #4330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -39,6 +39,9 @@ pub enum DiskManagerConfig { | |||||
/// Create a new [DiskManager] that creates temporary files within | ||||||
/// the specified directories | ||||||
NewSpecified(Vec<PathBuf>), | ||||||
|
||||||
/// Disable disk manager, attempts to create temporary files will error | ||||||
Disabled, | ||||||
} | ||||||
|
||||||
impl Default for DiskManagerConfig { | ||||||
|
@@ -68,9 +71,11 @@ impl DiskManagerConfig { | |||||
/// while processing dataset larger than available memory. | ||||||
#[derive(Debug)] | ||||||
pub struct DiskManager { | ||||||
/// TempDirs to put temporary files in. A new OS specified | ||||||
/// temporary directory will be created if this list is empty. | ||||||
local_dirs: Mutex<Vec<TempDir>>, | ||||||
/// TempDirs to put temporary files in. | ||||||
/// | ||||||
/// If `Some(vec![])` a new OS specified temporary directory will be created | ||||||
/// If `None` an error will be returned | ||||||
local_dirs: Mutex<Option<Vec<TempDir>>>, | ||||||
} | ||||||
|
||||||
impl DiskManager { | ||||||
|
@@ -79,7 +84,7 @@ impl DiskManager { | |||||
match config { | ||||||
DiskManagerConfig::Existing(manager) => Ok(manager), | ||||||
DiskManagerConfig::NewOs => Ok(Arc::new(Self { | ||||||
local_dirs: Mutex::new(vec![]), | ||||||
local_dirs: Mutex::new(Some(vec![])), | ||||||
})), | ||||||
DiskManagerConfig::NewSpecified(conf_dirs) => { | ||||||
let local_dirs = create_local_dirs(conf_dirs)?; | ||||||
|
@@ -88,15 +93,23 @@ impl DiskManager { | |||||
local_dirs | ||||||
); | ||||||
Ok(Arc::new(Self { | ||||||
local_dirs: Mutex::new(local_dirs), | ||||||
local_dirs: Mutex::new(Some(local_dirs)), | ||||||
})) | ||||||
} | ||||||
DiskManagerConfig::Disabled => Ok(Arc::new(Self { | ||||||
local_dirs: Mutex::new(None), | ||||||
})), | ||||||
} | ||||||
} | ||||||
|
||||||
/// Return a temporary file from a randomized choice in the configured locations | ||||||
pub fn create_tmp_file(&self) -> Result<NamedTempFile> { | ||||||
let mut local_dirs = self.local_dirs.lock(); | ||||||
let mut guard = self.local_dirs.lock(); | ||||||
let local_dirs = guard.as_mut().ok_or_else(|| { | ||||||
DataFusionError::ResourcesExhausted( | ||||||
"Cannot spill to temporary file as DiskManager is disabled".to_string(), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a user sees this it may not be clear what is happening or what they can do about it. What do you think about changing the message to something like
Suggested change
As a follow up (which I will do), I think it would be excellent to pass in a context string to create_tmp_file to make the error more specific. pub fn create_tmp_file(&self, context: &str) -> Result<NamedTempFile> { |
||||||
) | ||||||
})?; | ||||||
|
||||||
// Create a temporary directory if needed | ||||||
if local_dirs.is_empty() { | ||||||
|
@@ -110,7 +123,10 @@ impl DiskManager { | |||||
local_dirs.push(tempdir); | ||||||
} | ||||||
|
||||||
create_tmp_file(&local_dirs) | ||||||
let dir_index = thread_rng().gen_range(0..local_dirs.len()); | ||||||
Builder::new() | ||||||
.tempfile_in(&local_dirs[dir_index]) | ||||||
.map_err(DataFusionError::IoError) | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -127,17 +143,6 @@ fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> { | |||||
.collect() | ||||||
} | ||||||
|
||||||
fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> { | ||||||
let dir_index = thread_rng().gen_range(0..local_dirs.len()); | ||||||
let dir = local_dirs.get(dir_index).ok_or_else(|| { | ||||||
DataFusionError::Internal("No directories available to DiskManager".into()) | ||||||
})?; | ||||||
|
||||||
Builder::new() | ||||||
.tempfile_in(dir) | ||||||
.map_err(DataFusionError::IoError) | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
mod tests { | ||||||
use std::path::Path; | ||||||
|
@@ -171,6 +176,7 @@ mod tests { | |||||
dm.local_dirs | ||||||
.lock() | ||||||
.iter() | ||||||
.flatten() | ||||||
.map(|p| p.path().into()) | ||||||
.collect() | ||||||
} | ||||||
|
@@ -194,6 +200,17 @@ mod tests { | |||||
Ok(()) | ||||||
} | ||||||
|
||||||
#[test] | ||||||
fn test_disabled_disk_manager() { | ||||||
let config = DiskManagerConfig::Disabled; | ||||||
let manager = DiskManager::try_new(config).unwrap(); | ||||||
let e = manager.create_tmp_file().unwrap_err().to_string(); | ||||||
assert_eq!( | ||||||
e, | ||||||
"Resources exhausted: Cannot spill to temporary file as DiskManager is disabled" | ||||||
) | ||||||
} | ||||||
|
||||||
/// Asserts that `file_path` is found anywhere in any of `dir` directories | ||||||
fn assert_path_in_dirs<'a>( | ||||||
file_path: &'a Path, | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.