Skip to content

Commit

Permalink
Merge branch 'main' into feature/api/update-timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Sep 11, 2024
2 parents 0c0e5b3 + c3c5765 commit ec4c92c
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 28 deletions.
1 change: 1 addition & 0 deletions .github/workflows/labeler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
- uses: actions/labeler@v5
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
configuration-path: ".github/labeler.yaml"
- name: Add labels
run: |
pr_num=`cat $GITHUB_EVENT_PATH | jq -r ".number"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/agent/core/ngt/handler/grpc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (s *server) CreateIndex(
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down Expand Up @@ -149,6 +152,9 @@ func (s *server) CreateAndSaveIndex(
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down
73 changes: 45 additions & 28 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,26 @@ func newNGT(cfg *config.NGT, opts ...Option) (n *ngt, err error) {
return n, nil
}

func (n *ngt) copyNGT(src *ngt) {
// instances
n.core = src.core
n.kvs = src.kvs
n.fmap = src.fmap
n.vq = src.vq

// counters
n.wfci = src.wfci
n.nobic = src.nobic
n.nopvq = atomic.Uint64{}

// paths
n.path = src.path
n.tmpPath = src.tmpPath
n.oldPath = src.oldPath
n.basePath = src.basePath
n.brokenPath = src.brokenPath
}

// migrate migrates the index directory from old to new under the input path if necessary.
// Migration happens when the path is not empty and there is no `path/origin` directory,
// which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet.
Expand Down Expand Up @@ -912,7 +932,7 @@ func (n *ngt) Start(ctx context.Context) <-chan error {
}
return ctx.Err()
case <-tick.C:
if n.vq.IVQLen() >= n.alen {
if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen {
err = n.CreateIndex(ctx, n.poolSize)
}
case <-limit.C:
Expand Down Expand Up @@ -1258,14 +1278,12 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
if err != nil {
log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err)
}
n.kvs = nil
n.vq = nil

// gc
runtime.GC()
atomic.AddUint64(&n.nogce, 1)

if n.inMem {
if !n.inMem {
// delete file
err = file.DeleteDir(ctx, n.path)
if err != nil {
Expand All @@ -1281,30 +1299,14 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) {
}
}

nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))

nvq, err := vqueue.New()
if err != nil {
log.Errorf("failed to create new vector vector queue. error: %v", err)
}

// renew instance
nn, err := newNGT(n.cfg, n.opts...)
if err != nil {
return err
}
nn.kvs = nkvs
nn.vq = nvq

// Regenerate with flags set
nn.flushing.Store(true)
nn.indexing.Store(true)
defer nn.flushing.Store(false)
defer nn.indexing.Store(false)
n.copyNGT(nn)

n = nn

return nil
return n.loadStatistics()
}

func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
Expand All @@ -1315,8 +1317,11 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
}
}()

if n.isReadReplica {
switch {
case n.isReadReplica:
return errors.ErrWriteOperationToReadReplica
case n.IsFlushing():
return errors.ErrFlushingIsInProgress
}

ic := n.vq.IVQLen() + n.vq.DVQLen()
Expand Down Expand Up @@ -1445,6 +1450,10 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
return err
}
}
return n.loadStatistics()
}

func (n *ngt) loadStatistics() error {
if n.IsStatisticsEnabled() {
log.Info("loading index statistics to cache")
stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics)
Expand Down Expand Up @@ -1488,8 +1497,7 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
IndegreeHistogram: stats.IndegreeHistogram,
})
}

return err
return nil
}

func (n *ngt) removeInvalidIndex(ctx context.Context) {
Expand Down Expand Up @@ -1922,15 +1930,24 @@ func (n *ngt) gc() {
}

func (n *ngt) Len() uint64 {
return n.kvs.Len()
if n.kvs != nil && !n.IsFlushing() {
return n.kvs.Len()
}
return 0
}

func (n *ngt) InsertVQueueBufferLen() uint64 {
return uint64(n.vq.IVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.IVQLen())
}
return 0
}

func (n *ngt) DeleteVQueueBufferLen() uint64 {
return uint64(n.vq.DVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.DVQLen())
}
return 0
}

func (n *ngt) GetDimensionSize() int {
Expand Down

0 comments on commit ec4c92c

Please sign in to comment.