Skip to content

Commit

Permalink
New stat for bytes read off the disk (#1702)
Browse files Browse the repository at this point in the history
* bytes read while querying initial

* adding the postings RBM bytes size to bytesRead

* making bytesRead part of scorch stats

* resetting bytes read stat for every search

* fixed the rcu logic wrt merger and introduced a new wcu stat

* bug fix: fixed total bytes read value after merge

* updated bytes read with stored fields and iterators' count

* accounting bytes read for loading term dictionaries in newIndexSnapshotFieldDict

* unit tests and bug fixes

* updated unit tests

* code cleanup

* refactoring and code cleanup

* updated the scorch_segment_api version

* refactoring and code cleanup

* renaming num_bytes_read_query_time -> num_bytes_read_at_query_time

* updated zapx version, code cleanup
  • Loading branch information
Thejas-bhat authored Jul 13, 2022
1 parent 0f17630 commit 1cbcfd5
Show file tree
Hide file tree
Showing 9 changed files with 468 additions and 17 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/goleveldb v1.0.1
github.com/blevesearch/gtreap v0.1.1
github.com/blevesearch/scorch_segment_api/v2 v2.1.0
github.com/blevesearch/scorch_segment_api/v2 v2.1.1
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowball v0.6.1
github.com/blevesearch/snowballstem v0.9.0
Expand All @@ -21,7 +21,7 @@ require (
github.com/blevesearch/zapx/v12 v12.3.4
github.com/blevesearch/zapx/v13 v13.3.4
github.com/blevesearch/zapx/v14 v14.3.4
github.com/blevesearch/zapx/v15 v15.3.4
github.com/blevesearch/zapx/v15 v15.3.5-0.20220713163830-ae843e553177
github.com/couchbase/moss v0.2.0
github.com/golang/protobuf v1.3.2
github.com/spf13/cobra v0.0.5
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgY
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0 h1:NFwteOpZEvJk5Vg0H6gD0hxupsG3JYocE4DBvsA2GZI=
github.com/blevesearch/scorch_segment_api/v2 v2.1.0/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1 h1:J8UDudUpDJz21d/hCMIshCeRordwnDTftgXcSDMUx40=
github.com/blevesearch/scorch_segment_api/v2 v2.1.1/go.mod h1:uch7xyyO/Alxkuxa+CGs79vw0QY8BENSBjg6Mw5L5DE=
github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac=
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowball v0.6.1 h1:cDYjn/NCH+wwt2UdehaLpr2e4BwLIjN4V/TdLsL+B5A=
Expand All @@ -40,8 +41,8 @@ github.com/blevesearch/zapx/v13 v13.3.4 h1:f646k6300VGRIR7eJ6lLtF8UC95NIWmF899j4
github.com/blevesearch/zapx/v13 v13.3.4/go.mod h1:Wl7hO1gT+IDvJb7i06g2iW5Qvw0KzncJPsBx7WGWhLA=
github.com/blevesearch/zapx/v14 v14.3.4 h1:/FVzSGFG5rbVWfPEqlcaJd8lZSJMQpTdmFhz/l2QI7w=
github.com/blevesearch/zapx/v14 v14.3.4/go.mod h1:b1YhRXXhAj9i+9aOwhRKCHUmJyYieK/QbDvPJDLddUk=
github.com/blevesearch/zapx/v15 v15.3.4 h1:/y6AOxRuBiZPFAItqcrKcXPPtlAwuW/jMoOFO7tc7rs=
github.com/blevesearch/zapx/v15 v15.3.4/go.mod h1:TQ/qDC2q7TSSpeC6Vgr9fDN56Ra0u49lZJQ4v30WEx4=
github.com/blevesearch/zapx/v15 v15.3.5-0.20220713163830-ae843e553177 h1:0/WYF9nS1HBgDc3z7ePdDz15CwMuNYZ4WeD7Kravm7M=
github.com/blevesearch/zapx/v15 v15.3.5-0.20220713163830-ae843e553177/go.mod h1:ii4ohMQC0TCUjYfq8OtrbABgeI1zljjyXBFpUe/dPDw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
19 changes: 19 additions & 0 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
fileMergeZapStartTime := time.Now()

atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
prevBytesReadTotal := cumulateBytesRead(segmentsToMerge)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
cw.cancelCh, s)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
Expand All @@ -352,6 +353,14 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1)
return err
}

switch segI := seg.(type) {
case segment.DiskStatsReporter:
totalBytesRead := segI.BytesRead() + prevBytesReadTotal
segI.SetBytesRead(totalBytesRead)
seg = segI.(segment.Segment)
}

oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
oldNewDocNums[task.Segments[i].Id()] = segNewDocNums
Expand Down Expand Up @@ -426,6 +435,16 @@ type segmentMerge struct {
notifyCh chan *mergeTaskIntroStatus
}

