Skip to content

Commit

Permalink
feat: Remove the drain function from stream context
Browse files Browse the repository at this point in the history
Allow decouping the function from a shared stream context.

BREAKING CHANGE: Do not pass the drain function to new_compress_stream_context and new_decompress_stream_context. Pass it to each call of compress_chunk, compress_chunk_at, compress_end, decompress_chunk and decompress_chunk_at.
  • Loading branch information
prantlf committed Oct 26, 2023
1 parent 94bb1b0 commit f9e5872
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 64 deletions.
23 changes: 13 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,36 +183,39 @@ If you want to compress or decompress large data, or if the data comes chunk aft
cctx := zstd.new_compress_context()!
defer { cctx.free() }
cctx.set_param(zstd.CompressParam.checksum_flag, 1)!
mut sctx := zstd.new_compress_stream_context()
mut dst := []u8{cap: zstd.compress_bound(src.len)}
mut dst_ref := &dst
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_compress_stream_context(drain)
cctx.compress_chunk(mut sctx, src, false)!
cctx.compress_chunk(mut sctx, src, false, drain)!
...
unsafe { cctx.compress_chunk(mut sctx, data, len, true)! }
unsafe { cctx.compress_chunk(mut sctx, data, len, true, drain)! }
...
cctx.compress_end(mut sctx)!
```

#### Compression

```v
new_compress_stream_context(drain fn (buf &u8, len int) !) &StreamContext
(c &CompressContext) compress_chunk(mut sctx StreamContext, src []u8, last bool) !
(c &CompressContext) compress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool) !
new_compress_stream_context() &StreamContext
(c &CompressContext) compress_chunk(
mut sctx StreamContext, src []u8, last bool, drain fn (buf &u8, len int) !) !
(c &CompressContext) compress_chunk_at(
mut sctx StreamContext, src &u8, src_len int, last bool, drain fn (buf &u8, len int) !) !
```

#### Decompression

```v
new_decompress_stream_context(drain fn (buf &u8, len int) !) &StreamContext
(d &DecompressContext) decompress_chunk(mut sctx StreamContext, src []u8, last bool) !
(d &DecompressContext) decompress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool) !
new_decompress_stream_context() &StreamContext
(d &DecompressContext) decompress_chunk(
mut sctx StreamContext, src []u8, last bool, drain fn (buf &u8, len int) !) !
(d &DecompressContext) decompress_chunk_at(
mut sctx StreamContext, src &u8, src_len int, last bool, drain fn (buf &u8, len int) !) !
```

### Errors
Expand Down
21 changes: 8 additions & 13 deletions src/compress_stream.v
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@ const (
compress_stream_in_size = int(C.ZSTD_CStreamInSize())
)

pub fn new_compress_stream_context(drain fn (buf &u8, len int) !) &StreamContext {
dst := []u8{len: zstd.compress_stream_out_size}
output := &C.ZSTD_outBuffer{dst.data, usize(zstd.compress_stream_out_size), 0}
src := []u8{len: zstd.compress_stream_in_size}
input := &C.ZSTD_inBuffer{src.data, 0, 0}
return &StreamContext{drain, output, input}
pub fn new_compress_stream_context() &StreamContext {
return new_stream_context(zstd.compress_stream_out_size, zstd.compress_stream_in_size)
}

[inline]
pub fn (c &CompressContext) compress_chunk(mut sctx StreamContext, src []u8, last bool) ! {
unsafe { c.compress_chunk_at(mut sctx, src.data, src.len, last)! }
pub fn (c &CompressContext) compress_chunk(mut sctx StreamContext, src []u8, last bool, drain fn (buf &u8, len int) !) ! {
unsafe { c.compress_chunk_at(mut sctx, src.data, src.len, last, drain)! }
}

[unsafe]
pub fn (c &CompressContext) compress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool) ! {
pub fn (c &CompressContext) compress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool, drain fn (buf &u8, len int) !) ! {
mut res := usize(0)
mut pos := 0
mut rest_len := src_len
Expand All @@ -40,19 +36,18 @@ pub fn (c &CompressContext) compress_chunk_at(mut sctx StreamContext, src &u8, s
}
res = C.ZSTD_compressStream2(c.cctx, sctx.output, sctx.input, end_op)
check_error(res)!
drain_buffer(mut sctx)!
drain_buffer(mut sctx, drain)!
pos += max_len
}
if last && (res != 0 || sctx.input.pos != sctx.input.size) {
return error('unfinished compression')
}
}

pub fn (c &CompressContext) compress_end(mut sctx StreamContext) ! {
pub fn (c &CompressContext) compress_end(mut sctx StreamContext, drain fn (buf &u8, len int) !) ! {
mut res := C.ZSTD_compressStream2(c.cctx, sctx.output, sctx.input, C.ZSTD_e_end)
check_error(res)!
if sctx.output.pos > 0 {
sctx.drain(sctx.output.dst, int(sctx.output.pos))!
sctx.output.pos = 0
drain_buffer(mut sctx, drain)!
}
}
24 changes: 13 additions & 11 deletions src/compress_stream_test.v
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ fn decompress_stream_test(src []u8) ![]u8 {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_decompress_stream_context(drain)
dctx.decompress_chunk(mut sctx, src, true)!
mut sctx := new_decompress_stream_context()
dctx.decompress_chunk(mut sctx, src, true, drain)!

return dst
}
Expand All @@ -33,8 +33,8 @@ fn test_compress_one() {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_compress_stream_context(drain)
cctx.compress_chunk(mut sctx, src, true)!
mut sctx := new_compress_stream_context()
cctx.compress_chunk(mut sctx, src, true, drain)!
cctx.reset(ResetDir.session_and_parameters)

assert dst == [u8(40), u8(181), u8(47), u8(253), u8(36), u8(85), u8(61), u8(2), u8(0), u8(18),
Expand Down Expand Up @@ -66,11 +66,12 @@ fn test_compress_two() {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_compress_stream_context(drain)
mut sctx := new_compress_stream_context()
unsafe {
cctx.compress_chunk_at(mut sctx, src.data, half, false)!
cctx.compress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, true)!
cctx.compress_chunk_at(mut sctx, src.data, half, false, drain)!
cctx.compress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, true, drain)!
}
sctx.reset()
assert dst == [u8(40), u8(181), u8(47), u8(253), u8(4), u8(88), u8(21), u8(2), u8(0), u8(82),
u8(133), u8(15), u8(19), u8(176), u8(23), u8(115), u8(208), u8(167), u8(37), u8(249), u8(38),
u8(211), u8(179), u8(108), u8(78), u8(82), u8(3), u8(236), u8(193), u8(25), u8(243), u8(136),
Expand Down Expand Up @@ -100,12 +101,13 @@ fn test_compress_with_end() {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_compress_stream_context(drain)
mut sctx := new_compress_stream_context()
unsafe {
cctx.compress_chunk_at(mut sctx, src.data, half, false)!
cctx.compress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, false)!
cctx.compress_chunk_at(mut sctx, src.data, half, false, drain)!
cctx.compress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, false,
drain)!
}
cctx.compress_end(mut sctx)!
cctx.compress_end(mut sctx, drain)!

assert dst == [u8(40), u8(181), u8(47), u8(253), u8(4), u8(88), u8(21), u8(2), u8(0), u8(82),
u8(133), u8(15), u8(19), u8(176), u8(23), u8(115), u8(208), u8(167), u8(37), u8(249), u8(38),
Expand Down
16 changes: 6 additions & 10 deletions src/decompress_stream.v
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@ const (
decompress_stream_in_size = int(C.ZSTD_DStreamInSize())
)

pub fn new_decompress_stream_context(drain fn (buf &u8, len int) !) &StreamContext {
dst := []u8{len: zstd.decompress_stream_out_size}
output := &C.ZSTD_outBuffer{dst.data, usize(zstd.decompress_stream_out_size), 0}
src := []u8{len: zstd.decompress_stream_in_size}
input := &C.ZSTD_inBuffer{src.data, 0, 0}
return &StreamContext{drain, output, input}
pub fn new_decompress_stream_context() &StreamContext {
return new_stream_context(zstd.decompress_stream_out_size, zstd.decompress_stream_in_size)
}

[inline]
pub fn (d &DecompressContext) decompress_chunk(mut sctx StreamContext, src []u8, last bool) ! {
unsafe { d.decompress_chunk_at(mut sctx, src.data, src.len, last)! }
pub fn (d &DecompressContext) decompress_chunk(mut sctx StreamContext, src []u8, last bool, drain fn (buf &u8, len int) !) ! {
unsafe { d.decompress_chunk_at(mut sctx, src.data, src.len, last, drain)! }
}

[unsafe]
pub fn (d &DecompressContext) decompress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool) ! {
pub fn (d &DecompressContext) decompress_chunk_at(mut sctx StreamContext, src &u8, src_len int, last bool, drain fn (buf &u8, len int) !) ! {
mut res := usize(0)
mut pos := 0
mut rest_len := src_len
Expand All @@ -35,7 +31,7 @@ pub fn (d &DecompressContext) decompress_chunk_at(mut sctx StreamContext, src &u
sctx.input.size += usize(len)
res = C.ZSTD_decompressStream(d.dctx, sctx.output, sctx.input)
check_error(res)!
drain_buffer(mut sctx)!
drain_buffer(mut sctx, drain)!
pos += max_len
}
if last && (res != 0 || sctx.input.pos != sctx.input.size) {
Expand Down
17 changes: 8 additions & 9 deletions src/decompress_stream_test.v
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ fn compress_stream_test(src []u8) ![]u8 {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_compress_stream_context(drain)
cctx.compress_chunk(mut sctx, src, true)!
mut sctx := new_compress_stream_context()
cctx.compress_chunk(mut sctx, src, true, drain)!

return dst
}
Expand Down Expand Up @@ -42,8 +42,8 @@ fn test_decompress_one() {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_decompress_stream_context(drain)
dctx.decompress_chunk(mut sctx, src, true)!
mut sctx := new_decompress_stream_context()
dctx.decompress_chunk(mut sctx, src, true, drain)!
dctx.reset(ResetDir.session_and_parameters)

assert dst == 'A sentence with a length longer than a minimum content size to test zstd compression.'.bytes()
Expand All @@ -54,8 +54,6 @@ fn test_decompress_two() {
defer {
dctx.free()
}
dctx.set_param(DecompressParam.window_log_max, 27)!
assert dctx.get_param(DecompressParam.window_log_max)! == 27

src := [u8(40), u8(181), u8(47), u8(253), u8(36), u8(85), u8(61), u8(2), u8(0), u8(18), u8(133),
u8(15), u8(20), u8(176), u8(55), u8(7), u8(208), u8(167), u8(37), u8(249), u8(38), u8(165),
Expand All @@ -73,10 +71,11 @@ fn test_decompress_two() {
drain := fn [dst_ref] (buf &u8, len int) ! {
unsafe { dst_ref.push_many(buf, len) }
}
mut sctx := new_decompress_stream_context(drain)
mut sctx := new_decompress_stream_context()
unsafe {
dctx.decompress_chunk_at(mut sctx, src.data, half, false)!
dctx.decompress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, true)!
dctx.decompress_chunk_at(mut sctx, src.data, half, false, drain)!
dctx.decompress_chunk_at(mut sctx, &u8(src.data) + half, src.len - half, true,
drain)!
}
assert dst == 'A sentence with a length longer than a minimum content size to test zstd compression.'.bytes()
assert src == compress_stream_test(dst)!
Expand Down
19 changes: 16 additions & 3 deletions src/stream_context.v
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ module zstd

[noinit]
pub struct StreamContext {
drain fn (buf &u8, len int) ! [required]
mut:
output &C.ZSTD_outBuffer = unsafe { nil }
input &C.ZSTD_inBuffer = unsafe { nil }
}

fn drain_buffer(mut sctx StreamContext) ! {
pub fn (mut s StreamContext) reset() {
s.output.pos = 0
s.input.size = 0
s.input.pos = 0
}

fn new_stream_context(out_len int, in_len int) &StreamContext {
dst := []u8{len: out_len}
output := &C.ZSTD_outBuffer{dst.data, usize(out_len), 0}
src := []u8{len: in_len}
input := &C.ZSTD_inBuffer{src.data, 0, 0}
return &StreamContext{output, input}
}

fn drain_buffer(mut sctx StreamContext, drain fn (buf &u8, len int) !) ! {
if sctx.output.pos > 0 {
sctx.drain(sctx.output.dst, int(sctx.output.pos))!
drain(sctx.output.dst, int(sctx.output.pos))!
sctx.output.pos = 0
}
if sctx.input.size > 0 {
Expand Down
16 changes: 8 additions & 8 deletions src/stream_context_test.v
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@ fn test_drain_empty_buffer() {
drain := fn (buf &u8, len int) ! {}
output := &C.ZSTD_outBuffer{unsafe { nil }, 0, 0}
input := &C.ZSTD_inBuffer{unsafe { nil }, 0, 0}
mut sctx := &StreamContext{drain, output, input}
drain_buffer(mut sctx)!
mut sctx := &StreamContext{output, input}
drain_buffer(mut sctx, drain)!
}

fn test_drain_output() {
drain := fn (buf &u8, len int) ! {}
output := &C.ZSTD_outBuffer{unsafe { nil }, 0, 1}
input := &C.ZSTD_inBuffer{unsafe { nil }, 0, 0}
mut sctx := &StreamContext{drain, output, input}
drain_buffer(mut sctx)!
mut sctx := &StreamContext{output, input}
drain_buffer(mut sctx, drain)!
assert output.pos == 0
}

fn test_drain_partially_consumed_input() {
drain := fn (buf &u8, len int) ! {}
output := &C.ZSTD_outBuffer{unsafe { nil }, 0, 0}
input := &C.ZSTD_inBuffer{unsafe { nil }, 1, 1}
mut sctx := &StreamContext{drain, output, input}
drain_buffer(mut sctx)!
mut sctx := &StreamContext{output, input}
drain_buffer(mut sctx, drain)!
assert input.size == 0
assert input.pos == 0
}
Expand All @@ -32,8 +32,8 @@ fn test_drain_fully_consumed_input() {
output := &C.ZSTD_outBuffer{unsafe { nil }, 0, 0}
mut src := [u8(1), u8(2)]
input := &C.ZSTD_inBuffer{unsafe { src.data }, 2, 1}
mut sctx := &StreamContext{drain, output, input}
drain_buffer(mut sctx)!
mut sctx := &StreamContext{output, input}
drain_buffer(mut sctx, drain)!
assert input.size == 1
assert input.pos == 0
assert src[0] == u8(2)
Expand Down

0 comments on commit f9e5872

Please sign in to comment.