Skip to content

Commit

Permalink
Merge pull request #11 from Nemo157/fix-large-chunks
Browse files Browse the repository at this point in the history
Fix compressing large chunks
  • Loading branch information
Nemo157 authored May 3, 2019
2 parents 0dae0f2 + 78d0964 commit 0ff6229
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 105 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ bytes = "0.4.12"
flate2 = "1.0.7"
futures-preview = "0.3.0-alpha.15"
pin-project = "0.3.2"

[dev-dependencies]
rand = "0.6.5"
14 changes: 7 additions & 7 deletions src/stream/brotli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::io::Result;

use brotli2::raw::{CoStatus, CompressOp};
pub use brotli2::{raw::Compress, CompressParams};
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use futures::{ready, stream::Stream};
use pin_project::unsafe_project;

#[unsafe_project(Unpin)]
pub struct BrotliStream<S: Stream<Item = Result<Bytes>>> {
#[pin]
inner: S,
flushing: bool,
flush: bool,
compress: Compress,
}

Expand All @@ -26,14 +26,14 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {

let this = self.project();

if *this.flushing {
if *this.flush {
return Poll::Ready(None);
}

let input_buffer = if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
bytes?
} else {
*this.flushing = true;
*this.flush = true;
Bytes::new()
};

Expand All @@ -42,7 +42,7 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {
let output_ref = &mut &mut [][..];
loop {
let status = this.compress.compress(
if *this.flushing {
if *this.flush {
CompressOp::Finish
} else {
CompressOp::Process
Expand All @@ -51,7 +51,7 @@ impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliStream<S> {
output_ref,
)?;
while let Some(buf) = this.compress.take_output(None) {
compressed_output.put(buf);
compressed_output.extend_from_slice(buf);
}
match status {
CoStatus::Finished => break,
Expand All @@ -67,7 +67,7 @@ impl<S: Stream<Item = Result<Bytes>>> BrotliStream<S> {
pub fn new(stream: S, compress: Compress) -> BrotliStream<S> {
BrotliStream {
inner: stream,
flushing: false,
flush: false,
compress,
}
}
Expand Down
134 changes: 94 additions & 40 deletions src/stream/flate.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,126 @@
use core::{
use std::{
io::Result,
mem,
pin::Pin,
task::{Context, Poll},
};
use std::io::Result;

use bytes::{Bytes, BytesMut};
use flate2::FlushCompress;
pub use flate2::{Compress, Compression};
pub(crate) use flate2::Compress;
use flate2::{FlushCompress, Status};
use futures::{ready, stream::Stream};
use pin_project::unsafe_project;

#[derive(Debug)]
enum State {
Reading,
Writing(Bytes),
Flushing,
Done,
Invalid,
}

#[unsafe_project(Unpin)]
pub struct CompressedStream<S: Stream<Item = Result<Bytes>>> {
pub(crate) struct CompressedStream<S: Stream<Item = Result<Bytes>>> {
#[pin]
inner: S,
flushing: bool,
input_buffer: Bytes,
output_buffer: BytesMut,
state: State,
output: BytesMut,
compress: Compress,
}

impl<S: Stream<Item = Result<Bytes>>> Stream for CompressedStream<S> {
type Item = Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
const OUTPUT_BUFFER_SIZE: usize = 8_000;
let mut this = self.project();

let this = self.project();
fn compress(
compress: &mut Compress,
input: &mut Bytes,
output: &mut BytesMut,
flush: FlushCompress,
) -> Result<(Status, Bytes)> {
const OUTPUT_BUFFER_SIZE: usize = 8_000;

if this.input_buffer.is_empty() {
if *this.flushing {
return Poll::Ready(None);
} else if let Some(bytes) = ready!(this.inner.poll_next(cx)) {
*this.input_buffer = bytes?;
} else {
*this.flushing = true;
if output.len() < OUTPUT_BUFFER_SIZE {
output.resize(OUTPUT_BUFFER_SIZE, 0);
}

let (prior_in, prior_out) = (compress.total_in(), compress.total_out());
let status = compress.compress(input, output, flush)?;
let input_len = compress.total_in() - prior_in;
let output_len = compress.total_out() - prior_out;

input.advance(input_len as usize);
Ok((status, output.split_to(output_len as usize).freeze()))
}

this.output_buffer.resize(OUTPUT_BUFFER_SIZE, 0);

let flush = if *this.flushing {
FlushCompress::Finish
} else {
FlushCompress::None
};

let (prior_in, prior_out) = (this.compress.total_in(), this.compress.total_out());
this.compress
.compress(this.input_buffer, this.output_buffer, flush)?;
let input = this.compress.total_in() - prior_in;
let output = this.compress.total_out() - prior_out;

this.input_buffer.advance(input as usize);
Poll::Ready(Some(Ok(this
.output_buffer
.split_to(output as usize)
.freeze())))
#[allow(clippy::never_loop)] // https://github.com/rust-lang/rust-clippy/issues/4058
loop {
break match mem::replace(this.state, State::Invalid) {
State::Reading => {
*this.state = State::Reading;
*this.state = match ready!(this.inner.as_mut().poll_next(cx)) {
Some(chunk) => State::Writing(chunk?),
None => State::Flushing,
};
continue;
}

State::Writing(mut input) => {
if input.is_empty() {
*this.state = State::Reading;
continue;
}

let (status, chunk) = compress(
&mut this.compress,
&mut input,
&mut this.output,
FlushCompress::None,
)?;

*this.state = match status {
Status::Ok => State::Writing(input),
Status::StreamEnd => unreachable!(),
Status::BufError => panic!("unexpected BufError"),
};

Poll::Ready(Some(Ok(chunk)))
}

State::Flushing => {
let (status, chunk) = compress(
&mut this.compress,
&mut Bytes::new(),
&mut this.output,
FlushCompress::Finish,
)?;

*this.state = match status {
Status::Ok => State::Flushing,
Status::StreamEnd => State::Done,
Status::BufError => panic!("unexpected BufError"),
};

Poll::Ready(Some(Ok(chunk)))
}

State::Done => Poll::Ready(None),

State::Invalid => panic!("CompressedStream reached invalid state"),
};
}
}
}

impl<S: Stream<Item = Result<Bytes>>> CompressedStream<S> {
pub fn new(stream: S, compress: Compress) -> CompressedStream<S> {
pub(crate) fn new(stream: S, compress: Compress) -> CompressedStream<S> {
CompressedStream {
inner: stream,
flushing: false,
input_buffer: Bytes::new(),
output_buffer: BytesMut::new(),
state: State::Reading,
output: BytesMut::new(),
compress,
}
}
Expand Down
Loading

0 comments on commit 0ff6229

Please sign in to comment.