// cumulateBytesRead sums the BytesRead statistic across the given
// segments. Segments that do not implement segment.DiskStatsReporter
// contribute nothing to the total.
func cumulateBytesRead(sbs []segment.Segment) uint64 {
	total := uint64(0)
	for _, s := range sbs {
		reporter, ok := s.(segment.DiskStatsReporter)
		if !ok {
			continue
		}
		total += reporter.BytesRead()
	}
	return total
}

// perform a merging of the given SegmentBase instances into a new,
// persisted segment, and synchronously introduce that new segment
// into the root
Expand Down
16 changes: 16 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,23 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
analysisResults := make([]index.Document, int(numUpdates))
var itemsDeQueued uint64
var totalAnalysisSize int
analysisBytes := func(tokMap index.TokenFrequencies) (rv uint64) {
for k := range tokMap {
rv += uint64(len(k))
}
return rv
}
for itemsDeQueued < numUpdates {
result := <-resultChan
resultSize := result.Size()
atomic.AddUint64(&s.iStats.analysisBytesAdded, uint64(resultSize))
totalAnalysisSize += resultSize
analysisResults[itemsDeQueued] = result
itemsDeQueued++
result.VisitFields(func(f index.Field) {
atomic.AddUint64(&s.stats.TotBytesIndexedAfterAnalysis,
analysisBytes(f.AnalyzedTokenFrequencies()))
})
}
close(resultChan)
defer atomic.AddUint64(&s.iStats.analysisBytesRemoved, uint64(totalAnalysisSize))
Expand Down Expand Up @@ -525,6 +535,10 @@ func (s *Scorch) Stats() json.Marshaler {
return &s.stats
}

// BytesReadQueryTime returns the cumulative number of bytes read off
// disk while serving queries. The underlying counter is incremented
// concurrently by searchers via atomic.AddUint64, so it must be read
// with atomic.LoadUint64 as well to avoid a data race and a possibly
// torn read on 32-bit platforms.
func (s *Scorch) BytesReadQueryTime() uint64 {
	return atomic.LoadUint64(&s.stats.TotBytesReadAtQueryTime)
}

