Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zstd: Improve decompression speed #511

Merged
merged 3 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zstd/blockdec.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func (b *blockDec) decodeSequences(hist *history) error {
}
b.sequence = b.sequence[:hist.decoders.nSeqs]
if hist.decoders.nSeqs == 0 {
hist.decoders.seqSize = len(hist.decoders.literals)
return nil
}
hist.decoders.prevOffset = hist.recentOffsets
Expand Down
2 changes: 1 addition & 1 deletion zstd/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ func testDecoderFile(t *testing.T, fn string, newDec func() (*Decoder, error)) {
}
wg.Wait()
if gotError != nil {
t.Error(err)
t.Error(gotError, err)
if err != ErrCRCMismatch {
return
}
Expand Down
100 changes: 56 additions & 44 deletions zstd/seqdec.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ func (s *sequenceDecs) decode(seqs []seqVals) error {
br.fillFast()
} else {
if br.overread() {
printf("reading sequence %d, exceeded available data\n", i)
if debugDecoder {
printf("reading sequence %d, exceeded available data\n", i)
}
return io.ErrUnexpectedEOF
}
ll, mo, ml = s.next(br, llState, mlState, ofState)
Expand Down Expand Up @@ -248,76 +250,89 @@ func (s *sequenceDecs) execute(seqs []seqVals, hist []byte) error {
}

if debugDecoder {
printf("Execute %d seqs with hist %d, dict %d, literals: %d bytes\n", len(seqs), len(hist), len(s.dict), len(s.literals))
printf("Execute %d seqs with hist %d, dict %d, literals: %d into %d bytes\n", len(seqs), len(hist), len(s.dict), len(s.literals), s.seqSize)
}

var t = len(s.out)
out := s.out[:t+s.seqSize]

for _, seq := range seqs {
// Add literals
s.out = append(s.out, s.literals[:seq.ll]...)
copy(out[t:], s.literals[:seq.ll])
t += seq.ll
s.literals = s.literals[seq.ll:]
out := s.out

// Copy form dictionary...
if seq.mo > len(s.out)+len(hist) || seq.mo > s.windowSize {
// Copy from dictionary...
if seq.mo > t+len(hist) || seq.mo > s.windowSize {
if len(s.dict) == 0 {
return fmt.Errorf("match offset (%d) bigger than current history (%d)", seq.mo, len(s.out)+len(hist))
return fmt.Errorf("match offset (%d) bigger than current history (%d)", seq.mo, t+len(hist))
}

// we may be in dictionary.
dictO := len(s.dict) - (seq.mo - (len(s.out) + len(hist)))
dictO := len(s.dict) - (seq.mo - (t + len(hist)))
if dictO < 0 || dictO >= len(s.dict) {
return fmt.Errorf("match offset (%d) bigger than current history+dict (%d)", seq.mo, len(s.out)+len(hist)+len(s.dict))
return fmt.Errorf("match offset (%d) bigger than current history+dict (%d)", seq.mo, t+len(hist)+len(s.dict))
}
end := dictO + seq.ml
if end > len(s.dict) {
out = append(out, s.dict[dictO:]...)
seq.mo -= len(s.dict) - dictO
seq.ml -= len(s.dict) - dictO
n := len(s.dict) - dictO
copy(out[t:], s.dict[dictO:])
t += n
seq.ml -= n
} else {
s.out = append(out, s.dict[dictO:end]...)
copy(out[t:], s.dict[dictO:end])
t += end - dictO
continue
}
}

// Copy from history.
if v := seq.mo - len(s.out); v > 0 {
if v := seq.mo - t; v > 0 {
// v is the start position in history from end.
start := len(hist) - v
if seq.ml > v {
// Some goes into current block.
// Copy remainder of history
out = append(out, hist[start:]...)
seq.mo -= v
copy(out[t:], hist[start:])
t += v
seq.ml -= v
} else {
s.out = append(out, hist[start:start+seq.ml]...)
copy(out[t:], hist[start:start+seq.ml])
t += seq.ml
continue
}
}
// We must be in current buffer now
if seq.ml > 0 {
start := len(s.out) - seq.mo
if seq.ml <= len(s.out)-start {
start := t - seq.mo
if seq.ml <= t-start {
// No overlap
s.out = append(out, s.out[start:start+seq.ml]...)
copy(out[t:], out[start:start+seq.ml])
t += seq.ml
continue
} else {
// Overlapping copy
// Extend destination slice and copy one byte at the time.
out = out[:len(out)+seq.ml]
src := out[start : start+seq.ml]
// Destination is the space we just added.
dst := out[len(out)-seq.ml:]
dst := out[t:]
dst = dst[:len(src)]
t += len(src)
// Destination is the space we just added.
for i := range src {
dst[i] = src[i]
}
}
}
s.out = out
}
// Add final literals
s.out = append(s.out, s.literals...)
copy(out[t:], s.literals)
if debugDecoder {
t += len(s.literals)
if t != len(out) {
panic(fmt.Errorf("length mismatch, want %d, got %d, ss: %d", len(out), t, s.seqSize))
}
}
s.out = out

return nil
}
Expand All @@ -331,6 +346,7 @@ func (s *sequenceDecs) decodeSync(history *history) error {
llTable, mlTable, ofTable := s.litLengths.fse.dt[:maxTablesize], s.matchLengths.fse.dt[:maxTablesize], s.offsets.fse.dt[:maxTablesize]
llState, mlState, ofState := s.litLengths.state.state, s.matchLengths.state.state, s.offsets.state.state
hist := history.b[history.ignoreBuffer:]
out := s.out

for i := seqs - 1; i >= 0; i-- {
if br.overread() {
Expand Down Expand Up @@ -408,51 +424,49 @@ func (s *sequenceDecs) decodeSync(history *history) error {
if ll > len(s.literals) {
return fmt.Errorf("unexpected literal count, want %d bytes, but only %d is available", ll, len(s.literals))
}
size := ll + ml + len(s.out)
size := ll + ml + len(out)
if size-startSize > maxBlockSize {
return fmt.Errorf("output (%d) bigger than max block size", size)
}
if size > cap(s.out) {
if size > cap(out) {
// Not enough size, which can happen under high volume block streaming conditions
// but could be if destination slice is too small for sync operations.
// over-allocating here can create a large amount of GC pressure so we try to keep
// it as contained as possible
used := len(s.out) - startSize
used := len(out) - startSize
addBytes := 256 + ll + ml + used>>2
// Clamp to max block size.
if used+addBytes > maxBlockSize {
addBytes = maxBlockSize - used
}
s.out = append(s.out, make([]byte, addBytes)...)
s.out = s.out[:len(s.out)-addBytes]
out = append(out, make([]byte, addBytes)...)
out = out[:len(out)-addBytes]
}
if ml > maxMatchLen {
return fmt.Errorf("match len (%d) bigger than max allowed length", ml)
}

// Add literals
s.out = append(s.out, s.literals[:ll]...)
out = append(out, s.literals[:ll]...)
s.literals = s.literals[ll:]
out := s.out

if mo == 0 && ml > 0 {
return fmt.Errorf("zero matchoff and matchlen (%d) > 0", ml)
}

if mo > len(s.out)+len(hist) || mo > s.windowSize {
if mo > len(out)+len(hist) || mo > s.windowSize {
if len(s.dict) == 0 {
return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(s.out)+len(hist))
return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(out)+len(hist))
}

// we may be in dictionary.
dictO := len(s.dict) - (mo - (len(s.out) + len(hist)))
dictO := len(s.dict) - (mo - (len(out) + len(hist)))
if dictO < 0 || dictO >= len(s.dict) {
return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(s.out)+len(hist))
return fmt.Errorf("match offset (%d) bigger than current history (%d)", mo, len(out)+len(hist))
}
end := dictO + ml
if end > len(s.dict) {
out = append(out, s.dict[dictO:]...)
mo -= len(s.dict) - dictO
ml -= len(s.dict) - dictO
} else {
out = append(out, s.dict[dictO:end]...)
Expand All @@ -463,14 +477,13 @@ func (s *sequenceDecs) decodeSync(history *history) error {

// Copy from history.
// TODO: Blocks without history could be made to ignore this completely.
if v := mo - len(s.out); v > 0 {
if v := mo - len(out); v > 0 {
// v is the start position in history from end.
start := len(hist) - v
if ml > v {
// Some goes into current block.
// Copy remainder of history
out = append(out, hist[start:]...)
mo -= v
ml -= v
} else {
out = append(out, hist[start:start+ml]...)
Expand All @@ -479,10 +492,10 @@ func (s *sequenceDecs) decodeSync(history *history) error {
}
// We must be in current buffer now
if ml > 0 {
start := len(s.out) - mo
if ml <= len(s.out)-start {
start := len(out) - mo
if ml <= len(out)-start {
// No overlap
out = append(out, s.out[start:start+ml]...)
out = append(out, out[start:start+ml]...)
} else {
// Overlapping copy
// Extend destination slice and copy one byte at the time.
Expand All @@ -496,7 +509,6 @@ func (s *sequenceDecs) decodeSync(history *history) error {
}
}
}
s.out = out
if i == 0 {
// This is the last sequence, so we shouldn't update state.
break
Expand Down Expand Up @@ -524,7 +536,7 @@ func (s *sequenceDecs) decodeSync(history *history) error {
}

// Add final literals
s.out = append(s.out, s.literals...)
s.out = append(out, s.literals...)
return br.close()
}

Expand Down