diff --git a/internal/core/algorithm/ngt/ngt.go b/internal/core/algorithm/ngt/ngt.go index 9c09d650edf..2fef9de1f11 100644 --- a/internal/core/algorithm/ngt/ngt.go +++ b/internal/core/algorithm/ngt/ngt.go @@ -36,6 +36,7 @@ import ( "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/sync" + "github.com/vdaas/vald/internal/sync/singleflight" ) type ( @@ -85,7 +86,7 @@ type ( // GetVector returns vector stored in NGT index. GetVector(id uint) ([]float32, error) - GetGraphStatistics(m statisticsType) (stats *GraphStatistics, err error) + GetGraphStatistics(ctx context.Context, m statisticsType) (stats *GraphStatistics, err error) // GetProperty returns NGT Index Property. GetProperty() (*Property, error) @@ -114,8 +115,10 @@ type ( epl uint64 // NGT error buffer pool size limit index C.NGTIndex ospace C.NGTObjectSpace + group singleflight.Group[*GraphStatistics] mu *sync.RWMutex cmu *sync.RWMutex + smu *sync.Mutex } ngtError struct { @@ -479,6 +482,7 @@ func gen(isLoad bool, opts ...Option) (NGT, error) { ) n.mu = new(sync.RWMutex) n.cmu = new(sync.RWMutex) + n.smu = new(sync.Mutex) defer func() { if err != nil { @@ -1079,36 +1083,39 @@ func (n *ngt) Close() { } func fromCGraphStatistics(cstats *C.NGTGraphStatistics) *GraphStatistics { + if cstats == nil { + return nil + } goStats := &GraphStatistics{ - NumberOfObjects: uint64(cstats.numberOfObjects), - NumberOfIndexedObjects: uint64(cstats.numberOfIndexedObjects), - SizeOfObjectRepository: uint64(cstats.sizeOfObjectRepository), - SizeOfRefinementObjectRepository: uint64(cstats.sizeOfRefinementObjectRepository), - NumberOfRemovedObjects: uint64(cstats.numberOfRemovedObjects), - NumberOfNodes: uint64(cstats.numberOfNodes), - NumberOfEdges: uint64(cstats.numberOfEdges), + C1Indegree: float64(cstats.c1Indegree), + C5Indegree: float64(cstats.c5Indegree), + C95Outdegree: float64(cstats.c95Outdegree), + C99Outdegree: float64(cstats.c99Outdegree), + MaxNumberOfIndegree: uint64(cstats.maxNumberOfIndegree), + MaxNumberOfOutdegree: uint64(cstats.maxNumberOfOutdegree), MeanEdgeLength: float64(cstats.meanEdgeLength), + MeanEdgeLengthFor10Edges: float64(cstats.meanEdgeLengthFor10Edges), + MeanIndegreeDistanceFor10Edges: float64(cstats.meanIndegreeDistanceFor10Edges), MeanNumberOfEdgesPerNode: float64(cstats.meanNumberOfEdgesPerNode), - NumberOfNodesWithoutEdges: uint64(cstats.numberOfNodesWithoutEdges), - MaxNumberOfOutdegree: uint64(cstats.maxNumberOfOutdegree), - MinNumberOfOutdegree: uint64(cstats.minNumberOfOutdegree), - NumberOfNodesWithoutIndegree: uint64(cstats.numberOfNodesWithoutIndegree), - MaxNumberOfIndegree: uint64(cstats.maxNumberOfIndegree), + MedianIndegree: int32(cstats.medianIndegree), + MedianOutdegree: int32(cstats.medianOutdegree), MinNumberOfIndegree: uint64(cstats.minNumberOfIndegree), - MeanEdgeLengthFor10Edges: float64(cstats.meanEdgeLengthFor10Edges), + MinNumberOfOutdegree: uint64(cstats.minNumberOfOutdegree), + ModeIndegree: uint64(cstats.modeIndegree), + ModeOutdegree: uint64(cstats.modeOutdegree), NodesSkippedFor10Edges: uint64(cstats.nodesSkippedFor10Edges), - MeanIndegreeDistanceFor10Edges: float64(cstats.meanIndegreeDistanceFor10Edges), NodesSkippedForIndegreeDistance: uint64(cstats.nodesSkippedForIndegreeDistance), - VarianceOfOutdegree: float64(cstats.varianceOfOutdegree), + NumberOfEdges: uint64(cstats.numberOfEdges), + NumberOfIndexedObjects: uint64(cstats.numberOfIndexedObjects), + NumberOfNodes: uint64(cstats.numberOfNodes), + NumberOfNodesWithoutEdges: uint64(cstats.numberOfNodesWithoutEdges), + NumberOfNodesWithoutIndegree: uint64(cstats.numberOfNodesWithoutIndegree), + NumberOfObjects: uint64(cstats.numberOfObjects), + NumberOfRemovedObjects: uint64(cstats.numberOfRemovedObjects), + SizeOfObjectRepository: uint64(cstats.sizeOfObjectRepository), + SizeOfRefinementObjectRepository: uint64(cstats.sizeOfRefinementObjectRepository), VarianceOfIndegree: float64(cstats.varianceOfIndegree), - MedianOutdegree: int32(cstats.medianOutdegree), - ModeOutdegree: uint64(cstats.modeOutdegree), - C95Outdegree: float64(cstats.c95Outdegree), - C99Outdegree: float64(cstats.c99Outdegree), - MedianIndegree: int32(cstats.medianIndegree), - ModeIndegree: uint64(cstats.modeIndegree), - C5Indegree: float64(cstats.c5Indegree), - C1Indegree: float64(cstats.c1Indegree), + VarianceOfOutdegree: float64(cstats.varianceOfOutdegree), Valid: bool(cstats.valid), } @@ -1139,22 +1146,40 @@ func fromCGraphStatistics(cstats *C.NGTGraphStatistics) *GraphStatistics { return goStats } -func (n *ngt) GetGraphStatistics(m statisticsType) (stats *GraphStatistics, err error) { - var mode rune - switch m { - case NormalStatistics: - mode = '-' - case AdditionalStatistics: - mode = 'a' - } - ne := n.GetErrorBuffer() - cstats := C.ngt_get_graph_statistics(n.index, C.char(mode), C.size_t(n.ces), ne.err) - if !cstats.valid { - return nil, n.newGoError(ne) +func (n *ngt) GetGraphStatistics( + ctx context.Context, m statisticsType, +) (stats *GraphStatistics, err error) { + var shared bool + stats, shared, err = n.group.Do(ctx, "GetGraphStatistics", func(context.Context) (*GraphStatistics, error) { + n.smu.Lock() + defer n.smu.Unlock() + var mode rune + switch m { + case NormalStatistics: + mode = '-' + case AdditionalStatistics: + mode = 'a' + } + ne := n.GetErrorBuffer() + cstats := C.ngt_get_graph_statistics(n.index, C.char(mode), C.size_t(n.ces), ne.err) + if !cstats.valid { + return nil, n.newGoError(ne) + } + n.PutErrorBuffer(ne) + defer C.ngt_free_graph_statistics(&cstats) + s := fromCGraphStatistics(&cstats) + if s == nil { + return nil, errors.ErrNGTIndexStatisticsNotReady + } + return s, nil + }) + if err != nil { + if shared && !errors.Is(err, errors.ErrNGTIndexStatisticsNotReady) { + return n.GetGraphStatistics(ctx, m) + } + return nil, err } - n.PutErrorBuffer(ne) - defer C.ngt_free_graph_statistics(&cstats) - return fromCGraphStatistics(&cstats), nil + return stats, nil } func (n *ngt) GetProperty() (prop *Property, err error) { diff --git a/pkg/agent/core/ngt/service/ngt.go b/pkg/agent/core/ngt/service/ngt.go index 8a16fe978fd..111b3ba1126 100644 --- a/pkg/agent/core/ngt/service/ngt.go +++ b/pkg/agent/core/ngt/service/ngt.go @@ -1306,7 +1306,7 @@ func (n *ngt) RegenerateIndexes(ctx context.Context) (err error) { } n.copyNGT(nn) - return n.loadStatistics() + return n.loadStatistics(ctx) } func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { @@ -1450,13 +1450,13 @@ func (n *ngt) CreateIndex(ctx context.Context, poolSize uint32) (err error) { return err } } - return n.loadStatistics() + return n.loadStatistics(ctx) } -func (n *ngt) loadStatistics() error { +func (n *ngt) loadStatistics(ctx context.Context) error { if n.IsStatisticsEnabled() { log.Info("loading index statistics to cache") - stats, err := n.core.GetGraphStatistics(core.AdditionalStatistics) + stats, err := n.core.GetGraphStatistics(ctx, core.AdditionalStatistics) if err != nil { log.Errorf("failed to load index statistics to cache: %v", err) return err