From 094e4970d4f0451535504fba4fa96b20f90751f1 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Tue, 4 May 2021 14:06:25 +0200 Subject: [PATCH] Fixes #2748 by filling the streaming buffers fully Diagnosis by Daniel. Code by Robert. --- src/dist/component/package.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 56a5a897b43..74878721500 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -2,6 +2,7 @@ //! for installing from a directory or tarball to an installation //! prefix, represented by a `Components` instance. +use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::fmt; use std::io::{self, ErrorKind as IOErrorKind, Read}; @@ -364,16 +365,19 @@ fn unpack_without_first_dir<'a, R: Read>( continue; } + struct SenderEntry<'a, 'b, R: std::io::Read> { + sender: Box) -> bool + 'a>, + entry: tar::Entry<'b, R>, + read: usize, + } + /// true if either no sender_entry was provided, or the incremental file /// has been fully dispatched. fn flush_ios<'a, R: std::io::Read, P: AsRef>( mut budget: &mut MemoryBudget, io_executor: &dyn Executor, mut directories: &mut HashMap, - mut sender_entry: Option<&mut ( - Box) -> bool + 'a>, - &mut tar::Entry<'_, R>, - )>, + mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>, full_path: P, ) -> Result { let mut result = sender_entry.is_none(); @@ -384,16 +388,19 @@ fn unpack_without_first_dir<'a, R: Read>( trigger_children(&*io_executor, &mut directories, &mut budget, op)?; } // Maybe stream a file incrementally - if let Some((sender, entry)) = sender_entry.as_mut() { + if let Some(sender) = sender_entry.as_mut() { if budget.available() as u64 >= IO_CHUNK_SIZE { - let mut v = vec![0; IO_CHUNK_SIZE as usize]; - let len = entry.read(&mut v)?; + let chunk_size = min(IO_CHUNK_SIZE, sender.entry.size() - sender.read as u64); + let mut v = vec![0; chunk_size as usize]; + let len = v.len(); if len == 0 { result = true; + } else { + sender.entry.read_exact(&mut v)?; + sender.read += v.len(); } - v.resize(len, 0); budget.claim_chunk(len); - if !sender(v) { + if !(sender.sender)(v) { bail!(format!( "IO receiver for '{}' disconnected", full_path.as_ref().display() @@ -519,8 +526,13 @@ fn unpack_without_first_dir<'a, R: Read>( } } - let mut incremental_file_sender = incremental_file_sender - .map(|incremental_file_sender| (incremental_file_sender, &mut entry)); + let mut incremental_file_sender = incremental_file_sender.map(|incremental_file_sender| { + (SenderEntry { + sender: incremental_file_sender, + entry, + read: 0, + }) + }); // monitor io queue and feed in the content of the file (if needed) while !flush_ios(