
Commit 8b478d0 (1 parent: bdc70e4)

handling buffer overflow while creating new segmentBase

7 files changed: +50 −16 lines
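In short: loadFieldsNew in segment.go decoded the field count with binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64]), a slice that can run past the backing buffer when fewer than binary.MaxVarintLen64 (10) bytes of fields metadata were written, e.g. a batch with a single field (1 + 8 bytes). The fix clamps the slice's upper bound to cap(s.mem). The remaining changes add a Type() method to the resetable interface so section opaques can be labeled ("vector" / "text"), comment out the buffer size estimation in newWithChunkMode in favor of a fixed br.Grow(255), and add fmt.Println diagnostics along the segment-creation path.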

build.go (+2)

@@ -178,6 +178,8 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
     }
     sb.updateSize()
 
+    fmt.Println("length of segment base mem", len(sb.mem))
+
     // load the data/section starting offsets for each field
     // by via the sectionsIndexOffset as starting point.
     err := sb.loadFieldsNew()

new.go (+20 −14)

@@ -17,6 +17,7 @@ package zap
 import (
     "bytes"
     "encoding/binary"
+    "fmt"
     "math"
     "sort"
     "sync"
@@ -50,27 +51,29 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
     s := interimPool.Get().(*interim)
 
     var br bytes.Buffer
-    if s.lastNumDocs > 0 {
-        // use previous results to initialize the buf with an estimate
-        // size, but note that the interim instance comes from a
-        // global interimPool, so multiple scorch instances indexing
-        // different docs can lead to low quality estimates
-        estimateAvgBytesPerDoc := int(float64(s.lastOutSize/s.lastNumDocs) *
-            NewSegmentBufferNumResultsFactor)
-        estimateNumResults := int(float64(len(results)+NewSegmentBufferNumResultsBump) *
-            NewSegmentBufferAvgBytesPerDocFactor)
-        br.Grow(estimateAvgBytesPerDoc * estimateNumResults)
-    }
-
+    // if s.lastNumDocs > 0 {
+    //     // use previous results to initialize the buf with an estimate
+    //     // size, but note that the interim instance comes from a
+    //     // global interimPool, so multiple scorch instances indexing
+    //     // different docs can lead to low quality estimates
+    //     estimateAvgBytesPerDoc := int(float64(s.lastOutSize/s.lastNumDocs) *
+    //         NewSegmentBufferNumResultsFactor)
+    //     estimateNumResults := int(float64(len(results)+NewSegmentBufferNumResultsBump) *
+    //         NewSegmentBufferAvgBytesPerDocFactor)
+    //     br.Grow(estimateAvgBytesPerDoc * estimateNumResults)
+    //     fmt.Println("=============")
+    // }
+    // fmt.Println("br size", br.Cap())
+    br.Grow(255)
     s.results = results
     s.chunkMode = chunkMode
     s.w = NewCountHashWriter(&br)
-
+    fmt.Println("buffer initial capacity:", br.Cap())
     storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert()
     if err != nil {
         return nil, uint64(0), err
     }
-
+    fmt.Println("buffer capacity after write:", br.Cap(), "length of buffer:", len(br.Bytes()))
     sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
         s.FieldsMap, s.FieldsInv, uint64(len(results)),
         storedIndexOffset, dictOffsets, sectionsIndexOffset)
@@ -225,9 +228,11 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
 
     // after persisting the sections to the writer, account corresponding
     for _, opaque := range s.opaque {
+        fmt.Println("type of opaque", opaque.Type())
         opaqueIO, ok := opaque.(segment.DiskStatsReporter)
         if ok {
             s.incrementBytesWritten(opaqueIO.BytesWritten())
+            fmt.Println("bytes written", opaqueIO.BytesWritten())
         }
     }
 
@@ -242,6 +247,7 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
         return 0, nil, 0, err
     }
 
+    fmt.Println("offset values", storedIndexOffset, sectionsIndexOffset)
    return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil
 }
 
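For context, a standalone sketch (not part of the commit) of what the two capacity prints in newWithChunkMode reveal: bytes.Buffer.Grow reserves capacity up front, and comparing Cap() before and after the writes shows how far the buffer had to grow beyond the fixed 255-byte reservation.

package main

import (
    "bytes"
    "fmt"
)

func main() {
    var br bytes.Buffer
    br.Grow(255) // fixed pre-reservation, as in the patched newWithChunkMode
    fmt.Println("buffer initial capacity:", br.Cap())

    // writing past the reservation forces bytes.Buffer to reallocate and copy
    br.Write(make([]byte, 4096))
    fmt.Println("buffer capacity after write:", br.Cap(), "length of buffer:", br.Len())
}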

section.go (+1)

@@ -51,6 +51,7 @@ type section interface {
 type resetable interface {
     Reset() error
     Set(key string, value interface{})
+    Type() string
 }
 
 // -----------------------------------------------------------------------------
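(The Type() method added here is what the opaque.Type() debug print in new.go's convert relies on; the two opaque implementations in the next two files satisfy it by returning "vector" and "text" respectively.)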

section_faiss_vector_index.go (+6)

@@ -59,7 +59,9 @@ func (v *faissVectorIndexSection) Process(opaque map[int]resetable, docNum uint32,
 
 func (v *faissVectorIndexSection) Persist(opaque map[int]resetable, w *CountHashWriter) (n int64, err error) {
     vo := v.getvectorIndexOpaque(opaque)
+    fmt.Println("the writer count", w.Count(), w.Sum32())
     vo.writeVectorIndexes(w)
+    fmt.Println("the writer count after", w.Count(), w.Sum32())
     return 0, nil
 }
 
@@ -755,6 +757,10 @@ func (v *vectorIndexOpaque) BytesRead() uint64 {
 func (v *vectorIndexOpaque) ResetBytesRead(uint64) {
 }
 
+func (v *vectorIndexOpaque) Type() string {
+    return "vector"
+}
+
 // cleanup stuff over here for reusability
 func (v *vectorIndexOpaque) Reset() (err error) {
     // tracking the number of vecs and fields processed and tracked in this

section_inverted_text_index.go (+4)

@@ -1014,3 +1014,7 @@ func (i *invertedIndexOpaque) Set(key string, val interface{}) {
         i.numDocs = val.(uint64)
     }
 }
+
+func (v *invertedIndexOpaque) Type() string {
+    return "text"
+}

segment.go (+13 −1)

@@ -322,8 +322,20 @@ func (s *SegmentBase) loadFieldsNew() error {
         return s.loadFields()
     }
 
+    fmt.Println("pos:", pos, "pos+binary.MaxVarintLen64:", pos+binary.MaxVarintLen64, "capacity of buffer:", cap(s.mem))
+
+    seek := pos + binary.MaxVarintLen64
+    if seek > uint64(cap(s.mem)) {
+        // handling a buffer overflow case.
+        // a rare case where the backing buffer is not large enough to be read directly via
+        // a pos+binary.MaxVarintLen64 seek. For example, this can happen when there is only
+        // one field to be indexed in the entire batch of data: while writing out the fields
+        // metadata, only 1 + 8 bytes are written, whereas MaxVarintLen64 = 10.
+        seek = uint64(cap(s.mem))
+    }
+
     // read the number of fields
-    numFields, sz := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
+    numFields, sz := binary.Uvarint(s.mem[pos:seek])
     pos += uint64(sz)
     s.incrementBytesRead(uint64(sz))
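A minimal standalone sketch of the clamping logic above, with illustrative names and a plain byte slice rather than the zap SegmentBase: binary.Uvarint only needs the bytes actually present, so when fewer than binary.MaxVarintLen64 (10) bytes remain, the slice's upper bound can safely be clamped; without the clamp, the slice expression itself panics once the upper bound exceeds the slice's capacity. The commit clamps to cap(s.mem), the limit Go allows a two-index slice expression to reach; this sketch clamps to len, the tighter bound for an arbitrary slice.

package main

import (
    "encoding/binary"
    "fmt"
)

// readUvarintAt decodes a uvarint at pos without slicing past the end of mem.
func readUvarintAt(mem []byte, pos uint64) (uint64, int) {
    seek := pos + binary.MaxVarintLen64
    if seek > uint64(len(mem)) {
        // fewer than 10 bytes remain; clamp to avoid a slice-bounds panic
        seek = uint64(len(mem))
    }
    return binary.Uvarint(mem[pos:seek])
}

func main() {
    // mimic a segment whose fields metadata is a single-byte uvarint
    // (one field) sitting at the very end of the buffer
    mem := []byte{0xde, 0xad, 0xbe, 0xef} // unrelated preceding bytes
    mem = binary.AppendUvarint(mem, 1)    // numFields = 1

    numFields, sz := readUvarintAt(mem, 4)
    fmt.Println("numFields:", numFields, "bytes consumed:", sz) // 1 1
}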

write.go (+4 −1)

@@ -16,6 +16,7 @@ package zap
 
 import (
     "encoding/binary"
+    "fmt"
     "io"
 
     "github.com/RoaringBitmap/roaring"
@@ -53,7 +54,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
 func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) {
     var rv uint64
     fieldsOffsets := make([]uint64, 0, len(fieldsInv))
-
+    fmt.Println("total number of fields:", len(fieldsInv))
     for fieldID, fieldName := range fieldsInv {
         // record start of this field
         fieldsOffsets = append(fieldsOffsets, uint64(w.Count()))
@@ -70,6 +71,8 @@ func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64,
         return 0, err
     }
 
+    fmt.Println(" ->field being written out:", fieldName, "number of sections:", len(segmentSections))
+
     // write out the number of field-specific indexes
     // FIXME hard-coding to 2, and not attempting to support sparseness well
     _, err = writeUvarints(w, uint64(len(segmentSections)))
