Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy TempDir creation in DiskManager #1695

Merged
merged 1 commit into from
Jan 30, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 73 additions & 26 deletions datafusion/src/execution/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
use crate::error::{DataFusionError, Result};
use log::debug;
use rand::{thread_rng, Rng};
use std::path::PathBuf;
use std::sync::Arc;
use std::{path::PathBuf, sync::Mutex};
use tempfile::{Builder, NamedTempFile, TempDir};

/// Configuration for temporary disk access
Expand Down Expand Up @@ -67,39 +67,49 @@ impl DiskManagerConfig {
/// while processing dataset larger than available memory.
#[derive(Debug)]
pub struct DiskManager {
local_dirs: Vec<TempDir>,
/// TempDirs to put temporary files in. A new OS specified
/// temporary directory will be created if this list is empty.
local_dirs: Mutex<Vec<TempDir>>,
}

impl DiskManager {
    /// Create a DiskManager given the configuration
    ///
    /// * `Existing` hands back the supplied manager unchanged.
    /// * `NewOs` starts with an empty directory list; no directory is
    ///   created here, it is created on demand by [`Self::create_tmp_file`].
    /// * `NewSpecified` builds the directory list from the given paths via
    ///   `create_local_dirs`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `create_local_dirs`.
    pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
        match config {
            DiskManagerConfig::Existing(manager) => Ok(manager),
            // Deliberately lazy: the temporary directory is only created
            // once a temporary file is actually requested.
            DiskManagerConfig::NewOs => Ok(Arc::new(Self {
                local_dirs: Mutex::new(vec![]),
            })),
            DiskManagerConfig::NewSpecified(conf_dirs) => {
                let local_dirs = create_local_dirs(conf_dirs)?;
                debug!(
                    "Created local dirs {:?} as DataFusion working directory",
                    local_dirs
                );
                Ok(Arc::new(Self {
                    local_dirs: Mutex::new(local_dirs),
                }))
            }
        }
    }

    /// Return a temporary file from a randomized choice in the configured locations
    ///
    /// If no directory is registered yet, a temporary directory is created
    /// with `tempfile::tempdir` and remembered for later calls.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::IoError` if the temporary directory cannot
    /// be created, and propagates any error from the `create_tmp_file`
    /// helper.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn create_tmp_file(&self) -> Result<NamedTempFile> {
        // The guard is held until the end of the function, so the emptiness
        // check and the push below happen under a single lock acquisition.
        let mut local_dirs = self.local_dirs.lock().unwrap();

        // Create a temporary directory if needed
        if local_dirs.is_empty() {
            let tempdir = tempfile::tempdir().map_err(DataFusionError::IoError)?;

            debug!(
                "Created directory '{:?}' as DataFusion tempfile directory",
                tempdir.path().to_string_lossy()
            );

            local_dirs.push(tempdir);
        }

        create_tmp_file(&local_dirs)
    }
}

Expand Down Expand Up @@ -129,10 +139,42 @@ fn create_tmp_file(local_dirs: &[TempDir]) -> Result<NamedTempFile> {

#[cfg(test)]
mod tests {
use std::path::Path;

use super::*;
use crate::error::Result;
use tempfile::TempDir;

#[test]
fn lazy_temp_dir_creation() -> Result<()> {
    // Building a manager from the default configuration must not create
    // anything yet: the directory list starts out empty.
    let dm = DiskManager::try_new(DiskManagerConfig::new())?;
    assert_eq!(0, local_dir_snapshot(&dm).len());

    // Asking for a temporary file still works ...
    let actual = dm.create_tmp_file()?;

    // ... and that request is what brings the single directory into being.
    let local_dirs = local_dir_snapshot(&dm);
    assert_eq!(1, local_dirs.len());

    // The file handed back has to live inside that directory.
    assert_path_in_dirs(actual.path(), local_dirs.iter().map(PathBuf::as_path));

    Ok(())
}

/// Copies out the paths of the directories `dm` holds at the time of the
/// call, so the caller can inspect them without keeping the lock.
fn local_dir_snapshot(dm: &DiskManager) -> Vec<PathBuf> {
    let guard = dm.local_dirs.lock().unwrap();

    let mut paths = Vec::with_capacity(guard.len());
    for dir in guard.iter() {
        paths.push(dir.path().to_path_buf());
    }
    paths
}

#[test]
fn file_in_right_dir() -> Result<()> {
let local_dir1 = TempDir::new()?;
Expand All @@ -147,19 +189,24 @@ mod tests {
let actual = dm.create_tmp_file()?;

// the file should be in one of the specified local directories
let found = local_dirs.iter().any(|p| {
actual
.path()
assert_path_in_dirs(actual.path(), local_dirs.into_iter());

Ok(())
}

/// Asserts that `file_path` is located inside (or is equal to) at least one
/// of the `dirs` directories.
///
/// # Panics
///
/// Panics, printing `file_path` and the collected `dirs`, when no entry of
/// `dirs` is an ancestor of `file_path`.
fn assert_path_in_dirs<'a>(
    file_path: &'a Path,
    dirs: impl Iterator<Item = &'a Path>,
) {
    // Collected up front so the directories can be printed on failure.
    let dirs: Vec<&Path> = dirs.collect();

    // `Path::ancestors` yields the path itself followed by each parent, so a
    // match means `file_path` sits somewhere below `dir`. The closure
    // parameter must not be called `file_path`: shadowing the argument would
    // compare every directory with its own ancestors, which always matches.
    let found = dirs.iter().any(|dir| {
        file_path
            .ancestors()
            .any(|candidate_path| candidate_path == *dir)
    });

    assert!(found, "Can't find {:?} in dirs: {:?}", file_path, dirs);
}
}