sort: add retry logic on file-descriptor exhaustion
Partial fix for bug #5714
Rework the merging logic to continue if we error out due to file
descriptor exhaustion. Merging will only stop if no progress can
be made.
Also a minor tweak to the code handling the output-is-an-input scenario
to prevent a double-open of the generated temp file.
Updated the appropriate test case.
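
In outline, the retry amounts to the loop sketched below. This is a simplified illustration rather than the code from this commit: merge_batch and merge_with_retry are hypothetical stand-ins, and the real implementation additionally threads temp files, compression, and GlobalSettings through the loop.

use std::fs::File;
use std::io;
use std::path::PathBuf;

// Hypothetical stand-in for the real merge step; here it only reports the batch size.
fn merge_batch(batch: Vec<File>) -> io::Result<()> {
    println!("merging {} inputs", batch.len());
    Ok(())
}

// Open as many inputs per batch as allowed; if an open fails (e.g. EMFILE),
// merge whatever is already open to free descriptors and retry the failed
// path once. Only give up when fewer than two files could be opened, i.e.
// when no progress is possible.
fn merge_with_retry(paths: Vec<PathBuf>, batch_size: usize) -> io::Result<()> {
    let mut open_batch: Vec<File> = Vec::new();
    for path in paths {
        if open_batch.len() >= batch_size {
            merge_batch(std::mem::take(&mut open_batch))?;
        }
        match File::open(&path) {
            Ok(f) => open_batch.push(f),
            Err(err) => {
                if open_batch.len() < 2 {
                    // Fewer than two inputs open: merging cannot make progress.
                    return Err(err);
                }
                // Flush the partial batch, then retry the open once.
                merge_batch(std::mem::take(&mut open_batch))?;
                open_batch.push(File::open(&path)?);
            }
        }
    }
    if !open_batch.is_empty() {
        merge_batch(open_batch)?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    // Hypothetical paths; in the real command these are the sorted temp chunks.
    let paths: Vec<PathBuf> = vec![];
    merge_with_retry(paths, 16)
}

The key invariant is that a batch is only flushed early when at least two inputs are already open, so every retry strictly reduces the number of remaining inputs.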
karlmcdowall committed Jan 22, 2025
1 parent 93e3d08 commit eadc90a
Showing 3 changed files with 168 additions and 98 deletions.
18 changes: 8 additions & 10 deletions src/uu/sort/src/ext_sort.rs
@@ -23,7 +23,6 @@ use itertools::Itertools;
use uucore::error::UResult;

use crate::chunks::RecycledChunk;
use crate::merge::ClosedTmpFile;
use crate::merge::WriteableCompressedTmpFile;
use crate::merge::WriteablePlainTmpFile;
use crate::merge::WriteableTmpFile;
@@ -51,7 +50,7 @@ pub fn ext_sort(
move || sorter(&recycled_receiver, &sorted_sender, &settings)
});
if settings.compress_prog.is_some() {
reader_writer::<_, WriteableCompressedTmpFile>(
reader_writer::<WriteableCompressedTmpFile>(
files,
settings,
&sorted_receiver,
@@ -60,7 +59,7 @@
tmp_dir,
)
} else {
reader_writer::<_, WriteablePlainTmpFile>(
reader_writer::<WriteablePlainTmpFile>(
files,
settings,
&sorted_receiver,
@@ -71,11 +70,8 @@
}
}

fn reader_writer<
F: Iterator<Item = UResult<Box<dyn Read + Send>>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
fn reader_writer<Tmp: WriteableTmpFile>(
files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
settings: &GlobalSettings,
receiver: &Receiver<Chunk>,
sender: SyncSender<Chunk>,
@@ -98,11 +94,13 @@ fn reader_writer<
)?;
match read_result {
ReadResult::WroteChunksToFile { tmp_files } => {
merge::merge_with_file_limit::<_, _, Tmp>(
tmp_files.into_iter().map(|c| c.reopen()),
let tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merge::merge_with_file_limit::<Tmp>(
tmp_files.into_iter(),
settings,
output,
tmp_dir,
tmp_file,
)?;
}
ReadResult::SortedSingleChunk(chunk) => {
185 changes: 114 additions & 71 deletions src/uu/sort/src/merge.rs
@@ -49,9 +49,9 @@ fn replace_output_file_in_input_files(
if let Some(copy) = &copy {
*file = copy.clone().into_os_string();
} else {
let (_file, copy_path) = tmp_dir.next_file()?;
std::fs::copy(file_path, &copy_path)
.map_err(|error| SortError::OpenTmpFileFailed { error })?;
let (mut temp_file, copy_path) = tmp_dir.next_file()?;
let mut source_file = File::open(&file_path)?;
std::io::copy(&mut source_file, &mut temp_file)?;
*file = copy_path.clone().into_os_string();
copy = Some(copy_path);
}
@@ -73,75 +73,98 @@ pub fn merge(
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
let files = files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
// Convert files to an iterator of PlainInputFile
let files_iter = files.iter().map(|file| PlainInputFile { path: file });
if settings.compress_prog.is_none() {
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
let tmp_file =
WriteablePlainTmpFile::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merge_with_file_limit::<WriteablePlainTmpFile>(
files_iter, settings, output, tmp_dir, tmp_file,
)
} else {
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
let tmp_file = WriteableCompressedTmpFile::create(
tmp_dir.next_file()?,
settings.compress_prog.as_deref(),
)?;
merge_with_file_limit::<WriteableCompressedTmpFile>(
files_iter, settings, output, tmp_dir, tmp_file,
)
}
}

// Merge already sorted `MergeInput`s.
pub fn merge_with_file_limit<
M: MergeInput + 'static,
F: ExactSizeIterator<Item = UResult<M>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
pub fn merge_with_file_limit<Tmp: WriteableTmpFile>(
input_files: impl ExactSizeIterator<Item = impl ClosedFile>,
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
mut tmp_file: Tmp,
) -> UResult<()> {
if files.len() <= settings.merge_batch_size {
let merger = merge_without_limit(files, settings);
merger?.write_all(settings, output)
} else {
let mut temporary_files = vec![];
let mut batch = vec![];
for file in files {
batch.push(file);
if batch.len() >= settings.merge_batch_size {
assert_eq!(batch.len(), settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;
batch = vec![];

let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
// Code assumes settings.merge_batch_size >= 2. Assert it!
assert!(settings.merge_batch_size >= 2);
// Merge down all the input files into as few temporary files as we can (ideally 0
// if possible - i.e. merge directly to output).
let mut output_temporary_files = vec![];
let mut opened_files = vec![];
for input_file in input_files {
if opened_files.len() >= settings.merge_batch_size {
// Check that we've not somehow accidentally violated our merge-size requirement.
assert_eq!(opened_files.len(), settings.merge_batch_size);
// We have a full batch. Merge them.
let merger = merge_without_limit(opened_files.into_iter(), settings)?;
merger.write_all_to(settings, tmp_file.as_write())?;
output_temporary_files.push(tmp_file.finished_writing()?);
tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
opened_files = vec![];
}

// Make a backup in case our first attempt to open fails (due to file-descriptor exhaustion).
let input_file_backup = input_file.clone();

match input_file.open() {
Ok(opened_file) => opened_files.push(opened_file),
Err(err) => {
// We've run out of descriptors. If we've only managed to open one file then give up,
// otherwise merge what we've got.
if opened_files.len() < 2 {
return Err(err);
}
let merger = merge_without_limit(opened_files.into_iter(), settings)?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
output_temporary_files.push(tmp_file.finished_writing()?);
tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;

// Now retry the open. If we fail this time, give up completely and return error.
opened_files = vec![input_file_backup.open()?];
}
}
// Merge any remaining files that didn't get merged in a full batch above.
if !batch.is_empty() {
assert!(batch.len() < settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;

let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
// If we've opened some files but not yet merged them...
if !opened_files.is_empty() {
let merger = merge_without_limit(opened_files.into_iter(), settings)?;
// If we have no output temp files at this point, we can just merge to the final output and be done.
if output_temporary_files.is_empty() {
return merger.write_all(settings, output);
}
merge_with_file_limit::<_, _, Tmp>(
temporary_files
.into_iter()
.map(Box::new(|c: Tmp::Closed| c.reopen())
as Box<
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
>),
// If we get to here then we have at least one other open temp file, need to do another round of merges.
merger.write_all_to(settings, tmp_file.as_write())?;
output_temporary_files.push(tmp_file.finished_writing()?);
tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
}
if output_temporary_files.is_empty() {
// If there are no temporary files we must be done.
Ok(())
} else {
merge_with_file_limit::<Tmp>(
output_temporary_files.into_iter(),
settings,
output,
tmp_dir,
tmp_file,
)
}
}

/// Merge files without limiting how many files are concurrently open.
///
/// It is the responsibility of the caller to ensure that `files` yields only
/// as many files as we are allowed to open concurrently.
fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = UResult<M>>>(
fn merge_without_limit<F: Iterator<Item = impl MergeInput>>(
files: F,
settings: &GlobalSettings,
) -> UResult<FileMerger> {
@@ -152,7 +175,7 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = UResult<M>>>(
let (sender, receiver) = sync_channel(2);
loaded_receivers.push(receiver);
reader_files.push(Some(ReaderFile {
file: file?,
file,
sender,
carry_over: vec![],
}));
@@ -204,6 +227,7 @@ fn merge_without_limit<M: MergeInput + 'static, F: Iterator<Item = UResult<M>>>(
reader_join_handle,
})
}

/// The struct on the reader thread representing an input file
struct ReaderFile<M: MergeInput> {
file: M,
@@ -379,22 +403,24 @@ fn check_child_success(mut child: Child, program: &str) -> UResult<()> {
}

/// A temporary file that can be written to.
pub trait WriteableTmpFile: Sized {
type Closed: ClosedTmpFile;
pub trait WriteableTmpFile: Sized + 'static {
type Closed: ClosedFile;
type InnerWrite: Write;
fn create(file: (File, PathBuf), compress_prog: Option<&str>) -> UResult<Self>;
/// Closes the temporary file.
fn finished_writing(self) -> UResult<Self::Closed>;
fn as_write(&mut self) -> &mut Self::InnerWrite;
}
/// A temporary file that is (temporarily) closed, but can be reopened.
pub trait ClosedTmpFile {
type Reopened: MergeInput;
/// Reopens the temporary file.
fn reopen(self) -> UResult<Self::Reopened>;

/// A file that is (temporarily) closed, but can be reopened.
pub trait ClosedFile: Clone {
type Opened: MergeInput;
/// Opens temporary file.
fn open(self) -> UResult<Self::Opened>;
}

/// A pre-sorted input for merging.
pub trait MergeInput: Send {
pub trait MergeInput: Send + 'static {
type InnerRead: Read;
/// Cleans this `MergeInput` up.
/// Implementations may delete the backing file.
@@ -406,6 +432,7 @@ pub struct WriteablePlainTmpFile {
path: PathBuf,
file: BufWriter<File>,
}
#[derive(Clone)]
pub struct ClosedPlainTmpFile {
path: PathBuf,
}
@@ -432,9 +459,9 @@ impl WriteableTmpFile for WriteablePlainTmpFile {
&mut self.file
}
}
impl ClosedTmpFile for ClosedPlainTmpFile {
type Reopened = PlainTmpMergeInput;
fn reopen(self) -> UResult<Self::Reopened> {
impl ClosedFile for ClosedPlainTmpFile {
type Opened = PlainTmpMergeInput;
fn open(self) -> UResult<Self::Opened> {
Ok(PlainTmpMergeInput {
file: File::open(&self.path).map_err(|error| SortError::OpenTmpFileFailed { error })?,
path: self.path,
@@ -463,6 +490,7 @@ pub struct WriteableCompressedTmpFile {
child: Child,
child_stdin: BufWriter<ChildStdin>,
}
#[derive(Clone)]
pub struct ClosedCompressedTmpFile {
path: PathBuf,
compress_prog: String,
@@ -508,10 +536,10 @@ impl WriteableTmpFile for WriteableCompressedTmpFile {
&mut self.child_stdin
}
}
impl ClosedTmpFile for ClosedCompressedTmpFile {
type Reopened = CompressedTmpMergeInput;
impl ClosedFile for ClosedCompressedTmpFile {
type Opened = CompressedTmpMergeInput;

fn reopen(self) -> UResult<Self::Reopened> {
fn open(self) -> UResult<Self::Opened> {
let mut command = Command::new(&self.compress_prog);
let file = File::open(&self.path).unwrap();
command.stdin(file).stdout(Stdio::piped()).arg("-d");
@@ -544,11 +572,26 @@ impl MergeInput for CompressedTmpMergeInput {
}
}

pub struct PlainMergeInput<R: Read + Send> {
inner: R,
#[derive(Clone)]
pub struct PlainInputFile<'a> {
path: &'a OsString,
}
impl<R: Read + Send> MergeInput for PlainMergeInput<R> {
type InnerRead = R;

impl ClosedFile for PlainInputFile<'_> {
type Opened = PlainMergeInput;
fn open(self) -> UResult<Self::Opened> {
Ok(PlainMergeInput {
inner: open(self.path)?,
})
}
}

pub struct PlainMergeInput {
inner: Box<dyn Read + Send>,
}

impl MergeInput for PlainMergeInput {
type InnerRead = Box<dyn Read + Send>;
fn finished_reading(self) -> UResult<()> {
Ok(())
}
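As a side note on the reworked traits: any cheaply clonable handle that knows how to open itself can now feed the merger, which is what makes the backup-and-retry above possible. The stand-in traits and the in-memory input below are a hypothetical illustration of that shape, not part of the commit.

use std::io::{Cursor, Read};

// Simplified stand-ins for the crate's UResult and trait definitions,
// shown only to illustrate how a ClosedFile/MergeInput pair fits together.
type UResult<T> = Result<T, Box<dyn std::error::Error>>;

trait ClosedFile: Clone {
    type Opened: MergeInput;
    fn open(self) -> UResult<Self::Opened>;
}

trait MergeInput: Send + 'static {
    type InnerRead: Read;
    fn finished_reading(self) -> UResult<()>;
}

// A closed handle that costs nothing to clone and holds no file descriptor.
#[derive(Clone)]
struct InMemoryClosed {
    data: Vec<u8>,
}

struct InMemoryOpen {
    reader: Cursor<Vec<u8>>,
}

impl ClosedFile for InMemoryClosed {
    type Opened = InMemoryOpen;
    fn open(self) -> UResult<Self::Opened> {
        Ok(InMemoryOpen { reader: Cursor::new(self.data) })
    }
}

impl MergeInput for InMemoryOpen {
    type InnerRead = Cursor<Vec<u8>>;
    fn finished_reading(self) -> UResult<()> {
        Ok(())
    }
}

fn main() -> UResult<()> {
    let closed = InMemoryClosed { data: b"line\n".to_vec() };
    let opened = closed.clone().open()?; // the cheap clone is what a retry would reuse
    opened.finished_reading()
}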