Skip to content

Commit

Permalink
addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Thejas-bhat committed Apr 5, 2024
1 parent e3b709f commit eb696f4
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
docValueOffset: 0, // docValueOffsets identified automatically by the section
dictLocs: dictLocs,
fieldFSTs: make(map[uint16]*vellum.FST),
vectorCache: newVectorCache(),
vectorCache: newVectorIndexCache(),
}
sb.updateSize()

Expand Down
45 changes: 17 additions & 28 deletions faiss_vector_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
faiss "github.com/blevesearch/go-faiss"
)

type vecCache struct {
type vecIndexCache struct {
closeCh chan struct{}
m sync.RWMutex
cache map[uint16]*cacheEntry
Expand All @@ -44,24 +44,23 @@ type cacheEntry struct {
index *faiss.IndexImpl
}

func newVectorCache() *vecCache {
return &vecCache{
func newVectorIndexCache() *vecIndexCache {
return &vecIndexCache{
cache: make(map[uint16]*cacheEntry),
closeCh: make(chan struct{}),
}
}

func (vc *vecCache) Clear() {
func (vc *vecIndexCache) Clear() {
vc.m.Lock()
close(vc.closeCh)
vc.cache = nil
vc.m.Unlock()

}

func (vc *vecCache) checkCacheForVecIndex(fieldID uint16,
func (vc *vecIndexCache) loadVectorIndex(fieldID uint16,
indexBytes []byte) (vecIndex *faiss.IndexImpl, err error) {
cachedIndex, present := vc.isVecIndexCached(fieldID)
cachedIndex, present := vc.isIndexCached(fieldID)
if present {
vecIndex = cachedIndex
vc.addRef(fieldID)
Expand All @@ -74,7 +73,7 @@ func (vc *vecCache) checkCacheForVecIndex(fieldID uint16,
return vecIndex, err
}

func (vc *vecCache) isVecIndexCached(fieldID uint16) (*faiss.IndexImpl, bool) {
func (vc *vecIndexCache) isIndexCached(fieldID uint16) (*faiss.IndexImpl, bool) {
vc.m.RLock()
entry, present := vc.cache[fieldID]
vc.m.RUnlock()
Expand All @@ -89,30 +88,17 @@ func (vc *vecCache) isVecIndexCached(fieldID uint16) (*faiss.IndexImpl, bool) {
return rv, present && (rv != nil)
}

func (vc *vecCache) addRef(fieldIDPlus1 uint16) {
func (vc *vecIndexCache) addRef(fieldIDPlus1 uint16) {
vc.m.RLock()
entry := vc.cache[fieldIDPlus1]
vc.m.RUnlock()

entry.addRef()
}

func (vc *vecCache) clearField(fieldIDPlus1 uint16) {
func (vc *vecIndexCache) refresh() (rv int) {
vc.m.Lock()
delete(vc.cache, fieldIDPlus1)
vc.m.Unlock()
}

func (vc *vecCache) refreshEntries() (rv int) {
vc.m.RLock()
cache := vc.cache
vc.m.RUnlock()

defer func() {
vc.m.RLock()
rv = len(vc.cache)
vc.m.RUnlock()
}()

// for every field reconcile the average with the current sample values
for fieldIDPlus1, entry := range cache {
Expand All @@ -125,22 +111,25 @@ func (vc *vecCache) refreshEntries() (rv int) {
if entry.tracker.avg <= (1 - entry.tracker.alpha) {
atomic.StoreUint64(&entry.tracker.sample, 0)
entry.closeIndex()
vc.clearField(fieldIDPlus1)
delete(vc.cache, fieldIDPlus1)
continue
}
atomic.StoreUint64(&entry.tracker.sample, 0)
}
return

rv = len(vc.cache)
vc.m.Unlock()
return rv
}

func (vc *vecCache) monitor() {
func (vc *vecIndexCache) monitor() {
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-vc.closeCh:
return
case <-ticker.C:
numEntries := vc.refreshEntries()
numEntries := vc.refresh()
if numEntries == 0 {
// no entries to be monitored, exit
return
Expand All @@ -149,7 +138,7 @@ func (vc *vecCache) monitor() {
}
}

func (vc *vecCache) update(fieldIDPlus1 uint16, index *faiss.IndexImpl) {
func (vc *vecIndexCache) update(fieldIDPlus1 uint16, index *faiss.IndexImpl) {
vc.m.Lock()

// the first time we've hit the cache, try to spawn a monitoring routine
Expand Down
6 changes: 3 additions & 3 deletions faiss_vector_cache_noop.go → faiss_vector_cache_nosup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package zap

type vecCache struct {
type vecIndexCache struct {
}

func newVectorCache() *vecCache {
func newVectorIndexCache() *vecIndexCache {
return nil
}

func (v *vecCache) Clear() {}
func (v *vecIndexCache) Clear() {}
8 changes: 3 additions & 5 deletions faiss_vector_posting.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
return rv, nil
},
close: func() {
// skipping the closing for now because the index is cached
// todo: subjected to change based on a optimization type flag
// skipping the closing because the index is cached and it's being
// deferred to a later point of time.
},
size: func() uint64 {
if vecIndex != nil {
Expand Down Expand Up @@ -394,9 +394,7 @@ func (sb *SegmentBase) InterpretVectorIndex(field string, except *roaring.Bitmap
indexSize, n := binary.Uvarint(sb.mem[pos : pos+binary.MaxVarintLen64])
pos += n

// todo: whether to cache the index or not can be determined by using the
// index optimization type flag.
vecIndex, err = sb.vectorCache.checkCacheForVecIndex(fieldIDPlus1, sb.mem[pos:pos+int(indexSize)])
vecIndex, err = sb.vectorCache.loadVectorIndex(fieldIDPlus1, sb.mem[pos:pos+int(indexSize)])
pos += int(indexSize)

return wrapVecIndex, err
Expand Down
4 changes: 2 additions & 2 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (*ZapPlugin) Open(path string) (segment.Segment, error) {
SegmentBase: SegmentBase{
fieldsMap: make(map[string]uint16),
fieldFSTs: make(map[uint16]*vellum.FST),
vectorCache: newVectorCache(),
vectorCache: newVectorIndexCache(),
fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
},
f: f,
Expand Down Expand Up @@ -113,7 +113,7 @@ type SegmentBase struct {

// fixme: although the operations and APIs of the vecCache is hidden under
// the vectors tag, the type should also perhaps be under the vectors tag.
vectorCache *vecCache
vectorCache *vecIndexCache
}

func (sb *SegmentBase) Size() int {
Expand Down

0 comments on commit eb696f4

Please sign in to comment.