Skip to content

Commit

Permalink
Merge pull request #2750 from rbtcollins/fillthebuffers
Browse files Browse the repository at this point in the history
Fixes #2748 by filling the streaming buffers fully
  • Loading branch information
kinnison authored May 4, 2021
2 parents 8e632bb + ad3ce8b commit 55ba10d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ pub fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
{
trace_scoped!("write_segment", "name": path_display, "len": len);
f.write_all(&contents)?;
drop(contents);
chunk_complete_callback(len);
}
}
Expand Down
27 changes: 18 additions & 9 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,16 +364,18 @@ fn unpack_without_first_dir<'a, R: Read>(
continue;
}

struct SenderEntry<'a, 'b, R: std::io::Read> {
sender: Box<dyn FnMut(Vec<u8>) -> bool + 'a>,
entry: tar::Entry<'b, R>,
}

/// true if either no sender_entry was provided, or the incremental file
/// has been fully dispatched.
fn flush_ios<'a, R: std::io::Read, P: AsRef<Path>>(
mut budget: &mut MemoryBudget,
io_executor: &dyn Executor,
mut directories: &mut HashMap<PathBuf, DirStatus>,
mut sender_entry: Option<&mut (
Box<dyn FnMut(Vec<u8>) -> bool + 'a>,
&mut tar::Entry<'_, R>,
)>,
mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>,
full_path: P,
) -> Result<bool> {
let mut result = sender_entry.is_none();
Expand All @@ -384,16 +386,20 @@ fn unpack_without_first_dir<'a, R: Read>(
trigger_children(&*io_executor, &mut directories, &mut budget, op)?;
}
// Maybe stream a file incrementally
if let Some((sender, entry)) = sender_entry.as_mut() {
if let Some(sender) = sender_entry.as_mut() {
if budget.available() as u64 >= IO_CHUNK_SIZE {
let mut v = vec![0; IO_CHUNK_SIZE as usize];
let len = entry.read(&mut v)?;
let len = sender
.entry
.by_ref()
.take(IO_CHUNK_SIZE)
.read_to_end(&mut v)?;
if len == 0 {
result = true;
}
v.resize(len, 0);
budget.claim_chunk(len);
if !sender(v) {
if !(sender.sender)(v) {
bail!(format!(
"IO receiver for '{}' disconnected",
full_path.as_ref().display()
Expand Down Expand Up @@ -519,8 +525,11 @@ fn unpack_without_first_dir<'a, R: Read>(
}
}

let mut incremental_file_sender = incremental_file_sender
.map(|incremental_file_sender| (incremental_file_sender, &mut entry));
let mut incremental_file_sender =
incremental_file_sender.map(|incremental_file_sender| SenderEntry {
sender: incremental_file_sender,
entry,
});

// monitor io queue and feed in the content of the file (if needed)
while !flush_ios(
Expand Down

0 comments on commit 55ba10d

Please sign in to comment.