From d82a5e6cdba8c1ae4de66d2ed54aac1279b8a1b5 Mon Sep 17 00:00:00 2001 From: Barnaby Keene <1636971+Southclaws@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:23:17 +0000 Subject: [PATCH] rework reindexing and update pinecone dedupe - removed GetMany from semdex retrieval - implemented de-duplicate indexing for pinecone - simplified reindex process to rely on later de-dupe --- app/services/library/node_semdex/indexer.go | 12 +- app/services/library/node_semdex/reindexer.go | 12 +- app/services/semdex/disabled.go | 8 +- app/services/semdex/index_job/indexer.go | 6 +- app/services/semdex/semdex.go | 6 +- .../semdexer/chromem_semdexer/chromem.go | 18 +- .../semdexer/pinecone_semdexer/index.go | 186 +++++++++++------- .../semdexer/pinecone_semdexer/object.go | 32 ++- .../semdexer/pinecone_semdexer/pinecone.go | 29 ++- .../pinecone_semdexer/pinecone_test.go | 40 ++++ .../semdexer/pinecone_semdexer/recommend.go | 2 +- .../semdexer/pinecone_semdexer/search.go | 2 +- .../semdexer/weaviate_semdexer/delete.go | 8 +- .../semdexer/weaviate_semdexer/indexer.go | 8 +- .../semdexer/weaviate_semdexer/mapping.go | 17 -- .../semdexer/weaviate_semdexer/retrieval.go | 75 ------- app/services/thread/thread_semdex/indexer.go | 12 +- .../thread/thread_semdex/reindexer.go | 72 ++++--- .../thread/thread_semdex/thread_semdex.go | 4 +- go.mod | 18 +- go.sum | 20 ++ .../vector/pinecone/pinecone.go | 2 + 22 files changed, 322 insertions(+), 267 deletions(-) create mode 100644 app/services/semdex/semdexer/pinecone_semdexer/pinecone_test.go delete mode 100644 app/services/semdex/semdexer/weaviate_semdexer/retrieval.go diff --git a/app/services/library/node_semdex/indexer.go b/app/services/library/node_semdex/indexer.go index c78a3360..d0927dbb 100644 --- a/app/services/library/node_semdex/indexer.go +++ b/app/services/library/node_semdex/indexer.go @@ -19,14 +19,16 @@ func (i *semdexer) index(ctx context.Context, id library.NodeID) error { return fault.Wrap(err, fctx.With(ctx)) } - err = i.semdexMutator.Index(ctx, node) + updates, err := i.semdexMutator.Index(ctx, node) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } - _, err = i.nodeWriter.Update(ctx, qk, node_writer.WithIndexed()) - if err != nil { - return fault.Wrap(err, fctx.With(ctx)) + if updates > 0 { + _, err = i.nodeWriter.Update(ctx, qk, node_writer.WithIndexed()) + if err != nil { + return fault.Wrap(err, fctx.With(ctx)) + } } return nil @@ -35,7 +37,7 @@ func (i *semdexer) index(ctx context.Context, id library.NodeID) error { func (i *semdexer) deindex(ctx context.Context, id library.NodeID) error { qk := library.NewID(xid.ID(id)) - err := i.semdexMutator.Delete(ctx, xid.ID(id)) + _, err := i.semdexMutator.Delete(ctx, xid.ID(id)) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } diff --git a/app/services/library/node_semdex/reindexer.go b/app/services/library/node_semdex/reindexer.go index f8d2f9ec..6e33a9dd 100644 --- a/app/services/library/node_semdex/reindexer.go +++ b/app/services/library/node_semdex/reindexer.go @@ -11,7 +11,6 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/Southclaws/storyden/app/resources/datagraph" "github.com/Southclaws/storyden/app/resources/library" "github.com/Southclaws/storyden/app/resources/mq" "github.com/Southclaws/storyden/internal/ent" @@ -53,15 +52,8 @@ func (r *semdexer) reindex(ctx context.Context, reindexThreshold time.Duration, keepIDs := dt.Map(keep, func(p *ent.Node) xid.ID { return p.ID }) discardIDs := dt.Map(discard, func(p *ent.Node) xid.ID { return p.ID }) - indexed, err := r.semdexQuerier.GetMany(ctx, uint(reindexChunk), keepIDs...) - if err != nil { - return fault.Wrap(err, fctx.With(ctx)) - } - - indexedIDs := dt.Map(indexed, func(p *datagraph.Ref) xid.ID { return p.ID }) - - updated := diff(keepIDs, indexedIDs) - deleted := lo.Intersect(indexedIDs, discardIDs) + updated := keepIDs + deleted := discardIDs r.logger.Debug("reindexing nodes", zap.Int("all", len(nodes)), diff --git a/app/services/semdex/disabled.go b/app/services/semdex/disabled.go index a7a1011c..62489048 100644 --- a/app/services/semdex/disabled.go +++ b/app/services/semdex/disabled.go @@ -14,12 +14,12 @@ type Disabled struct{} var _ Semdexer = &Disabled{} -func (*Disabled) Index(ctx context.Context, object datagraph.Item) error { - return nil +func (*Disabled) Index(ctx context.Context, object datagraph.Item) (int, error) { + return 0, nil } -func (*Disabled) Delete(ctx context.Context, object xid.ID) error { - return nil +func (*Disabled) Delete(ctx context.Context, object xid.ID) (int, error) { + return 0, nil } func (*Disabled) Search(ctx context.Context, q string, p pagination.Parameters, opts searcher.Options) (*pagination.Result[datagraph.Item], error) { diff --git a/app/services/semdex/index_job/indexer.go b/app/services/semdex/index_job/indexer.go index 68a3206b..812e2b09 100644 --- a/app/services/semdex/index_job/indexer.go +++ b/app/services/semdex/index_job/indexer.go @@ -69,7 +69,8 @@ func (i *indexerConsumer) indexReply(ctx context.Context, id post.ID) error { return fault.Wrap(err, fctx.With(ctx)) } - return i.indexer.Index(ctx, p) + _, err = i.indexer.Index(ctx, p) + return err } func (i *indexerConsumer) indexProfile(ctx context.Context, id account.AccountID) error { @@ -78,5 +79,6 @@ func (i *indexerConsumer) indexProfile(ctx context.Context, id account.AccountID return fault.Wrap(err, fctx.With(ctx)) } - return i.indexer.Index(ctx, profile.ProfileFromAccount(p)) + _, err = i.indexer.Index(ctx, profile.ProfileFromAccount(p)) + return err } diff --git a/app/services/semdex/semdex.go b/app/services/semdex/semdex.go index 5ccad7c3..a5a2fce9 100644 --- a/app/services/semdex/semdex.go +++ b/app/services/semdex/semdex.go @@ -18,15 +18,13 @@ type Semdexer interface { } type Mutator interface { - Index(ctx context.Context, object datagraph.Item) error - Delete(ctx context.Context, object xid.ID) error + Index(ctx context.Context, object datagraph.Item) (int, error) + Delete(ctx context.Context, object xid.ID) (int, error) } type Querier interface { Searcher Recommender - - GetMany(ctx context.Context, limit uint, ids ...xid.ID) (datagraph.RefList, error) } type Chunk struct { diff --git a/app/services/semdex/semdexer/chromem_semdexer/chromem.go b/app/services/semdex/semdexer/chromem_semdexer/chromem.go index b35ae007..b3cdd465 100644 --- a/app/services/semdex/semdexer/chromem_semdexer/chromem.go +++ b/app/services/semdex/semdexer/chromem_semdexer/chromem.go @@ -52,18 +52,28 @@ func New(cfg config.Config, rh *hydrate.Hydrator, aip ai.Prompter) (semdex.Semde }, nil } -func (c *chromemRefIndex) Index(ctx context.Context, object datagraph.Item) error { - return c.c.AddDocument(ctx, chromem.Document{ +func (c *chromemRefIndex) Index(ctx context.Context, object datagraph.Item) (int, error) { + err := c.c.AddDocument(ctx, chromem.Document{ ID: object.GetID().String(), Content: object.GetContent().Plaintext(), Metadata: map[string]string{ "datagraph_kind": object.GetKind().String(), }, }) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) + } + + return 1, nil } -func (c *chromemRefIndex) Delete(ctx context.Context, object xid.ID) error { - return c.c.Delete(ctx, nil, nil, object.String()) +func (c *chromemRefIndex) Delete(ctx context.Context, object xid.ID) (int, error) { + err := c.c.Delete(ctx, nil, nil, object.String()) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) + } + + return 1, nil } func (c *chromemRefIndex) Search(ctx context.Context, q string, p pagination.Parameters, opts searcher.Options) (*pagination.Result[datagraph.Item], error) { diff --git a/app/services/semdex/semdexer/pinecone_semdexer/index.go b/app/services/semdex/semdexer/pinecone_semdexer/index.go index f278b0c7..634864b4 100644 --- a/app/services/semdex/semdexer/pinecone_semdexer/index.go +++ b/app/services/semdex/semdexer/pinecone_semdexer/index.go @@ -3,90 +3,73 @@ package pinecone_semdexer import ( "context" "runtime" - "sync" + "github.com/Southclaws/dt" "github.com/Southclaws/fault" "github.com/Southclaws/fault/fctx" + "github.com/alitto/pond/v2" "github.com/rs/xid" + "github.com/samber/lo" "google.golang.org/protobuf/types/known/structpb" "github.com/Southclaws/storyden/app/resources/datagraph" "github.com/Southclaws/storyden/internal/infrastructure/vector/pinecone" ) -func (c *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) error { - chunks := object.GetContent().Split() - - if len(chunks) == 0 { - return fault.New("no text chunks to index", fctx.With(ctx)) +func (s *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) (int, error) { + inserts, deletes, err := s.buildIndexOps(ctx, object) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) } - numWorkers := min(runtime.NumCPU(), len(chunks)) - chunkQueue := make(chan string, len(chunks)) - errChan := make(chan error, len(chunks)) - chunkChan := make(chan *pinecone.Vector, len(chunks)) - - var wg sync.WaitGroup - - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func(workerID int) { - defer wg.Done() - for chunk := range chunkQueue { - vec, err := c.ef(ctx, chunk) - if err != nil { - errChan <- err - } - - objectID := object.GetID() - - metadata, err := structpb.NewStruct(map[string]any{ - "datagraph_id": objectID.String(), - "datagraph_type": object.GetKind().String(), - "name": object.GetName(), - "content": chunk, - }) - if err != nil { - errChan <- err - } - - chunkID := generateChunkID(objectID, chunk).String() - - chunkChan <- &pinecone.Vector{ - Id: chunkID, - Values: vec, - Metadata: metadata, - } - } - }(i) + if len(inserts) > 0 { + _, err = s.index.UpsertVectors(ctx, inserts) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) + } } - go func() { - for _, chunk := range chunks { - chunkQueue <- chunk + if len(deletes) > 0 { + err = s.deleteVectors(ctx, deletes) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) } - close(chunkQueue) - }() + } - go func() { - wg.Wait() + changes := len(inserts) - len(deletes) - close(errChan) - close(chunkChan) - }() + return changes, nil +} - for err := range errChan { - if err != nil { - return err - } +func (c *pineconeSemdexer) Delete(ctx context.Context, object xid.ID) (int, error) { + prefix := object.String() + vectors, err := c.index.ListVectors(ctx, &pinecone.ListVectorsRequest{ + Prefix: &prefix, + }) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) } - var vecs []*pinecone.Vector - for vec := range chunkChan { - vecs = append(vecs, vec) + if len(vectors.VectorIds) == 0 { + return 0, nil } - _, err := c.index.UpsertVectors(ctx, vecs) + ids := dt.Map(vectors.VectorIds, func(id *string) string { return *id }) + + err = c.index.DeleteVectorsById(ctx, ids) + if err != nil { + return 0, fault.Wrap(err, fctx.With(ctx)) + } + + return len(ids), nil +} + +func (s *pineconeSemdexer) deleteVectors(ctx context.Context, ids []string) error { + if len(ids) == 0 { + return nil + } + + err := s.index.DeleteVectorsById(ctx, ids) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } @@ -94,18 +77,83 @@ func (c *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) err return nil } -func (c *pineconeSemdexer) Delete(ctx context.Context, object xid.ID) error { - filter, err := structpb.NewStruct(map[string]any{ - "datagraph_id": object.String(), +func (s *pineconeSemdexer) buildIndexOps(ctx context.Context, object datagraph.Item) ([]*pinecone.Vector, []string, error) { + allChunks := chunksFor(object) + if len(allChunks) == 0 { + return nil, nil, nil + } + chunkIDs := dt.Map(allChunks, func(c chunk) string { return c.id }) + + objectID := object.GetID() + + inputChunkTable := lo.SliceToMap(allChunks, func(c chunk) (string, chunk) { + return c.id, c + }) + + prefix := objectID.String() + indexedChunk, err := s.index.ListVectors(ctx, &pinecone.ListVectorsRequest{ + Prefix: &prefix, }) if err != nil { - return fault.Wrap(err, fctx.With(ctx)) + return nil, nil, fault.Wrap(err, fctx.With(ctx)) } - err = c.index.DeleteVectorsByFilter(ctx, filter) + vecids := dt.Map(indexedChunk.VectorIds, func(id *string) string { return *id }) + + indexedChunkTable, err := s.index.FetchVectors(ctx, vecids) if err != nil { - return fault.Wrap(err, fctx.With(ctx)) + return nil, nil, fault.Wrap(err, fctx.With(ctx)) } - return nil + pool := pond.NewResultPool[*pinecone.Vector](min(runtime.NumCPU(), len(chunkIDs))) + group := pool.NewGroupContext(ctx) + + for id, chunk := range inputChunkTable { + _, exists := indexedChunkTable.Vectors[id] + if exists { + continue + } + + group.SubmitErr(func() (*pinecone.Vector, error) { + vec, err := s.ef(ctx, chunk.content) + if err != nil { + return nil, err + } + + metadata, err := structpb.NewStruct(map[string]any{ + "datagraph_id": objectID.String(), + "datagraph_type": object.GetKind().String(), + "name": object.GetName(), + "content": chunk.content, + }) + if err != nil { + return nil, err + } + + return &pinecone.Vector{ + Id: id, + Values: vec, + Metadata: metadata, + }, nil + }) + } + + inserts, err := group.Wait() + if err != nil { + return nil, nil, fault.Wrap(err, fctx.With(ctx)) + } + + // build a list of vectors to delete by yielding items that are indexed but + // not present in the input object chunk table. + deletes := []string{} + for id := range indexedChunkTable.Vectors { + _, exists := inputChunkTable[id] + if exists { + continue + } + + deletes = append(deletes, id) + } + + return inserts, deletes, nil } diff --git a/app/services/semdex/semdexer/pinecone_semdexer/object.go b/app/services/semdex/semdexer/pinecone_semdexer/object.go index b43220dc..df425359 100644 --- a/app/services/semdex/semdexer/pinecone_semdexer/object.go +++ b/app/services/semdex/semdexer/pinecone_semdexer/object.go @@ -56,8 +56,8 @@ func (o Objects) ToRefs() datagraph.RefList { return refs } -func mapObject(v *pinecone.ScoredVector) (*Object, error) { - meta := v.Vector.Metadata.AsMap() +func mapVector(v *pinecone.Vector) (*Object, error) { + meta := v.Metadata.AsMap() idRaw, ok := meta["datagraph_id"] if !ok { @@ -109,14 +109,28 @@ func mapObject(v *pinecone.ScoredVector) (*Object, error) { } return &Object{ - ID: id, - Kind: dk, - Relevance: float64((v.Score + 1) / 2), - URL: *sdr, - Content: content, + ID: id, + Kind: dk, + URL: *sdr, + Content: content, }, nil } -func mapObjects(objects []*pinecone.ScoredVector) (Objects, error) { - return dt.MapErr(objects, mapObject) +func mapVectors(objects []*pinecone.Vector) (Objects, error) { + return dt.MapErr(objects, mapVector) +} + +func mapScoredVector(v *pinecone.ScoredVector) (*Object, error) { + obj, err := mapVector(v.Vector) + if err != nil { + return nil, err + } + + obj.Relevance = float64((v.Score + 1) / 2) + + return obj, nil +} + +func mapScoredVectors(objects []*pinecone.ScoredVector) (Objects, error) { + return dt.MapErr(objects, mapScoredVector) } diff --git a/app/services/semdex/semdexer/pinecone_semdexer/pinecone.go b/app/services/semdex/semdexer/pinecone_semdexer/pinecone.go index 1dd30fcc..98499d25 100644 --- a/app/services/semdex/semdexer/pinecone_semdexer/pinecone.go +++ b/app/services/semdex/semdexer/pinecone_semdexer/pinecone.go @@ -2,6 +2,7 @@ package pinecone_semdexer import ( "context" + "fmt" "hash/fnv" "github.com/Southclaws/dt" @@ -44,17 +45,20 @@ func New(ctx context.Context, cfg config.Config, pc *pinecone.Client, rh *hydrat }, nil } -func generateChunkID(id xid.ID, chunk string) uuid.UUID { +func generateChunkID(id xid.ID, chunk string) string { // We don't currently support sharing chunks across content nodes, so append // the object's ID to the chunk's hash, to ensure it's unique to the object. - payload := []byte(append(id.Bytes(), chunk...)) - return uuid.NewHash(fnv.New128(), uuid.NameSpaceOID, payload, 4) + hash := uuid.NewHash(fnv.New128(), uuid.NameSpaceOID, []byte(chunk), 4) + + prefix := id.String() + + return fmt.Sprintf("%s/%s", prefix, hash) } func chunkIDsFor(id xid.ID) func(chunk string) string { return func(chunk string) string { - return generateChunkID(id, chunk).String() + return generateChunkID(id, chunk) } } @@ -62,6 +66,19 @@ func chunkIDsForItem(object datagraph.Item) []string { return dt.Map(object.GetContent().Split(), chunkIDsFor(object.GetID())) } -func (c *pineconeSemdexer) GetMany(ctx context.Context, limit uint, ids ...xid.ID) (datagraph.RefList, error) { - return nil, nil +type chunk struct { + id string + content string +} + +func chunksFor(object datagraph.Item) []chunk { + id := object.GetID() + chunks := object.GetContent().Split() + + return dt.Map(chunks, func(c string) chunk { + return chunk{ + id: generateChunkID(id, c), + content: c, + } + }) } diff --git a/app/services/semdex/semdexer/pinecone_semdexer/pinecone_test.go b/app/services/semdex/semdexer/pinecone_semdexer/pinecone_test.go new file mode 100644 index 00000000..811760fa --- /dev/null +++ b/app/services/semdex/semdexer/pinecone_semdexer/pinecone_test.go @@ -0,0 +1,40 @@ +package pinecone_semdexer + +import ( + "testing" + + "github.com/rs/xid" + "github.com/stretchr/testify/assert" +) + +func Test_generateChunkID(t *testing.T) { + a := assert.New(t) + + id := xid.New() + + id1 := generateChunkID(id, "chunk number one") + id2 := generateChunkID(id, "chunk number one") + id3 := generateChunkID(id, "chunk number two") + + a.Equal(id1, id2) + a.NotEqual(id1, id3) + a.NotEqual(id2, id3) +} + +func Test_generateID(t *testing.T) { + a := assert.New(t) + + chunk := `org + +**Name-** Fake News Inference Dataset + +**Link-** [https://ieee-dataport.org/open-access/fnid-fake-news-inference-dataset](https://ieee-dataport.org/open-access/fnid-fake-news-inference-dataset) + +This database is provided for the Fake News Detection task` + + id, _ := xid.FromString("cth0hcifunp6ib5ivvug") + + id1 := generateChunkID(id, chunk) + + a.Equal("cth0hcifunp6ib5ivvug/97d726b5-5092-49af-8eb7-fa00d1be9b8d", id1) +} diff --git a/app/services/semdex/semdexer/pinecone_semdexer/recommend.go b/app/services/semdex/semdexer/pinecone_semdexer/recommend.go index 1d9cc02b..3a575d6e 100644 --- a/app/services/semdex/semdexer/pinecone_semdexer/recommend.go +++ b/app/services/semdex/semdexer/pinecone_semdexer/recommend.go @@ -53,7 +53,7 @@ func (s *pineconeSemdexer) RecommendRefs(ctx context.Context, object datagraph.I return nil, fault.Wrap(err, fctx.With(ctx)) } - objects, err := mapObjects(result.Matches) + objects, err := mapScoredVectors(result.Matches) if err != nil { return nil, fault.Wrap(err, fctx.With(ctx)) } diff --git a/app/services/semdex/semdexer/pinecone_semdexer/search.go b/app/services/semdex/semdexer/pinecone_semdexer/search.go index e1d3dd50..65ce10b4 100644 --- a/app/services/semdex/semdexer/pinecone_semdexer/search.go +++ b/app/services/semdex/semdexer/pinecone_semdexer/search.go @@ -89,7 +89,7 @@ func (s *pineconeSemdexer) searchObjects(ctx context.Context, q string, p pagina return nil, fault.Wrap(err, fctx.With(ctx)) } - return mapObjects(response.Matches) + return mapScoredVectors(response.Matches) } func filterChunks(results []*datagraph.Ref) []*datagraph.Ref { diff --git a/app/services/semdex/semdexer/weaviate_semdexer/delete.go b/app/services/semdex/semdexer/weaviate_semdexer/delete.go index 3a02d382..33f9f4c0 100644 --- a/app/services/semdex/semdexer/weaviate_semdexer/delete.go +++ b/app/services/semdex/semdexer/weaviate_semdexer/delete.go @@ -9,7 +9,7 @@ import ( "github.com/weaviate/weaviate-go-client/v4/weaviate/filters" ) -func (w *weaviateSemdexer) Delete(ctx context.Context, id xid.ID) error { +func (w *weaviateSemdexer) Delete(ctx context.Context, id xid.ID) (int, error) { delete := w.wc.Batch(). ObjectsBatchDeleter(). WithWhere( @@ -19,10 +19,10 @@ func (w *weaviateSemdexer) Delete(ctx context.Context, id xid.ID) error { WithValueString(id.String()), ) - _, err := delete.Do(ctx) + r, err := delete.Do(ctx) if err != nil { - return fault.Wrap(err, fctx.With(ctx)) + return 0, fault.Wrap(err, fctx.With(ctx)) } - return nil + return int(r.Results.Successful), nil } diff --git a/app/services/semdex/semdexer/weaviate_semdexer/indexer.go b/app/services/semdex/semdexer/weaviate_semdexer/indexer.go index c1578495..ec7f2072 100644 --- a/app/services/semdex/semdexer/weaviate_semdexer/indexer.go +++ b/app/services/semdex/semdexer/weaviate_semdexer/indexer.go @@ -15,11 +15,11 @@ import ( "github.com/Southclaws/storyden/app/resources/datagraph" ) -func (s *weaviateSemdexer) Index(ctx context.Context, object datagraph.Item) error { +func (s *weaviateSemdexer) Index(ctx context.Context, object datagraph.Item) (int, error) { chunks := object.GetContent().Split() if len(chunks) == 0 { - return fault.New("no text chunks to index", fctx.With(ctx)) + return 0, fault.New("no text chunks to index", fctx.With(ctx)) } numWorkers := runtime.NumCPU() @@ -52,11 +52,11 @@ func (s *weaviateSemdexer) Index(ctx context.Context, object datagraph.Item) err for err := range errChan { if err != nil { - return err + return 0, err } } - return nil + return len(chunks), nil } func (s *weaviateSemdexer) indexChunk(ctx context.Context, object datagraph.Item, chunk string) error { diff --git a/app/services/semdex/semdexer/weaviate_semdexer/mapping.go b/app/services/semdex/semdexer/weaviate_semdexer/mapping.go index dedbe051..4629bbc5 100644 --- a/app/services/semdex/semdexer/weaviate_semdexer/mapping.go +++ b/app/services/semdex/semdexer/weaviate_semdexer/mapping.go @@ -87,15 +87,6 @@ func mapResponseObjects(raw map[string]models.JSONObject) (*WeaviateResponse, er return &parsed, nil } -func (s *weaviateSemdexer) getFirstResult(wr *WeaviateResponse) (*WeaviateObject, error) { - objects := wr.Get[s.cn.String()] - if len(objects) != 1 { - return nil, fault.Newf("expected exactly one result, got %d", len(objects)) - } - - return &objects[0], nil -} - func generateChunkID(id xid.ID, chunk string) uuid.UUID { // We don't currently support sharing chunks across content nodes, so append // the object's ID to the chunk's hash, to ensure it's unique to the object. @@ -117,11 +108,3 @@ func chunkIDsFor(id xid.ID) func(chunk string) uuid.UUID { func chunkIDsForItem(object datagraph.Item) []uuid.UUID { return dt.Map(object.GetContent().Split(), chunkIDsFor(object.GetID())) } - -func objectIDsToStrings(ids []xid.ID) []string { - strs := make([]string, len(ids)) - for i, id := range ids { - strs[i] = id.String() - } - return strs -} diff --git a/app/services/semdex/semdexer/weaviate_semdexer/retrieval.go b/app/services/semdex/semdexer/weaviate_semdexer/retrieval.go deleted file mode 100644 index bc22920d..00000000 --- a/app/services/semdex/semdexer/weaviate_semdexer/retrieval.go +++ /dev/null @@ -1,75 +0,0 @@ -package weaviate_semdexer - -import ( - "context" - "encoding/json" - - "github.com/Southclaws/dt" - "github.com/Southclaws/fault" - "github.com/Southclaws/fault/fctx" - "github.com/rs/xid" - "github.com/weaviate/weaviate-go-client/v4/weaviate/filters" - "github.com/weaviate/weaviate/entities/models" - - "github.com/Southclaws/storyden/app/resources/datagraph" -) - -func (o *weaviateSemdexer) GetMany(ctx context.Context, limit uint, ids ...xid.ID) (datagraph.RefList, error) { - stringIDs := dt.Map(ids, func(x xid.ID) string { return x.String() }) - - objects, err := o.wc. - GraphQL(). - Get(). - WithClassName(o.cn.String()). - WithWhere( - filters.Where(). - WithPath([]string{"id"}). - WithOperator(filters.ContainsAny). - WithValueString(stringIDs...), - ). - WithLimit(int(limit)). - Do(ctx) - if err != nil { - return nil, fault.Wrap(err, fctx.With(ctx)) - } - - data, err := mapResponseObjects(objects.Data) - if err != nil { - return nil, fault.Wrap(err, fctx.With(ctx)) - } - - refs, err := dt.MapErr(data.Get[string(o.cn)], mapToNodeReference) - if err != nil { - return nil, fault.Wrap(err, fctx.With(ctx)) - } - - return datagraph.RefList(refs), nil -} - -func mapWeaviateObject(o *models.Object) (*datagraph.Ref, error) { - wo, err := unmarshalWeaviateObject(o.Properties) - if err != nil { - return nil, err - } - - ref, err := mapToNodeReference(*wo) - if err != nil { - return nil, err - } - - return ref, nil -} - -func unmarshalWeaviateObject(p models.PropertySchema) (*WeaviateObject, error) { - b, err := json.Marshal(p) - if err != nil { - return nil, err - } - - wo := WeaviateObject{} - if err := json.Unmarshal(b, &wo); err != nil { - return nil, err - } - - return &wo, nil -} diff --git a/app/services/thread/thread_semdex/indexer.go b/app/services/thread/thread_semdex/indexer.go index b946cf7f..1a6a35a7 100644 --- a/app/services/thread/thread_semdex/indexer.go +++ b/app/services/thread/thread_semdex/indexer.go @@ -17,21 +17,23 @@ func (i *semdexer) indexThread(ctx context.Context, id post.ID) error { return fault.Wrap(err, fctx.With(ctx)) } - err = i.semdexMutator.Index(ctx, p) + updates, err := i.semdexMutator.Index(ctx, p) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } - _, err = i.threadWriter.Update(ctx, id, thread.WithIndexed()) - if err != nil { - return fault.Wrap(err, fctx.With(ctx)) + if updates > 0 { + _, err = i.threadWriter.Update(ctx, id, thread.WithIndexed()) + if err != nil { + return fault.Wrap(err, fctx.With(ctx)) + } } return nil } func (i *semdexer) deindexThread(ctx context.Context, id post.ID) error { - err := i.semdexMutator.Delete(ctx, xid.ID(id)) + _, err := i.semdexMutator.Delete(ctx, xid.ID(id)) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } diff --git a/app/services/thread/thread_semdex/reindexer.go b/app/services/thread/thread_semdex/reindexer.go index f0e8d907..cbdeb6d1 100644 --- a/app/services/thread/thread_semdex/reindexer.go +++ b/app/services/thread/thread_semdex/reindexer.go @@ -11,7 +11,6 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/Southclaws/storyden/app/resources/datagraph" "github.com/Southclaws/storyden/app/resources/mq" "github.com/Southclaws/storyden/app/resources/post" "github.com/Southclaws/storyden/internal/ent" @@ -28,44 +27,12 @@ func (r *semdexer) schedule(ctx context.Context, schedule time.Duration, reindex } func (r *semdexer) reindex(ctx context.Context, reindexThreshold time.Duration, reindexChunk int) error { - threads, err := r.db.Post.Query(). - Select( - ent_post.FieldID, - ent_post.FieldVisibility, - ent_post.FieldDeletedAt, - ). - Where( - ent_post.Or( - ent_post.IndexedAtIsNil(), - ent_post.IndexedAtLT(time.Now().Add(-reindexThreshold)), - ), - ent_post.RootPostIDIsNil(), - ). - Limit(reindexChunk). - All(ctx) + updated, deleted, err := r.gatherTargets(ctx, reindexThreshold, reindexChunk) if err != nil { return fault.Wrap(err, fctx.With(ctx)) } - keep, discard := lo.FilterReject(threads, func(p *ent.Post, _ int) bool { - return p.Visibility == ent_post.VisibilityPublished && p.DeletedAt == nil - }) - - keepIDs := dt.Map(keep, func(p *ent.Post) xid.ID { return p.ID }) - discardIDs := dt.Map(discard, func(p *ent.Post) xid.ID { return p.ID }) - - indexed, err := r.semdexQuerier.GetMany(ctx, uint(reindexChunk), keepIDs...) - if err != nil { - return fault.Wrap(err, fctx.With(ctx)) - } - - indexedIDs := dt.Map(indexed, func(p *datagraph.Ref) xid.ID { return p.ID }) - - updated := diff(keepIDs, indexedIDs) - deleted := lo.Intersect(indexedIDs, discardIDs) - r.logger.Debug("reindexing threads", - zap.Int("all", len(threads)), zap.Int("updated", len(updated)), zap.Int("deleted", len(deleted)), ) @@ -88,7 +55,38 @@ func (r *semdexer) reindex(ctx context.Context, reindexThreshold time.Duration, return nil } -func diff(targets []xid.ID, indexed []xid.ID) []xid.ID { - _, ids := lo.Difference(indexed, targets) - return ids +func (r *semdexer) gatherTargets(ctx context.Context, reindexThreshold time.Duration, reindexChunk int) ([]xid.ID, []xid.ID, error) { + threads, err := r.db.Post.Query(). + Select( + ent_post.FieldID, + ent_post.FieldVisibility, + ent_post.FieldDeletedAt, + ). + Where( + ent_post.Or( + ent_post.IndexedAtIsNil(), + ent_post.IndexedAtLT(time.Now().Add(-reindexThreshold)), + ), + ). + Order(ent.Desc(ent_post.FieldCreatedAt)). + Limit(reindexChunk). + All(ctx) + if err != nil { + return nil, nil, fault.Wrap(err, fctx.With(ctx)) + } + + keepIDs, discardIDs := r.partition(threads) + + return keepIDs, discardIDs, nil +} + +func (r *semdexer) partition(threads []*ent.Post) ([]xid.ID, []xid.ID) { + keep, discard := lo.FilterReject(threads, func(p *ent.Post, _ int) bool { + return p.Visibility == ent_post.VisibilityPublished && p.DeletedAt == nil + }) + + keepIDs := dt.Map(keep, func(p *ent.Post) xid.ID { return p.ID }) + discardIDs := dt.Map(discard, func(p *ent.Post) xid.ID { return p.ID }) + + return keepIDs, discardIDs } diff --git a/app/services/thread/thread_semdex/thread_semdex.go b/app/services/thread/thread_semdex/thread_semdex.go index 23fca316..33592e78 100644 --- a/app/services/thread/thread_semdex/thread_semdex.go +++ b/app/services/thread/thread_semdex/thread_semdex.go @@ -30,7 +30,7 @@ func Build() fx.Option { // with duplicate messages since there's no checksum mechanism built currently. // TODO: Make these parameters configurable by the SD instance administrator. var ( - DefaultReindexSchedule = time.Hour * 21 // how frequently do we reindex + DefaultReindexSchedule = time.Hour // how frequently do we reindex DefaultReindexThreshold = time.Hour * 24 // ignore indexed_at after this DefaultReindexChunk = 100 // size of query per reindex ) @@ -103,7 +103,7 @@ func newSemdexer( go func() { for msg := range sub { if err := re.deindexThread(ctx, msg.Payload.ID); err != nil { - l.Error("failed to index post", zap.Error(err)) + l.Error("failed to deindex post", zap.Error(err)) } msg.Ack() diff --git a/go.mod b/go.mod index 3e5c90d7..5d404894 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,8 @@ require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible // indirect + github.com/alitto/pond v1.9.2 // indirect + github.com/alitto/pond/v2 v2.1.6 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect @@ -135,7 +137,7 @@ require ( github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect - github.com/pinecone-io/go-pinecone v1.1.1 // indirect + github.com/pinecone-io/go-pinecone v1.1.2-0.20241220212044-af29d07e7c68 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect @@ -159,14 +161,14 @@ require ( go.mongodb.org/mongo-driver v1.17.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect - go.opentelemetry.io/otel v1.30.0 // indirect - go.opentelemetry.io/otel/metric v1.30.0 // indirect - go.opentelemetry.io/otel/trace v1.30.0 // indirect + go.opentelemetry.io/otel v1.31.0 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/trace v1.31.0 // indirect golang.org/x/image v0.20.0 // indirect golang.org/x/tools v0.25.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f // indirect - google.golang.org/grpc v1.67.1 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241230172942-26aa7a208def // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241230172942-26aa7a208def // indirect + google.golang.org/grpc v1.69.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect modernc.org/libc v1.61.0 // indirect modernc.org/mathutil v1.6.0 // indirect @@ -222,6 +224,6 @@ require ( golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 golang.org/x/time v0.8.0 // indirect - google.golang.org/protobuf v1.35.2 // indirect + google.golang.org/protobuf v1.36.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f675c5bb..4cf4f5cc 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,10 @@ github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7l github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/alexedwards/argon2id v1.0.0 h1:wJzDx66hqWX7siL/SRUmgz3F8YMrd/nfX/xHHcQQP0w= github.com/alexedwards/argon2id v1.0.0/go.mod h1:tYKkqIjzXvZdzPvADMWOEZ+l6+BD6CtBXMj5fnJppiw= +github.com/alitto/pond v1.9.2 h1:9Qb75z/scEZVCoSU+osVmQ0I0JOeLfdTDafrbcJ8CLs= +github.com/alitto/pond v1.9.2/go.mod h1:xQn3P/sHTYcU/1BR3i86IGIrilcrGC2LiS+E2+CJWsI= +github.com/alitto/pond/v2 v2.1.6 h1:6U3nSOjxpuNyvjIKjjRkpS2JDdgX5JqBm9GO2urcCjM= +github.com/alitto/pond/v2 v2.1.6/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= github.com/andybalholm/cascadia v1.0.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.3.2 h1:3Xi6Dw5lHF15JtdcmAHD3i1+T8plmv7BQ/nsViSLyss= github.com/andybalholm/cascadia v1.3.2/go.mod h1:7gtRlve5FxPPgIgX36uWBX58OdBsSS6lUvCFb+h7KvU= @@ -408,6 +412,8 @@ github.com/philippgille/chromem-go v0.7.0 h1:4jfvfyKymjKNfGxBUhHUcj1kp7B17NL/I1P github.com/philippgille/chromem-go v0.7.0/go.mod h1:hTd+wGEm/fFPQl7ilfCwQXkgEUxceYh86iIdoKMolPo= github.com/pinecone-io/go-pinecone v1.1.1 h1:pKoIiYcBIbrR7gaq0JXPiVnNEtevFYeq/AYL7T0NbbE= github.com/pinecone-io/go-pinecone v1.1.1/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk= +github.com/pinecone-io/go-pinecone v1.1.2-0.20241220212044-af29d07e7c68 h1:EHPrl/hnfeLYb6HwmUL2cOqPT3k2weIOTgMmRA4kjR4= +github.com/pinecone-io/go-pinecone v1.1.2-0.20241220212044-af29d07e7c68/go.mod h1:KfJhn4yThX293+fbtrZLnxe2PJYo8557Py062W4FYKk= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -547,10 +553,16 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.uber.org/dig v1.18.0 h1:imUL1UiY0Mg4bqbFfsRQO5G4CGRBec/ZujWTvSVp3pw= go.uber.org/dig v1.18.0/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.23.0 h1:lIr/gYWQGfTwGcSXWXu4vP5Ws6iqnNEIY+F/aFzCKTg= @@ -722,10 +734,14 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed h1: google.golang.org/genproto/googleapis/api v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:OCdP9MfskevB/rbYvHTsXTtKC+3bHWajPdoKgjcYkfo= google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 h1:M0KvPgPmDZHPlbRbaNU1APr28TvwvvdUPlSv7PUvy8g= google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:dguCy7UOdZhTvLzDyt15+rOrawrpM4q7DD9dQ1P11P4= +google.golang.org/genproto/googleapis/api v0.0.0-20241230172942-26aa7a208def h1:0Km0hi+g2KXbXL0+riZzSCKz23f4MmwicuEb00JeonI= +google.golang.org/genproto/googleapis/api v0.0.0-20241230172942-26aa7a208def/go.mod h1:u2DoMSpCXjrzqLdobRccQMc9wrnMAJ1DLng0a2yqM2Q= google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI= google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f h1:C1QccEa9kUwvMgEUORqQD9S17QesQijxjZ84sO82mfo= google.golang.org/genproto/googleapis/rpc v0.0.0-20241113202542-65e8d215514f/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241230172942-26aa7a208def h1:4P81qv5JXI/sDNae2ClVx88cgDDA6DPilADkG9tYKz8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241230172942-26aa7a208def/go.mod h1:bdAgzvd4kFrpykc5/AC2eLUiegK9T/qxZHD4hXYf/ho= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -733,6 +749,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -748,6 +766,8 @@ google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFyt google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/infrastructure/vector/pinecone/pinecone.go b/internal/infrastructure/vector/pinecone/pinecone.go index 4541bfd2..4b9174cc 100644 --- a/internal/infrastructure/vector/pinecone/pinecone.go +++ b/internal/infrastructure/vector/pinecone/pinecone.go @@ -29,6 +29,8 @@ type MetadataFilter = pinecone.MetadataFilter type QueryByVectorValuesRequest = pinecone.QueryByVectorValuesRequest +type ListVectorsRequest = pinecone.ListVectorsRequest + type ScoredVector = pinecone.ScoredVector func Build() fx.Option {