From 9af9fcd35c6744b665243b58e0899d50e383b317 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Tue, 4 May 2021 14:06:25 +0200 Subject: [PATCH] Fixes #2748 by filling the streaming buffers fully Diagnosis by Daniel. Code by Robert. --- src/dist/component/package.rs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 56a5a897b43..6d3acae1d41 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -364,16 +364,18 @@ fn unpack_without_first_dir<'a, R: Read>( continue; } + struct SenderEntry<'a, 'b, R: std::io::Read> { + sender: Box) -> bool + 'a>, + entry: tar::Entry<'b, R>, + } + /// true if either no sender_entry was provided, or the incremental file /// has been fully dispatched. fn flush_ios<'a, R: std::io::Read, P: AsRef>( mut budget: &mut MemoryBudget, io_executor: &dyn Executor, mut directories: &mut HashMap, - mut sender_entry: Option<&mut ( - Box) -> bool + 'a>, - &mut tar::Entry<'_, R>, - )>, + mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>, full_path: P, ) -> Result { let mut result = sender_entry.is_none(); @@ -384,16 +386,20 @@ fn unpack_without_first_dir<'a, R: Read>( trigger_children(&*io_executor, &mut directories, &mut budget, op)?; } // Maybe stream a file incrementally - if let Some((sender, entry)) = sender_entry.as_mut() { + if let Some(sender) = sender_entry.as_mut() { if budget.available() as u64 >= IO_CHUNK_SIZE { let mut v = vec![0; IO_CHUNK_SIZE as usize]; - let len = entry.read(&mut v)?; + let len = sender + .entry + .by_ref() + .take(IO_CHUNK_SIZE) + .read_to_end(&mut v)?; if len == 0 { result = true; } v.resize(len, 0); budget.claim_chunk(len); - if !sender(v) { + if !(sender.sender)(v) { bail!(format!( "IO receiver for '{}' disconnected", full_path.as_ref().display() @@ -519,8 +525,11 @@ fn unpack_without_first_dir<'a, R: Read>( } } - let mut incremental_file_sender = incremental_file_sender - .map(|incremental_file_sender| (incremental_file_sender, &mut entry)); + let mut incremental_file_sender = + incremental_file_sender.map(|incremental_file_sender| SenderEntry { + sender: incremental_file_sender, + entry, + }); // monitor io queue and feed in the content of the file (if needed) while !flush_ios(