diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index 57e434e99b..a885e72032 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -23,7 +23,6 @@ use itertools::Itertools; use uucore::error::UResult; use crate::chunks::RecycledChunk; -use crate::merge::ClosedTmpFile; use crate::merge::WriteableCompressedTmpFile; use crate::merge::WriteablePlainTmpFile; use crate::merge::WriteableTmpFile; @@ -51,7 +50,7 @@ pub fn ext_sort( move || sorter(&recycled_receiver, &sorted_sender, &settings) }); if settings.compress_prog.is_some() { - reader_writer::<_, WriteableCompressedTmpFile>( + reader_writer::( files, settings, &sorted_receiver, @@ -60,7 +59,7 @@ pub fn ext_sort( tmp_dir, ) } else { - reader_writer::<_, WriteablePlainTmpFile>( + reader_writer::( files, settings, &sorted_receiver, @@ -71,11 +70,8 @@ pub fn ext_sort( } } -fn reader_writer< - F: Iterator>>, - Tmp: WriteableTmpFile + 'static, ->( - files: F, +fn reader_writer( + files: impl Iterator>>, settings: &GlobalSettings, receiver: &Receiver, sender: SyncSender, @@ -98,11 +94,13 @@ fn reader_writer< )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { - merge::merge_with_file_limit::<_, _, Tmp>( - tmp_files.into_iter().map(|c| c.reopen()), + let tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + merge::merge_with_file_limit::( + tmp_files.into_iter(), settings, output, tmp_dir, + tmp_file, )?; } ReadResult::SortedSingleChunk(chunk) => { diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index 300733d1e3..bcbf59454a 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -49,9 +49,9 @@ fn replace_output_file_in_input_files( if let Some(copy) = © { *file = copy.clone().into_os_string(); } else { - let (_file, copy_path) = tmp_dir.next_file()?; - std::fs::copy(file_path, ©_path) - .map_err(|error| SortError::OpenTmpFileFailed { error })?; + let (mut temp_file, copy_path) = tmp_dir.next_file()?; + let mut source_file = File::open(&file_path)?; + std::io::copy(&mut source_file, &mut temp_file)?; *file = copy_path.clone().into_os_string(); copy = Some(copy_path); } @@ -73,75 +73,98 @@ pub fn merge( tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?; - let files = files - .iter() - .map(|file| open(file).map(|file| PlainMergeInput { inner: file })); + // Convert files to an iterator of PlainInputFile + let files_iter = files.iter().map(|file| PlainInputFile { path: file }); if settings.compress_prog.is_none() { - merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir) + let tmp_file = + WriteablePlainTmpFile::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + merge_with_file_limit::( + files_iter, settings, output, tmp_dir, tmp_file, + ) } else { - merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir) + let tmp_file = WriteableCompressedTmpFile::create( + tmp_dir.next_file()?, + settings.compress_prog.as_deref(), + )?; + merge_with_file_limit::( + files_iter, settings, output, tmp_dir, tmp_file, + ) } } -// Merge already sorted `MergeInput`s. -pub fn merge_with_file_limit< - M: MergeInput + 'static, - F: ExactSizeIterator>, - Tmp: WriteableTmpFile + 'static, ->( - files: F, +pub fn merge_with_file_limit( + input_files: impl ExactSizeIterator, settings: &GlobalSettings, output: Output, tmp_dir: &mut TmpDirWrapper, + mut tmp_file: Tmp, ) -> UResult<()> { - if files.len() <= settings.merge_batch_size { - let merger = merge_without_limit(files, settings); - merger?.write_all(settings, output) - } else { - let mut temporary_files = vec![]; - let mut batch = vec![]; - for file in files { - batch.push(file); - if batch.len() >= settings.merge_batch_size { - assert_eq!(batch.len(), settings.merge_batch_size); - let merger = merge_without_limit(batch.into_iter(), settings)?; - batch = vec![]; - - let mut tmp_file = - Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + // Code assumes settings.merge_batch_size >= 2. Assert it! + assert!(settings.merge_batch_size >= 2); + // Merge down all the input files into as few temporary files as we can (ideally 0 + // if possible - i.e. merge directly to output). + let mut output_temporary_files = vec![]; + let mut opened_files = vec![]; + for input_file in input_files { + if opened_files.len() >= settings.merge_batch_size { + // Check that we've not somehow accidentally violated our merge-size requirement. + assert_eq!(opened_files.len(), settings.merge_batch_size); + // We have a full batch. Merge them. + let merger = merge_without_limit(opened_files.into_iter(), settings)?; + merger.write_all_to(settings, tmp_file.as_write())?; + output_temporary_files.push(tmp_file.finished_writing()?); + tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + opened_files = vec![]; + } + + // Make a backup in case our first attempt to open fails (due to file-descriptor exhaustion). + let input_file_backup = input_file.clone(); + + match input_file.open() { + Ok(opened_file) => opened_files.push(opened_file), + Err(err) => { + // We've run out of descriptors. If we've only managed to open one file then give up, + // otherwise merge what we've got. + if opened_files.len() < 2 { + return Err(err); + } + let merger = merge_without_limit(opened_files.into_iter(), settings)?; merger.write_all_to(settings, tmp_file.as_write())?; - temporary_files.push(tmp_file.finished_writing()?); + output_temporary_files.push(tmp_file.finished_writing()?); + tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + + // Now retry the open. If we fail this time, give up completely and return error. + opened_files = vec![input_file_backup.open()?]; } } - // Merge any remaining files that didn't get merged in a full batch above. - if !batch.is_empty() { - assert!(batch.len() < settings.merge_batch_size); - let merger = merge_without_limit(batch.into_iter(), settings)?; - - let mut tmp_file = - Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; - merger.write_all_to(settings, tmp_file.as_write())?; - temporary_files.push(tmp_file.finished_writing()?); + } + // If we've opened some files but not yet merged them... + if !opened_files.is_empty() { + let merger = merge_without_limit(opened_files.into_iter(), settings)?; + // If we have no output temp files at this point, we can just merge to the final output and be done. + if output_temporary_files.is_empty() { + return merger.write_all(settings, output); } - merge_with_file_limit::<_, _, Tmp>( - temporary_files - .into_iter() - .map(Box::new(|c: Tmp::Closed| c.reopen()) - as Box< - dyn FnMut(Tmp::Closed) -> UResult<::Reopened>, - >), + // If we get to here then we have at least one other open temp file, need to do another round of merges. + merger.write_all_to(settings, tmp_file.as_write())?; + output_temporary_files.push(tmp_file.finished_writing()?); + tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + } + if output_temporary_files.is_empty() { + // If there are no temporary files we must be done. + Ok(()) + } else { + merge_with_file_limit::( + output_temporary_files.into_iter(), settings, output, tmp_dir, + tmp_file, ) } } -/// Merge files without limiting how many files are concurrently open. -/// -/// It is the responsibility of the caller to ensure that `files` yields only -/// as many files as we are allowed to open concurrently. -fn merge_without_limit>>( +fn merge_without_limit>( files: F, settings: &GlobalSettings, ) -> UResult { @@ -152,7 +175,7 @@ fn merge_without_limit>>( let (sender, receiver) = sync_channel(2); loaded_receivers.push(receiver); reader_files.push(Some(ReaderFile { - file: file?, + file, sender, carry_over: vec![], })); @@ -204,6 +227,7 @@ fn merge_without_limit>>( reader_join_handle, }) } + /// The struct on the reader thread representing an input file struct ReaderFile { file: M, @@ -379,22 +403,24 @@ fn check_child_success(mut child: Child, program: &str) -> UResult<()> { } /// A temporary file that can be written to. -pub trait WriteableTmpFile: Sized { - type Closed: ClosedTmpFile; +pub trait WriteableTmpFile: Sized + 'static { + type Closed: ClosedFile; type InnerWrite: Write; fn create(file: (File, PathBuf), compress_prog: Option<&str>) -> UResult; /// Closes the temporary file. fn finished_writing(self) -> UResult; fn as_write(&mut self) -> &mut Self::InnerWrite; } -/// A temporary file that is (temporarily) closed, but can be reopened. -pub trait ClosedTmpFile { - type Reopened: MergeInput; - /// Reopens the temporary file. - fn reopen(self) -> UResult; + +/// A file that is (temporarily) closed, but can be reopened. +pub trait ClosedFile: Clone { + type Opened: MergeInput; + /// Opens temporary file. + fn open(self) -> UResult; } + /// A pre-sorted input for merging. -pub trait MergeInput: Send { +pub trait MergeInput: Send + 'static { type InnerRead: Read; /// Cleans this `MergeInput` up. /// Implementations may delete the backing file. @@ -406,6 +432,7 @@ pub struct WriteablePlainTmpFile { path: PathBuf, file: BufWriter, } +#[derive(Clone)] pub struct ClosedPlainTmpFile { path: PathBuf, } @@ -432,9 +459,9 @@ impl WriteableTmpFile for WriteablePlainTmpFile { &mut self.file } } -impl ClosedTmpFile for ClosedPlainTmpFile { - type Reopened = PlainTmpMergeInput; - fn reopen(self) -> UResult { +impl ClosedFile for ClosedPlainTmpFile { + type Opened = PlainTmpMergeInput; + fn open(self) -> UResult { Ok(PlainTmpMergeInput { file: File::open(&self.path).map_err(|error| SortError::OpenTmpFileFailed { error })?, path: self.path, @@ -463,6 +490,7 @@ pub struct WriteableCompressedTmpFile { child: Child, child_stdin: BufWriter, } +#[derive(Clone)] pub struct ClosedCompressedTmpFile { path: PathBuf, compress_prog: String, @@ -508,10 +536,10 @@ impl WriteableTmpFile for WriteableCompressedTmpFile { &mut self.child_stdin } } -impl ClosedTmpFile for ClosedCompressedTmpFile { - type Reopened = CompressedTmpMergeInput; +impl ClosedFile for ClosedCompressedTmpFile { + type Opened = CompressedTmpMergeInput; - fn reopen(self) -> UResult { + fn open(self) -> UResult { let mut command = Command::new(&self.compress_prog); let file = File::open(&self.path).unwrap(); command.stdin(file).stdout(Stdio::piped()).arg("-d"); @@ -544,11 +572,26 @@ impl MergeInput for CompressedTmpMergeInput { } } -pub struct PlainMergeInput { - inner: R, +#[derive(Clone)] +pub struct PlainInputFile<'a> { + path: &'a OsString, } -impl MergeInput for PlainMergeInput { - type InnerRead = R; + +impl ClosedFile for PlainInputFile<'_> { + type Opened = PlainMergeInput; + fn open(self) -> UResult { + Ok(PlainMergeInput { + inner: open(self.path)?, + }) + } +} + +pub struct PlainMergeInput { + inner: Box, +} + +impl MergeInput for PlainMergeInput { + type InnerRead = Box; fn finished_reading(self) -> UResult<()> { Ok(()) } diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index b6ce74cc85..9d582df10c 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -1086,27 +1086,56 @@ fn test_merge_batch_size() { #[test] #[cfg(any(target_os = "linux", target_os = "android"))] -fn test_merge_batch_size_with_limit() { +fn test_merge_with_limit() { use rlimit::Resource; // Currently need... // 3 descriptors for stdin, stdout, stderr // 2 descriptors for CTRL+C handling logic (to be reworked at some point) - // 2 descriptors for the input files (i.e. batch-size of 2). - let limit_fd = 3 + 2 + 2; - TestScenario::new(util_name!()) - .ucmd() - .limit(Resource::NOFILE, limit_fd, limit_fd) - .arg("--batch-size=2") - .arg("-m") - .arg("--unique") - .arg("merge_ints_interleaved_1.txt") - .arg("merge_ints_interleaved_2.txt") - .arg("merge_ints_interleaved_3.txt") - .arg("merge_ints_interleaved_3.txt") - .arg("merge_ints_interleaved_2.txt") - .arg("merge_ints_interleaved_1.txt") - .succeeds() - .stdout_only_fixture("merge_ints_interleaved.expected"); + // Minimum 2 descriptors for the input files. + // 1 descriptor for a working temp file. + // Test outputs to stdout, so no need for an additional output descriptor. + let minimum_required_fd = 3 + 2 + 2 + 1; + + // Need to test that if we have fewer than minimum_required_fd descriptors available + // we don't hang. + // Then for all cases >= minimum_required_fd the test should pass. + // Run up to a maximum of 10 extra descriptors (which is clearly more than we should + // ever need since we're only opening maximum 6 inputs simultaneously). + let easily_sufficient_fd = minimum_required_fd + 10; + + // Test for explicit fail (i.e. validate we don't hang/loop-forever) when we don't + // have enough descriptors. + for limit_fd in 0..minimum_required_fd { + TestScenario::new(util_name!()) + .ucmd() + .limit(Resource::NOFILE, limit_fd, limit_fd) + .arg("-m") + .arg("--unique") + .arg("merge_ints_interleaved_1.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_1.txt") + .fails(); + } + + // Test for pass when we have enough descriptors. + for limit_fd in minimum_required_fd..easily_sufficient_fd { + TestScenario::new(util_name!()) + .ucmd() + .limit(Resource::NOFILE, limit_fd, limit_fd) + .arg("-m") + .arg("--unique") + .arg("merge_ints_interleaved_1.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_1.txt") + .succeeds() + .stdout_only_fixture("merge_ints_interleaved.expected"); + } } #[test]