introducing a timer-based vector cache
Thejas-bhat committed Mar 25, 2024
1 parent 0c7027f commit 068f7af
Showing 5 changed files with 237 additions and 10 deletions.
3 changes: 3 additions & 0 deletions build.go
@@ -175,6 +175,9 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
docValueOffset: 0, // docValueOffsets identified automatically by the section
dictLocs: dictLocs,
fieldFSTs: make(map[uint16]*vellum.FST),
vectorCache: &vecCache{
cache: make(map[uint16]*cacheEntry),
},
}
sb.updateSize()

186 changes: 186 additions & 0 deletions faiss_vector_cache.go
@@ -0,0 +1,186 @@
// Copyright (c) 2023 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build vectors
// +build vectors

package zap

import (
"sync"
"sync/atomic"
"time"

faiss "github.com/blevesearch/go-faiss"
)

type vecCache struct {
m sync.RWMutex
cache map[uint16]*cacheEntry
}

type ewma struct {
alpha float64
avg float64
sample uint64
}

type cacheEntry struct {
cacheMonitor *ewma
closeCh chan struct{}

m sync.RWMutex
index *faiss.IndexImpl
}

func (vc *vecCache) Clear() {
vc.m.Lock()
defer vc.m.Unlock()

// close every cache monitor in the cache, thereby closing the cached
// index as well.
for _, c := range vc.cache {
close(c.closeCh)
}
}

func (vc *vecCache) checkCacheForVecIndex(fieldID uint16,
indexBytes []byte) (vecIndex *faiss.IndexImpl, err error) {
cachedIndex, present := vc.isVecIndexCached(fieldID)
if present {
vecIndex = cachedIndex
vc.addRef(fieldID)
} else {
// if the cache doesn't have the vector index, construct it from the
// index bytes and update the cache.
vecIndex, err = faiss.ReadIndexFromBuffer(indexBytes, faiss.IOFlagReadOnly)
vc.update(fieldID, vecIndex)
}
return vecIndex, err
}

func (vc *vecCache) isVecIndexCached(fieldID uint16) (*faiss.IndexImpl, bool) {
vc.m.RLock()
entry, present := vc.cache[fieldID]
vc.m.RUnlock()
if entry == nil {
return nil, false
}

entry.m.RLock()
rv := entry.index
entry.m.RUnlock()

// the following cleanup handles the case where the index was closed as part
// of cacheEntry.closeIndex() but the corresponding field entry in the
// vecCache was not cleaned up.
// fixme: this looks ugly and must be refactored in a better way.
if rv == nil {
vc.m.Lock()
delete(vc.cache, fieldID)
vc.m.Unlock()
}

return rv, present && (rv != nil)
}

func (vc *vecCache) addRef(fieldIDPlus1 uint16) {
vc.m.RLock()
entry := vc.cache[fieldIDPlus1]
vc.m.RUnlock()

entry.addRef()
}

func (vc *vecCache) update(fieldIDPlus1 uint16, index *faiss.IndexImpl) {
vc.m.Lock()
_, ok := vc.cache[fieldIDPlus1]
if !ok {
// initializing alpha with 0.3 essentially means that we favor the history
// a little more than the current sample value. this keeps the average above
// the threshold value for longer, and thereby keeps the index resident in
// the cache for a longer time.
// todo: experiment with different values of alpha
vc.cache[fieldIDPlus1] = initCacheEntry(index, 0.3)
}
vc.m.Unlock()
}

func (e *ewma) add(val uint64) {
if e.avg == 0.0 {
e.avg = float64(val)
} else {
// the exponentially weighted moving average
// X(t) = alpha*v + (1 - alpha)*X(t-1)
e.avg = e.alpha*float64(val) + (1-e.alpha)*e.avg
}
}

func initCacheEntry(index *faiss.IndexImpl, alpha float64) *cacheEntry {
vc := &cacheEntry{
index: index,
closeCh: make(chan struct{}),
cacheMonitor: &ewma{},
}
vc.cacheMonitor.alpha = alpha
go vc.monitor()

// initializing the sample to 16 for now; more of a cold start for the monitor
// with a large enough value so that we don't immediately evict the index
atomic.StoreUint64(&vc.cacheMonitor.sample, 16)
return vc
}

func (vc *cacheEntry) addRef() {
// every access to the cache entry is accumulated as part of a sample which
// is used to compute the average in the next computation cycle
atomic.AddUint64(&vc.cacheMonitor.sample, 1)
}

func (vc *cacheEntry) closeIndex() {
vc.m.Lock()
vc.index.Close()
vc.index = nil
vc.m.Unlock()
}

func (vc *cacheEntry) monitor() {
// a timer to determine the frequency at which the exponentially weighted
// moving average is computed
ticker := time.NewTicker(1 * time.Second)
var sample uint64
for {
select {
case <-vc.closeCh:
vc.closeIndex()
return
case <-ticker.C:
sample = atomic.LoadUint64(&vc.cacheMonitor.sample)
vc.cacheMonitor.add(sample)
// the comparison threshold as of now is (1 - alpha). mathematically, it
// means the history averages at most 1 query per second while no queries
// were performed against this index in the current second.
// todo: the threshold needs better experimentation; it affects how long
// the index stays resident in the cache.
if vc.cacheMonitor.avg <= (1 - vc.cacheMonitor.alpha) {
atomic.StoreUint64(&vc.cacheMonitor.sample, 0)
vc.closeIndex()
return
}
atomic.StoreUint64(&vc.cacheMonitor.sample, 0)
}
}
}
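
For context, here is a standalone sketch (not part of this commit) of the eviction math the monitor above implements. It assumes the same values used in the code, namely alpha = 0.3, a cold-start sample of 16, a 1-second tick, and the (1 - alpha) eviction threshold; the main package and the printouts are purely illustrative.

package main

import "fmt"

// Standalone sketch: simulates the cache monitor's EWMA decay to estimate
// how long an idle index stays cached, assuming alpha = 0.3, a cold-start
// sample of 16, and a 1-second tick between average computations.
func main() {
	const alpha = 0.3
	avg := 0.0
	add := func(sample float64) {
		if avg == 0.0 {
			avg = sample
			return
		}
		// X(t) = alpha*v + (1 - alpha)*X(t-1)
		avg = alpha*sample + (1-alpha)*avg
	}

	add(16) // cold-start sample consumed on the first tick
	for tick := 1; ; tick++ {
		add(0) // an idle tick: no queries against the index
		fmt.Printf("tick %2d: avg = %.3f\n", tick, avg)
		if avg <= 1-alpha { // the eviction threshold used in monitor()
			fmt.Printf("index evicted after ~%d idle seconds\n", tick)
			return
		}
	}
}

With these values the average decays as 16 * 0.7^n, dropping below the 0.7 threshold after roughly nine idle seconds, at which point the cached index is closed and would be rebuilt from the index bytes on the next search.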
12 changes: 6 additions & 6 deletions faiss_vector_posting.go
@@ -345,9 +345,8 @@ func (sb *SegmentBase) InterpretVectorIndex(field string) (segment.VectorIndex,
return rv, nil
},
close: func() {
if vecIndex != nil {
vecIndex.Close()
}
// skipping the close for now because the index is cached
// todo: subject to change based on an optimization type flag
},
size: func() uint64 {
if vecIndex != nil {
@@ -394,12 +393,13 @@ func (sb *SegmentBase) InterpretVectorIndex(field string) (segment.VectorIndex,
vecDocIDMap[vecID] = uint32(docID)
}

// todo: not a good idea to cache the vector index perhaps, since it could be quite huge.
indexSize, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
pos += n
indexBytes := sb.mem[pos : pos+int(indexSize)]

// todo: whether to cache the index or not can be determined by using the
// index optimization type flag.
vecIndex, err = sb.vectorCache.checkCacheForVecIndex(fieldIDPlus1, sb.mem[pos:pos+int(indexSize)])
pos += int(indexSize)

vecIndex, err = faiss.ReadIndexFromBuffer(indexBytes, faiss.IOFlagReadOnly)
return wrapVecIndex, err
}
15 changes: 11 additions & 4 deletions segment.go
@@ -53,8 +53,11 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {

rv := &Segment{
SegmentBase: SegmentBase{
fieldsMap: make(map[string]uint16),
fieldFSTs: make(map[uint16]*vellum.FST),
fieldsMap: make(map[string]uint16),
fieldFSTs: make(map[uint16]*vellum.FST),
vectorCache: &vecCache{
cache: make(map[uint16]*cacheEntry),
},
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
},
f: f,
@@ -81,7 +84,6 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
_ = rv.Close()
return nil, err
}

return rv, nil
}

@@ -110,6 +112,10 @@ type SegmentBase struct {

m sync.Mutex
fieldFSTs map[uint16]*vellum.FST

// fixme: although the operations and APIs of the vecCache are hidden under
// the vectors tag, the type itself should also perhaps be under the vectors tag.
vectorCache *vecCache
}

func (sb *SegmentBase) Size() int {
@@ -146,7 +152,7 @@ func (sb *SegmentBase) updateSize() {

func (sb *SegmentBase) AddRef() {}
func (sb *SegmentBase) DecRef() (err error) { return nil }
func (sb *SegmentBase) Close() (err error) { return nil }
func (sb *SegmentBase) Close() (err error) { sb.vectorCache.Clear(); return nil }

// Segment implements a persisted segment.Segment interface, by
// embedding an mmap()'ed SegmentBase.
@@ -640,6 +646,7 @@ func (s *Segment) closeActual() (err error) {
err = err2
}
}
s.vectorCache.Clear()
return
}

31 changes: 31 additions & 0 deletions vector_cache_noop.go
@@ -0,0 +1,31 @@
// Copyright (c) 2023 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !vectors
// +build !vectors

package zap

import (
"sync"
)

type vecCache struct {
m sync.RWMutex
cache map[uint16]*cacheEntry
}

func (v *vecCache) Clear() {}

type cacheEntry struct{}