func (s *Scorch) diskFileStats(rootSegmentPaths map[string]struct{}) (uint64,
uint64, uint64) {
var numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot uint64
Expand Down Expand Up @@ -582,7 +596,9 @@ func (s *Scorch) StatsMap() map[string]interface{} {
m["index_time"] = m["TotIndexTime"]
m["term_searchers_started"] = m["TotTermSearchersStarted"]
m["term_searchers_finished"] = m["TotTermSearchersFinished"]
m["num_bytes_read_at_query_time"] = m["TotBytesReadAtQueryTime"]
m["num_plain_text_bytes_indexed"] = m["TotIndexedPlainTextBytes"]
m["num_bytes_indexed_after_analysis"] = m["TotBytesIndexedAfterAnalysis"]
m["num_items_introduced"] = m["TotIntroducedItems"]
m["num_items_persisted"] = m["TotPersistedItems"]
m["num_recs_to_persist"] = m["TotItemsToPersist"]
Expand Down
76 changes: 66 additions & 10 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var reflectStaticSizeIndexSnapshot int
// in the kvConfig.
var DefaultFieldTFRCacheThreshold uint64 = 10

type diskStatsReporter segment.DiskStatsReporter

func init() {
var is interface{} = IndexSnapshot{}
reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size())
Expand Down Expand Up @@ -146,10 +148,19 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
results := make(chan *asynchSegmentResult)
for index, segment := range i.segment {
go func(index int, segment *SegmentSnapshot) {
var prevBytesRead uint64
seg, diskStatsAvailable := segment.segment.(diskStatsReporter)
if diskStatsAvailable {
prevBytesRead = seg.BytesRead()
}
dict, err := segment.segment.Dictionary(field)
if err != nil {
results <- &asynchSegmentResult{err: err}
} else {
if diskStatsAvailable {
atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime,
seg.BytesRead()-prevBytesRead)
}
if randomLookup {
results <- &asynchSegmentResult{dict: dict}
} else {
Expand Down Expand Up @@ -424,6 +435,11 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)

rvd := document.NewDocument(id)
var prevBytesRead uint64
seg, diskStatsAvailable := i.segment[segmentIndex].segment.(segment.DiskStatsReporter)
if diskStatsAvailable {
prevBytesRead = seg.BytesRead()
}
err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool {
if name == "_id" {
return true
Expand Down Expand Up @@ -453,7 +469,10 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
if err != nil {
return nil, err
}

if diskStatsAvailable {
delta := seg.BytesRead() - prevBytesRead
atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime, delta)
}
return rvd, nil
}

Expand Down Expand Up @@ -505,18 +524,18 @@ func (i *IndexSnapshot) InternalID(id string) (rv index.IndexInternalID, err err
return next.ID, nil
}

func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
includeNorm, includeTermVectors bool) (index.TermFieldReader, error) {
rv := i.allocTermFieldReaderDicts(field)
rv := is.allocTermFieldReaderDicts(field)

rv.term = term
rv.field = field
rv.snapshot = i
rv.snapshot = is
if rv.postings == nil {
rv.postings = make([]segment.PostingsList, len(i.segment))
rv.postings = make([]segment.PostingsList, len(is.segment))
}
if rv.iterators == nil {
rv.iterators = make([]segment.PostingsIterator, len(i.segment))
rv.iterators = make([]segment.PostingsIterator, len(is.segment))
}
rv.segmentOffset = 0
rv.includeFreq = includeFreq
Expand All @@ -526,25 +545,54 @@ func (i *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.currID = rv.currID[:0]

if rv.dicts == nil {
rv.dicts = make([]segment.TermDictionary, len(i.segment))
for i, segment := range i.segment {
rv.dicts = make([]segment.TermDictionary, len(is.segment))
for i, segment := range is.segment {
var prevBytesRead uint64
segP, diskStatsAvailable := segment.segment.(diskStatsReporter)
if diskStatsAvailable {
prevBytesRead = segP.BytesRead()
}
dict, err := segment.segment.Dictionary(field)
if err != nil {
return nil, err
}
if diskStatsAvailable {
atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime, segP.BytesRead()-prevBytesRead)
}
rv.dicts[i] = dict
}
}

for i, segment := range i.segment {
for i, segment := range is.segment {
var prevBytesReadPL uint64
if postings, diskStatsAvailable := rv.postings[i].(diskStatsReporter); diskStatsAvailable {
prevBytesReadPL = postings.BytesRead()
}
pl, err := rv.dicts[i].PostingsList(term, segment.deleted, rv.postings[i])
if err != nil {
return nil, err
}
rv.postings[i] = pl

var prevBytesReadItr uint64
if itr, diskStatsAvailable := rv.iterators[i].(diskStatsReporter); diskStatsAvailable {
prevBytesReadItr = itr.BytesRead()
}
rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i])

if postings, diskStatsAvailable := pl.(diskStatsReporter); diskStatsAvailable &&
prevBytesReadPL < postings.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime,
postings.BytesRead()-prevBytesReadPL)
}

if itr, diskStatsAvailable := rv.iterators[i].(diskStatsReporter); diskStatsAvailable &&
prevBytesReadItr < itr.BytesRead() {
atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime,
itr.BytesRead()-prevBytesReadItr)
}
}
atomic.AddUint64(&i.parent.stats.TotTermSearchersStarted, uint64(1))
atomic.AddUint64(&is.parent.stats.TotTermSearchersStarted, uint64(1))
return rv, nil
}

Expand Down Expand Up @@ -661,10 +709,18 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(
}

if ssvOk && ssv != nil && len(vFields) > 0 {
var prevBytesRead uint64
ssvp, diskStatsAvailable := ssv.(segment.DiskStatsReporter)
if diskStatsAvailable {
prevBytesRead = ssvp.BytesRead()
}
dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
if err != nil {
return nil, nil, err
}
if diskStatsAvailable {
atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime, ssvp.BytesRead()-prevBytesRead)
}
}

if errCh != nil {
Expand Down
14 changes: 14 additions & 0 deletions index/scorch/snapshot_index_tfr.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
}
// find the next hit
for i.segmentOffset < len(i.iterators) {
prevBytesRead := uint64(0)
itr, diskStatsAvailable := i.iterators[i.segmentOffset].(segment.DiskStatsReporter)
if diskStatsAvailable {
prevBytesRead = itr.BytesRead()
}
next, err := i.iterators[i.segmentOffset].Next()
if err != nil {
return nil, err
Expand All @@ -89,6 +94,15 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in

i.currID = rv.ID
i.currPosting = next
// postings iterators maintain the bytesRead stat in a cumulative fashion.
// this is because there are chances of having a series of loadChunk calls,
// and they have to be added together before sending the bytesRead at this point
// upstream.
if diskStatsAvailable {
delta := itr.BytesRead() - prevBytesRead
atomic.AddUint64(&i.snapshot.parent.stats.TotBytesReadAtQueryTime, uint64(delta))
}

return rv, nil
}
i.segmentOffset++
Expand Down
4 changes: 3 additions & 1 deletion index/scorch/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type Stats struct {
TotAnalysisTime uint64
TotIndexTime uint64

TotIndexedPlainTextBytes uint64
TotBytesReadAtQueryTime uint64
TotIndexedPlainTextBytes uint64
TotBytesIndexedAfterAnalysis uint64

TotTermSearchersStarted uint64
TotTermSearchersFinished uint64
Expand Down
Loading

0 comments on commit 1cbcfd5

Please sign in to comment.