Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove hot calls to memmove by replacing Vec<u8> with VecDeque<u8> #17

Merged
merged 7 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 132 additions & 59 deletions src/decoding/decodebuffer.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
use std::collections::VecDeque;
use std::hash::Hasher;
use std::mem::{self, MaybeUninit};
use std::{io, ptr, slice};

use twox_hash::XxHash64;

pub struct Decodebuffer {
pub buffer: Vec<u8>,
buffer: VecDeque<u8>,
pub dict_content: Vec<u8>,

pub window_size: usize,
total_output_counter: u64,
pub hash: XxHash64,
}

impl std::io::Read for Decodebuffer {
fn read(&mut self, target: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
impl io::Read for Decodebuffer {
fn read(&mut self, target: &mut [u8]) -> io::Result<usize> {
let max_amount = self.can_drain_to_window_size().unwrap_or(0);
let amount = max_amount.min(target.len());

let amount = if max_amount > target.len() {
target.len()
} else {
max_amount
};

if amount == 0 {
return Ok(0);
}

self.hash.write(&self.buffer[0..amount]);
target[..amount].copy_from_slice(&self.buffer[..amount]);
self.buffer.drain(0..amount);

let mut written = 0;
self.drain_to(amount, |buf| {
target[written..][..buf.len()].copy_from_slice(buf);
written += buf.len();
Ok(())
})?;
Ok(amount)
}
}

impl Decodebuffer {
pub fn new(window_size: usize) -> Decodebuffer {
Decodebuffer {
buffer: Vec::new(),
buffer: VecDeque::new(),
dict_content: Vec::new(),
window_size,
total_output_counter: 0,
Expand All @@ -61,7 +58,7 @@ impl Decodebuffer {
}

pub fn push(&mut self, data: &[u8]) {
self.buffer.extend_from_slice(data);
self.buffer.extend(data);
self.total_output_counter += data.len() as u64;
}

Expand Down Expand Up @@ -102,19 +99,53 @@ impl Decodebuffer {
} else {
let start_idx = self.buffer.len() - offset;

self.buffer.reserve(match_length);

if start_idx + match_length > self.buffer.len() {
self.buffer.reserve(match_length);
//need to copy byte by byte. can be optimized more but for now lets leave it like this
//TODO batch whats possible
for x in 0..match_length {
self.buffer.push(self.buffer[start_idx + x]);
self.buffer.push_back(self.buffer[start_idx + x]);
}
} else {
// can just copy parts of the existing buffer,
// which is exactly what Vec::extend_from_within was create for
let end_idx = start_idx + match_length;
self.buffer.extend_from_within(start_idx..end_idx);
let mut buf = [MaybeUninit::<u8>::uninit(); 4096];

// can just copy parts of the existing buffer

let mut start_idx = start_idx;
let mut match_length = match_length;
while match_length > 0 {
let filled = {
let (slice1, slice2) = self.buffer.as_slices();

let slice = if slice1.len() > start_idx {
&slice1[start_idx..]
} else {
&slice2[start_idx - slice1.len()..]
};
let slice = &slice[..match_length.min(slice.len()).min(buf.len())];

// TODO: replace with MaybeUninit::write_slice once it's stable.
// SAFETY: we initialize a portion of `buf` and then we return a slice
// of the initialized portion of `buf`.
unsafe {
debug_assert!(slice.len() <= buf.len());

ptr::copy_nonoverlapping(
slice.as_ptr().cast::<MaybeUninit<u8>>(),
buf.as_mut_ptr(),
slice.len(),
);
slice::from_raw_parts(buf.as_ptr().cast::<u8>(), slice.len())
}
};

self.buffer.extend(filled);
start_idx += filled.len();
match_length -= filled.len();
}
}

self.total_output_counter += match_length as u64;
}

Expand Down Expand Up @@ -142,65 +173,107 @@ impl Decodebuffer {
match self.can_drain_to_window_size() {
None => None,
Some(can_drain) => {
self.hash.write(&self.buffer[0..can_drain]);
Some(self.buffer.drain(0..can_drain).collect())
let mut vec = Vec::with_capacity(can_drain);
self.drain_to(can_drain, |buf| {
vec.extend_from_slice(buf);
Ok(())
})
.ok()?;
Some(vec)
}
}
}

pub fn drain_to_window_size_writer(
&mut self,
sink: &mut dyn std::io::Write,
) -> Result<usize, std::io::Error> {
pub fn drain_to_window_size_writer(&mut self, sink: &mut dyn io::Write) -> io::Result<usize> {
match self.can_drain_to_window_size() {
None => Ok(0),
Some(can_drain) => {
self.hash.write(&self.buffer[0..can_drain]);
let mut buf = [0u8; 1]; //TODO batch to reasonable size
for x in self.buffer.drain(0..can_drain) {
buf[0] = x;
sink.write_all(&buf[..])?;
}
self.drain_to(can_drain, |buf| {
sink.write_all(buf)?;
Ok(())
})?;
Ok(can_drain)
}
}
}

//drain the buffer completely
pub fn drain(&mut self) -> Vec<u8> {
self.hash.write(&self.buffer);
let new_buffer = VecDeque::with_capacity(self.buffer.capacity());

let new_buffer = Vec::with_capacity(self.buffer.capacity());
std::mem::replace(&mut self.buffer, new_buffer)
}
let (slice1, slice2) = self.buffer.as_slices();
self.hash.write(slice1);
self.hash.write(slice2);

pub fn drain_to_writer(
&mut self,
sink: &mut dyn std::io::Write,
) -> Result<usize, std::io::Error> {
self.hash.write(&self.buffer);
sink.write_all(&self.buffer)?;
mem::replace(&mut self.buffer, new_buffer).into()
}

pub fn drain_to_writer(&mut self, sink: &mut dyn io::Write) -> io::Result<usize> {
let len = self.buffer.len();
self.buffer.clear();
self.drain_to(len, |buf| {
sink.write_all(buf)?;
Ok(())
})?;

Ok(len)
}

pub fn read_all(&mut self, target: &mut [u8]) -> Result<usize, std::io::Error> {
let amount = if self.buffer.len() > target.len() {
target.len()
} else {
self.buffer.len()
};
pub fn read_all(&mut self, target: &mut [u8]) -> io::Result<usize> {
let amount = self.buffer.len().min(target.len());

let mut written = 0;
self.drain_to(amount, |buf| {
target[written..][..buf.len()].copy_from_slice(buf);
written += buf.len();
Ok(())
})?;
Ok(amount)
}

fn drain_to(
&mut self,
amount: usize,
mut f: impl FnMut(&[u8]) -> io::Result<()>,
) -> io::Result<()> {
if amount == 0 {
return Ok(0);
return Ok(());
}

self.hash.write(&self.buffer[0..amount]);
target[..amount].copy_from_slice(&self.buffer[..amount]);
self.buffer.drain(0..amount);
struct DrainGuard<'a> {
buffer: &'a mut VecDeque<u8>,
amount: usize,
}

Ok(amount)
impl<'a> Drop for DrainGuard<'a> {
fn drop(&mut self) {
if self.amount != 0 {
self.buffer.drain(..self.amount);
}
}
}

let mut drain_guard = DrainGuard {
buffer: &mut self.buffer,
amount: 0,
};

let (slice1, slice2) = drain_guard.buffer.as_slices();
let n1 = slice1.len().min(amount);
let n2 = slice2.len().min(amount - n1);

f(&slice1[..n1])?;
self.hash.write(&slice1[..n1]);
drain_guard.amount += n1;

if n2 != 0 {
f(&slice2[..n2])?;
self.hash.write(&slice2[..n2]);
drain_guard.amount += n2;
}

// Make sure we don't accidentally drop `DrainGuard` earlier.
drop(drain_guard);

Ok(())
}
}
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![deny(trivial_casts, trivial_numeric_casts, rust_2018_idioms)]
#![forbid(unsafe_code)]

pub mod blocks;
pub mod decoding;
Expand Down