Correcting bytes read value with respect to metadata loading #1769

Merged · 5 commits · Dec 15, 2022
3 changes: 3 additions & 0 deletions index/scorch/introducer.go
@@ -277,6 +277,7 @@ func (s *Scorch) introducePersist(persist *persistIntroduction) {
 				deleted:    segmentSnapshot.deleted,
 				cachedDocs: segmentSnapshot.cachedDocs,
 				creator:    "introducePersist",
+				mmaped:     1,
 			}
 			newIndexSnapshot.segment[i] = newSegmentSnapshot
 			delete(persist.persisted, segmentSnapshot.id)
@@ -413,13 +414,15 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
 	// deleted by the time we reach here, can skip the introduction.
 	if nextMerge.new != nil &&
 		nextMerge.new.Count() > newSegmentDeleted.GetCardinality() {
+
 		// put new segment at end
 		newSnapshot.segment = append(newSnapshot.segment, &SegmentSnapshot{
 			id:         nextMerge.id,
 			segment:    nextMerge.new, // take ownership for nextMerge.new's ref-count
 			deleted:    newSegmentDeleted,
 			cachedDocs: &cachedDocs{cache: nil},
 			creator:    "introduceMerge",
+			mmaped:     nextMerge.mmaped,
 		})
 		newSnapshot.offsets = append(newSnapshot.offsets, running)
 		atomic.AddUint64(&s.stats.TotIntroducedSegmentsMerge, 1)
2 changes: 2 additions & 0 deletions index/scorch/merge.go
@@ -371,6 +371,7 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
 		oldNewDocNums: oldNewDocNums,
 		new:           seg,
 		notifyCh:      make(chan *mergeTaskIntroStatus),
+		mmaped:        1,
 	}

 	s.fireEvent(EventKindMergeTaskIntroductionStart, 0)
@@ -429,6 +430,7 @@ type segmentMerge struct {
 	oldNewDocNums map[uint64][]uint64
 	new           segment.Segment
 	notifyCh      chan *mergeTaskIntroStatus
+	mmaped        uint32
 }

 func cumulateBytesRead(sbs []segment.Segment) uint64 {
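Taken together with the introducer changes above, the new mmaped field rides along on the segmentMerge task so that introduceMerge can stamp it onto the resulting SegmentSnapshot. A minimal sketch of that hand-off, with stand-in structs trimmed to the one field this PR adds (the real types carry many more):

package main

import "fmt"

// Stand-ins for the scorch types in this diff, reduced to the
// mmaped flag; field names match the PR, everything else is elided.
type segmentMerge struct {
	mmaped uint32 // 1 when the merged segment was just mmap'd from disk
}

type SegmentSnapshot struct {
	mmaped uint32
}

func main() {
	// planMergeAtSnapshot opens the merged segment via mmap and
	// marks the task accordingly ...
	task := &segmentMerge{mmaped: 1}

	// ... and introduceMerge copies the flag onto the new snapshot,
	// so the first query against it accounts for the metadata bytes.
	snap := &SegmentSnapshot{mmaped: task.mmaped}
	fmt.Println(snap.mmaped) // prints 1
}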
11 changes: 9 additions & 2 deletions index/scorch/snapshot_index.go
@@ -543,8 +543,15 @@ func (is *IndexSnapshot) TermFieldReader(ctx context.Context, term []byte, field
 	if rv.dicts == nil {
 		rv.dicts = make([]segment.TermDictionary, len(is.segment))
 		for i, s := range is.segment {
-			segBytesRead := s.segment.BytesRead()
-			rv.incrementBytesRead(segBytesRead)
+			// the intention behind this compare and swap operation is
+			// to make sure that the accounting of the metadata happens
+			// only once (which corresponds to this persisted segment's most
+			// recent segPlugin.Open() call), and any subsequent queries won't
+			// incur this cost, which would essentially be double counting.
+			if atomic.CompareAndSwapUint32(&s.mmaped, 1, 0) {
+				segBytesRead := s.segment.BytesRead()
+				rv.incrementBytesRead(segBytesRead)
+			}
 			dict, err := s.segment.Dictionary(field)
 			if err != nil {
 				return nil, err
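The compare-and-swap above is the heart of the fix: whichever query wins the swap charges the metadata bytes, and every later (or concurrent) query skips it. A self-contained sketch of the idiom, with an illustrative byte count rather than bleve's real one:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var mmaped uint32 = 1 // set when the segment was (re)opened via mmap
	var bytesRead uint64
	const metadataBytes = 4096 // hypothetical metadata size

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // eight concurrent "queries"
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only the goroutine that flips 1 -> 0 does the accounting;
			// the rest see 0 and skip it, so there is no double counting.
			if atomic.CompareAndSwapUint32(&mmaped, 1, 0) {
				atomic.AddUint64(&bytesRead, metadataBytes)
			}
		}()
	}
	wg.Wait()
	fmt.Println(bytesRead) // always 4096, never 8 * 4096
}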
7 changes: 6 additions & 1 deletion index/scorch/snapshot_segment.go
@@ -30,6 +30,11 @@ var TermSeparator byte = 0xff
 var TermSeparatorSplitSlice = []byte{TermSeparator}

 type SegmentSnapshot struct {
+	// this flag is needed to identify whether this
+	// segment was mmaped recently, in which case
+	// we consider the loading cost of the metadata
+	// as part of the IO stats.
+	mmaped  uint32
 	id      uint64
 	segment segment.Segment
 	deleted *roaring.Bitmap
@@ -54,7 +59,7 @@ func (s *SegmentSnapshot) FullSize() int64 {
 	return int64(s.segment.Count())
 }

-func (s SegmentSnapshot) LiveSize() int64 {
+func (s *SegmentSnapshot) LiveSize() int64 {
 	return int64(s.Count())
 }
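The LiveSize receiver change from value to pointer is worth noting alongside the new field: a value receiver operates on a copy of the SegmentSnapshot, and a CAS through a copy would consume the copy's flag while leaving the shared snapshot's flag set. That is presumably the motivation for making the methods uniformly take pointers. A small illustration of the difference:

package main

import (
	"fmt"
	"sync/atomic"
)

type snap struct{ mmaped uint32 }

// Value receiver: the CAS flips the copy's field; the caller's
// value is untouched, so the flag could be "consumed" repeatedly.
func (s snap) consumeByValue() bool {
	return atomic.CompareAndSwapUint32(&s.mmaped, 1, 0)
}

// Pointer receiver: the CAS is visible to every holder of the snapshot.
func (s *snap) consumeByPointer() bool {
	return atomic.CompareAndSwapUint32(&s.mmaped, 1, 0)
}

func main() {
	a := snap{mmaped: 1}
	a.consumeByValue()
	fmt.Println(a.mmaped) // 1: the original flag never flipped

	b := &snap{mmaped: 1}
	b.consumeByPointer()
	fmt.Println(b.mmaped) // 0: consumed exactly once
}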
12 changes: 6 additions & 6 deletions index_test.go
@@ -431,8 +431,8 @@ func TestBytesRead(t *testing.T) {
 	}
 	stats, _ = idx.StatsMap()["index"].(map[string]interface{})
 	bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 206545 && res.BytesRead == bytesRead-prevBytesRead {
-		t.Fatalf("expected bytes read for fuzzy query is 206545, got %v",
+	if bytesRead-prevBytesRead != 8468 && res.BytesRead == bytesRead-prevBytesRead {
+		t.Fatalf("expected bytes read for fuzzy query is 8468, got %v",
 			bytesRead-prevBytesRead)
 	}
 	prevBytesRead = bytesRead
@@ -466,8 +466,8 @@ func TestBytesRead(t *testing.T) {

 	stats, _ = idx.StatsMap()["index"].(map[string]interface{})
 	bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 54945 && res.BytesRead == bytesRead-prevBytesRead {
-		t.Fatalf("expected bytes read for numeric range query is 54945, got %v",
+	if bytesRead-prevBytesRead != 924 && res.BytesRead == bytesRead-prevBytesRead {
+		t.Fatalf("expected bytes read for numeric range query is 924, got %v",
 			bytesRead-prevBytesRead)
 	}
 	prevBytesRead = bytesRead
@@ -498,8 +498,8 @@ func TestBytesRead(t *testing.T) {
 	// since it's created afresh and not reused
 	stats, _ = idx.StatsMap()["index"].(map[string]interface{})
 	bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 18090 && res.BytesRead == bytesRead-prevBytesRead {
-		t.Fatalf("expected bytes read for disjunction query is 18090, got %v",
+	if bytesRead-prevBytesRead != 83 && res.BytesRead == bytesRead-prevBytesRead {
+		t.Fatalf("expected bytes read for disjunction query is 83, got %v",
 			bytesRead-prevBytesRead)
 	}
 }
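For reference, the stat these tests assert on is readable by any embedder through the public stats map. A hedged usage sketch (the index path and document are hypothetical; only the stat key comes from the test above):

package main

import (
	"fmt"
	"log"

	"github.com/blevesearch/bleve/v2"
)

func main() {
	// Hypothetical on-disk index; any persisted scorch index works.
	idx, err := bleve.New("example.bleve", bleve.NewIndexMapping())
	if err != nil {
		log.Fatal(err)
	}
	defer idx.Close()

	if err := idx.Index("doc1", map[string]interface{}{"body": "hello world"}); err != nil {
		log.Fatal(err)
	}

	// Run a query so the reader path patched above actually executes.
	req := bleve.NewSearchRequest(bleve.NewMatchQuery("hello"))
	if _, err := idx.Search(req); err != nil {
		log.Fatal(err)
	}

	// With this PR, a freshly mmap'd segment's metadata cost is counted
	// once here, rather than again on every subsequent query.
	stats, _ := idx.StatsMap()["index"].(map[string]interface{})
	bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
	fmt.Println("num_bytes_read_at_query_time:", bytesRead)
}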