Skip to content

Commit

Permalink
Bugfix NGT flush logic (#2598) (#2606)
Browse files Browse the repository at this point in the history
* fix: bugfix flush logic



* fix: nil check for flushing



* fix: add flush check logic



* fix: nil check bug



* fix: add nil check



* fix: return err when the flush process is executing



* fix: add error check for flushing



* fix: error message



* fix: disable kvs and vqueue initialization



* fix: disable commentout



* fix: disable kvs and vq



* fix: nil set to kvs and vq



* fix: copy ngt service object for flushing



* fix: deleted unnecessary nil check



* fix: variable name



---------

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>
Co-authored-by: Hiroto Funakoshi <hiroto.funakoshi.hiroto@gmail.com>
Co-authored-by: Yusuke Kato <kpango@vdaas.org>
Co-authored-by: Kiichiro YUKAWA <kyukawa315@gmail.com>
  • Loading branch information
4 people authored Sep 11, 2024
1 parent 626873f commit d6f600a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 28 deletions.
6 changes: 6 additions & 0 deletions pkg/agent/core/ngt/handler/grpc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (s *server) CreateIndex(
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down Expand Up @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex(
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down
73 changes: 45 additions & 28 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) {
return n, nil
}

func (n *ngt) copyNGT(src *ngt) {
// instances
n.core = src.core
n.kvs = src.kvs
n.fmap = src.fmap
n.vq = src.vq

// counters
n.wfci = src.wfci
n.nobic = src.nobic
n.nopvq = atomic.Uint64{}

// paths
n.path = src.path
n.tmpPath = src.tmpPath
n.oldPath = src.oldPath
n.basePath = src.basePath
n.brokenPath = src.brokenPath
}

// migrate migrates the index directory from old to new under the input path if necessary.
// Migration happens when the path is not empty and there is no `path/origin` directory,
// which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet.
Expand Down Expand Up @@ -908,7 +928,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
}
return ctx.Err()
case <-tick.C:
if n.vq.IVQLen() >= n.alen {
if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen {
err = n.CreateIndex(ctx, n.poolSize)
}
case <-limit.C:
Expand Down Expand Up @@ -1242,14 +1262,12 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
if err != nil {
log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err)
}
n.kvs = nil
n.vq = nil

// gc
runtime.GC()
atomic.AddUint64(&n.nogce, 1)

if n.inMem {
if !n.inMem {
// delete file
err = file.DeleteDir(ctx, n.path)
if err != nil {
Expand All @@ -1265,30 +1283,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
}
}

nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))

nvq, err := vqueue.New()
if err != nil {
log.Errorf("failed to create new vector vector queue. error: %v", err)
}

// renew instance
nn, err := newNGT(n.cfg, n.opts...)
if err != nil {
return err
}
nn.kvs = nkvs
nn.vq = nvq

// Regenerate with flags set
nn.flushing.Store(true)
nn.indexing.Store(true)
defer nn.flushing.Store(false)
defer nn.indexing.Store(false)
n.copyNGT(nn)

n = nn

return nil
return n.loadStatistics()
}

func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
Expand All @@ -1299,8 +1301,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
}
}()

if n.isReadReplica {
switch {
case n.isReadReplica:
return errors.ErrWriteOperationToReadReplica
case n.IsFlushing():
return errors.ErrFlushingIsInProgress
}

ic := n.vq.IVQLen() + n.vq.DVQLen()
Expand Down Expand Up @@ -1428,6 +1433,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
return err
}
}
return n.loadStatistics()
}

func (n *ngt) loadStatistics() error {
if n.IsStatisticsEnabled() {
log.Info("loading index statistics to cache")
stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics)
Expand Down Expand Up @@ -1471,8 +1480,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
IndegreeHistogram: stats.IndegreeHistogram,
})
}

return err
return nil
}

func (n *ngt) removeInvalidIndex(ctx context.Context) {
Expand Down Expand Up @@ -1941,15 +1949,24 @@ func (n *ngt) gc() {
}

func (n *ngt) Len() uint64 {
return n.kvs.Len()
if n.kvs != nil && !n.IsFlushing() {
return n.kvs.Len()
}
return 0
}

func (n *ngt) InsertVQueueBufferLen() uint64 {
return uint64(n.vq.IVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.IVQLen())
}
return 0
}

func (n *ngt) DeleteVQueueBufferLen() uint64 {
return uint64(n.vq.DVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.DVQLen())
}
return 0
}

func (n *ngt) GetDimensionSize() int {
Expand Down

0 comments on commit d6f600a

Please sign in to comment.