Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s2: Add EncodeBuffer buffer recycling callback #982

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions s2/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ type Writer struct {
snappy bool
flushOnWrite bool
appendIndex bool
bufferCB func([]byte)
level uint8
}

// result is the unit of work handed to the output writer goroutine.
type result struct {
	// b holds the bytes to be written to the output.
	b []byte

	// ret, when non-nil, is the caller-owned buffer that is passed to the
	// buffer-done callback once this result is picked up for writing.
	ret []byte

	// startOffset is the uncompressed offset at which this result begins.
	startOffset int64
}
Expand Down Expand Up @@ -146,6 +149,10 @@ func (w *Writer) Reset(writer io.Writer) {
for write := range toWrite {
// Wait for the data to be available.
input := <-write
if input.ret != nil && w.bufferCB != nil {
w.bufferCB(input.ret)
input.ret = nil
}
in := input.b
if len(in) > 0 {
if w.err(nil) == nil {
Expand Down Expand Up @@ -341,7 +348,8 @@ func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// If you cannot control that, use the regular Write function.
// Use the WriterBufferDone option to receive a callback when the buffer is done
// processing.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
Expand All @@ -364,6 +372,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
}
if w.concurrency == 1 {
_, err := w.writeSync(buf)
if w.bufferCB != nil {
w.bufferCB(buf)
}
return err
}

Expand All @@ -378,7 +389,7 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
}
}

orgBuf := buf
for len(buf) > 0 {
// Cut input.
uncompressed := buf
Expand All @@ -397,6 +408,9 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
startOffset: w.uncompWritten,
}
w.uncompWritten += int64(len(uncompressed))
if len(buf) == 0 && w.bufferCB != nil {
res.ret = orgBuf
}
go func() {
race.ReadSlice(uncompressed)

Expand Down Expand Up @@ -941,6 +955,17 @@ func WriterUncompressed() WriterOption {
}
}

// WriterBufferDone registers fn to be invoked once a buffer handed to
// EncodeBuffer has been written to the output, at which point the caller
// may safely reuse it.
// A buffer that was divided into multiple blocks is reported a single time,
// after its final block.
// The callback is never invoked concurrently.
func WriterBufferDone(fn func(b []byte)) WriterOption {
	var opt WriterOption = func(wr *Writer) error {
		wr.bufferCB = fn
		return nil
	}
	return opt
}

// WriterBlockSize allows to override the default block size.
// Blocks will be this size or smaller.
// Minimum size is 4KB and maximum size is 4MB.
Expand Down
43 changes: 43 additions & 0 deletions s2/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,49 @@ func TestBigEncodeBufferSync(t *testing.T) {
t.Log(n)
}

// TestWriterBufferDone checks that the WriterBufferDone callback fires exactly
// once per buffer passed to EncodeBuffer, and in submission order.
func TestWriterBufferDone(t *testing.T) {
	const blockSize = 1 << 20
	sizes := []int{10, 100, 10000, blockSize, blockSize * 8}
	buffers := make([][]byte, 0, len(sizes))
	for _, size := range sizes {
		buffers = append(buffers, make([]byte, size))
	}

	// head truncates p so that a failure message never dumps a
	// multi-megabyte buffer into the test log.
	head := func(p []byte) []byte {
		const maxShown = 16
		if len(p) > maxShown {
			return p[:maxShown]
		}
		return p
	}

	dst := io.Discard
	wantNextBuf := 0
	var cbErr error
	enc := NewWriter(dst, WriterBlockSize(blockSize), WriterConcurrency(4), WriterBufferDone(func(b []byte) {
		switch {
		case cbErr != nil:
			// Only the first failure is kept.
		case wantNextBuf >= len(buffers):
			// A surplus callback must not index past the end of buffers;
			// record it and let the count check below fail the test.
			cbErr = fmt.Errorf("unexpected extra callback #%d with buffer of length %d", wantNextBuf, len(b))
		case !bytes.Equal(b, buffers[wantNextBuf]):
			want := buffers[wantNextBuf]
			cbErr = fmt.Errorf("wrong buffer returned for callback #%d, want len %d (starts % x) got len %d (starts % x)",
				wantNextBuf, len(want), head(want), len(b), head(b))
		}
		// Detect races: scribble over the returned buffer, which is only
		// safe if the encoder is really done with it.
		for i := range b {
			b[i] = 255
		}
		wantNextBuf++
	}))
	for n, buf := range buffers {
		// Change the buffer to a new value so each one has distinct content.
		for i := range buf {
			buf[i] = byte(n)
		}
		// Send the buffer
		err := enc.EncodeBuffer(buf)
		if err != nil {
			t.Fatal(err)
		}
	}
	err := enc.Close()
	if err != nil {
		t.Fatal(err)
	}
	// Report the callback's own diagnosis first; it is more specific than
	// the plain count mismatch.
	if cbErr != nil {
		t.Fatal(cbErr)
	}
	if wantNextBuf != len(buffers) {
		t.Fatalf("want %d buffers, got %d ", len(buffers), wantNextBuf)
	}
}

func BenchmarkWriterRandom(b *testing.B) {
rng := rand.New(rand.NewSource(1))
// Make max window so we never get matches.
Expand Down
Loading