Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix NGT flush logic #2598

Merged
merged 17 commits into from
Sep 11, 2024
6 changes: 6 additions & 0 deletions pkg/agent/core/ngt/handler/grpc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted

Check warning on line 69 in pkg/agent/core/ngt/handler/grpc/index.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/index.go#L67-L69

Added lines #L67 - L69 were not covered by tests
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down Expand Up @@ -149,6 +152,9 @@
},
}, info.Get())...)
code = codes.FailedPrecondition
case errors.Is(err, errors.ErrFlushingIsInProgress):
err = status.WrapWithAborted("CreateAndSaveIndex API aborted to process create indexes request due to flushing indices is in progress", err, details...)
code = codes.Aborted

Check warning on line 157 in pkg/agent/core/ngt/handler/grpc/index.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/index.go#L155-L157

Added lines #L155 - L157 were not covered by tests
case errors.Is(err, context.Canceled):
err = status.WrapWithCanceled(fmt.Sprintf("CreateAndSaveIndex API canceled to create indexes pool_size = %d, error: %v", c.GetPoolSize(), err), err, details...)
code = codes.Canceled
Expand Down
73 changes: 45 additions & 28 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,26 @@
return n, nil
}

func (n *ngt) copyNGT(obj *ngt) {

Check warning on line 263 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L263

Added line #L263 was not covered by tests
// instances
n.core = obj.core
n.kvs = obj.kvs
n.fmap = obj.fmap
n.vq = obj.vq

Check warning on line 268 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L265-L268

Added lines #L265 - L268 were not covered by tests

// counters
n.wfci = obj.wfci
n.nobic = obj.nobic
n.nopvq = atomic.Uint64{}

Check warning on line 273 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L271-L273

Added lines #L271 - L273 were not covered by tests

// paths
n.path = obj.path
n.tmpPath = obj.tmpPath
n.oldPath = obj.oldPath
n.basePath = obj.basePath
n.brokenPath = obj.brokenPath

Check warning on line 280 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L276-L280

Added lines #L276 - L280 were not covered by tests
}

// migrate migrates the index directory from old to new under the input path if necessary.
// Migration happens when the path is not empty and there is no `path/origin` directory,
// which indicates that the user has NOT been using CoW mode and the index directory is not migrated yet.
Expand Down Expand Up @@ -908,7 +928,7 @@
}
return ctx.Err()
case <-tick.C:
if n.vq.IVQLen() >= n.alen {
if n.vq != nil && !n.IsFlushing() && n.vq.IVQLen() >= n.alen {

Check warning on line 931 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L931

Added line #L931 was not covered by tests
err = n.CreateIndex(ctx, n.poolSize)
}
case <-limit.C:
Expand Down Expand Up @@ -1242,14 +1262,12 @@
if err != nil {
log.Errorf("failed to flushing vector to ngt index in delete kvs. error: %v", err)
}
n.kvs = nil
n.vq = nil

// gc
runtime.GC()
atomic.AddUint64(&n.nogce, 1)

if n.inMem {
if !n.inMem {

Check warning on line 1270 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1270

Added line #L1270 was not covered by tests
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
// delete file
err = file.DeleteDir(ctx, n.path)
if err != nil {
Expand All @@ -1265,30 +1283,14 @@
}
}

nkvs := kvs.New(kvs.WithConcurrency(n.kvsdbConcurrency))

nvq, err := vqueue.New()
if err != nil {
log.Errorf("failed to create new vector vector queue. error: %v", err)
}

// renew instance
nn, err := newNGT(n.cfg, n.opts...)
if err != nil {
return err
}
nn.kvs = nkvs
nn.vq = nvq

// Regenerate with flags set
nn.flushing.Store(true)
nn.indexing.Store(true)
defer nn.flushing.Store(false)
defer nn.indexing.Store(false)
n.copyNGT(nn)

Check warning on line 1291 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1291

Added line #L1291 was not covered by tests

n = nn

return nil
return n.loadStatistics()

Check warning on line 1293 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1293

Added line #L1293 was not covered by tests
}

func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) {
Expand All @@ -1299,8 +1301,11 @@
}
}()

if n.isReadReplica {
switch {
case n.isReadReplica:
return errors.ErrWriteOperationToReadReplica
case n.IsFlushing():
return errors.ErrFlushingIsInProgress

Check warning on line 1308 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1307-L1308

Added lines #L1307 - L1308 were not covered by tests
}

ic := n.vq.IVQLen() + n.vq.DVQLen()
Comment on lines +1307 to 1311
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note

Added logic to check if a vqueue reference could be nil if a flush is running.

Expand Down Expand Up @@ -1428,6 +1433,10 @@
return err
}
}
return n.loadStatistics()
}

func (n *ngt) loadStatistics() error {
if n.IsStatisticsEnabled() {
log.Info("loading index statistics to cache")
stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics)
Expand Down Expand Up @@ -1471,8 +1480,7 @@
IndegreeHistogram: stats.IndegreeHistogram,
})
}

return err
return nil
}

func (n *ngt) removeInvalidIndex(ctx context.Context) {
Expand Down Expand Up @@ -1941,15 +1949,24 @@
}

func (n *ngt) Len() uint64 {
return n.kvs.Len()
if n.kvs != nil && !n.IsFlushing() {
return n.kvs.Len()
}
return 0

Check warning on line 1955 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1955

Added line #L1955 was not covered by tests
}

func (n *ngt) InsertVQueueBufferLen() uint64 {
return uint64(n.vq.IVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.IVQLen())
}
return 0

Check warning on line 1962 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1962

Added line #L1962 was not covered by tests
}

func (n *ngt) DeleteVQueueBufferLen() uint64 {
return uint64(n.vq.DVQLen())
if n.vq != nil && !n.IsFlushing() {
return uint64(n.vq.DVQLen())
}
return 0

Check warning on line 1969 in pkg/agent/core/ngt/service/ngt.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/service/ngt.go#L1969

Added line #L1969 was not covered by tests
}

func (n *ngt) GetDimensionSize() int {
Expand Down
Loading