Skip to content

Commit

Permalink
rework reindexing and update pinecone dedupe
Browse files Browse the repository at this point in the history
- removed GetMany from semdex retrieval
- implemented de-duplicate indexing for pinecone
- simplified reindex process to rely on later de-dupe
  • Loading branch information
Southclaws committed Jan 2, 2025
1 parent ebf38c5 commit d82a5e6
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 267 deletions.
12 changes: 7 additions & 5 deletions app/services/library/node_semdex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ func (i *semdexer) index(ctx context.Context, id library.NodeID) error {
return fault.Wrap(err, fctx.With(ctx))
}

err = i.semdexMutator.Index(ctx, node)
updates, err := i.semdexMutator.Index(ctx, node)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

_, err = i.nodeWriter.Update(ctx, qk, node_writer.WithIndexed())
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
if updates > 0 {
_, err = i.nodeWriter.Update(ctx, qk, node_writer.WithIndexed())
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}
}

return nil
Expand All @@ -35,7 +37,7 @@ func (i *semdexer) index(ctx context.Context, id library.NodeID) error {
func (i *semdexer) deindex(ctx context.Context, id library.NodeID) error {
qk := library.NewID(xid.ID(id))

err := i.semdexMutator.Delete(ctx, xid.ID(id))
_, err := i.semdexMutator.Delete(ctx, xid.ID(id))
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}
Expand Down
12 changes: 2 additions & 10 deletions app/services/library/node_semdex/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/Southclaws/storyden/app/resources/datagraph"
"github.com/Southclaws/storyden/app/resources/library"
"github.com/Southclaws/storyden/app/resources/mq"
"github.com/Southclaws/storyden/internal/ent"
Expand Down Expand Up @@ -53,15 +52,8 @@ func (r *semdexer) reindex(ctx context.Context, reindexThreshold time.Duration,
keepIDs := dt.Map(keep, func(p *ent.Node) xid.ID { return p.ID })
discardIDs := dt.Map(discard, func(p *ent.Node) xid.ID { return p.ID })

indexed, err := r.semdexQuerier.GetMany(ctx, uint(reindexChunk), keepIDs...)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

indexedIDs := dt.Map(indexed, func(p *datagraph.Ref) xid.ID { return p.ID })

updated := diff(keepIDs, indexedIDs)
deleted := lo.Intersect(indexedIDs, discardIDs)
updated := keepIDs
deleted := discardIDs

r.logger.Debug("reindexing nodes",
zap.Int("all", len(nodes)),
Expand Down
8 changes: 4 additions & 4 deletions app/services/semdex/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ type Disabled struct{}

var _ Semdexer = &Disabled{}

func (*Disabled) Index(ctx context.Context, object datagraph.Item) error {
return nil
func (*Disabled) Index(ctx context.Context, object datagraph.Item) (int, error) {
return 0, nil
}

func (*Disabled) Delete(ctx context.Context, object xid.ID) error {
return nil
func (*Disabled) Delete(ctx context.Context, object xid.ID) (int, error) {
return 0, nil
}

func (*Disabled) Search(ctx context.Context, q string, p pagination.Parameters, opts searcher.Options) (*pagination.Result[datagraph.Item], error) {
Expand Down
6 changes: 4 additions & 2 deletions app/services/semdex/index_job/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (i *indexerConsumer) indexReply(ctx context.Context, id post.ID) error {
return fault.Wrap(err, fctx.With(ctx))
}

return i.indexer.Index(ctx, p)
_, err = i.indexer.Index(ctx, p)
return err
}

func (i *indexerConsumer) indexProfile(ctx context.Context, id account.AccountID) error {
Expand All @@ -78,5 +79,6 @@ func (i *indexerConsumer) indexProfile(ctx context.Context, id account.AccountID
return fault.Wrap(err, fctx.With(ctx))
}

return i.indexer.Index(ctx, profile.ProfileFromAccount(p))
_, err = i.indexer.Index(ctx, profile.ProfileFromAccount(p))
return err
}
6 changes: 2 additions & 4 deletions app/services/semdex/semdex.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ type Semdexer interface {
}

type Mutator interface {
Index(ctx context.Context, object datagraph.Item) error
Delete(ctx context.Context, object xid.ID) error
Index(ctx context.Context, object datagraph.Item) (int, error)
Delete(ctx context.Context, object xid.ID) (int, error)
}

type Querier interface {
Searcher
Recommender

GetMany(ctx context.Context, limit uint, ids ...xid.ID) (datagraph.RefList, error)
}

type Chunk struct {
Expand Down
18 changes: 14 additions & 4 deletions app/services/semdex/semdexer/chromem_semdexer/chromem.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,28 @@ func New(cfg config.Config, rh *hydrate.Hydrator, aip ai.Prompter) (semdex.Semde
}, nil
}

func (c *chromemRefIndex) Index(ctx context.Context, object datagraph.Item) error {
return c.c.AddDocument(ctx, chromem.Document{
func (c *chromemRefIndex) Index(ctx context.Context, object datagraph.Item) (int, error) {
err := c.c.AddDocument(ctx, chromem.Document{
ID: object.GetID().String(),
Content: object.GetContent().Plaintext(),
Metadata: map[string]string{
"datagraph_kind": object.GetKind().String(),
},
})
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}

return 1, nil
}

func (c *chromemRefIndex) Delete(ctx context.Context, object xid.ID) error {
return c.c.Delete(ctx, nil, nil, object.String())
func (c *chromemRefIndex) Delete(ctx context.Context, object xid.ID) (int, error) {
err := c.c.Delete(ctx, nil, nil, object.String())
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}

return 1, nil
}

func (c *chromemRefIndex) Search(ctx context.Context, q string, p pagination.Parameters, opts searcher.Options) (*pagination.Result[datagraph.Item], error) {
Expand Down
186 changes: 117 additions & 69 deletions app/services/semdex/semdexer/pinecone_semdexer/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,109 +3,157 @@ package pinecone_semdexer
import (
"context"
"runtime"
"sync"

"github.com/Southclaws/dt"
"github.com/Southclaws/fault"
"github.com/Southclaws/fault/fctx"
"github.com/alitto/pond/v2"
"github.com/rs/xid"
"github.com/samber/lo"
"google.golang.org/protobuf/types/known/structpb"

"github.com/Southclaws/storyden/app/resources/datagraph"
"github.com/Southclaws/storyden/internal/infrastructure/vector/pinecone"
)

func (c *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) error {
chunks := object.GetContent().Split()

if len(chunks) == 0 {
return fault.New("no text chunks to index", fctx.With(ctx))
func (s *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) (int, error) {
inserts, deletes, err := s.buildIndexOps(ctx, object)
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}

numWorkers := min(runtime.NumCPU(), len(chunks))
chunkQueue := make(chan string, len(chunks))
errChan := make(chan error, len(chunks))
chunkChan := make(chan *pinecone.Vector, len(chunks))

var wg sync.WaitGroup

for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for chunk := range chunkQueue {
vec, err := c.ef(ctx, chunk)
if err != nil {
errChan <- err
}

objectID := object.GetID()

metadata, err := structpb.NewStruct(map[string]any{
"datagraph_id": objectID.String(),
"datagraph_type": object.GetKind().String(),
"name": object.GetName(),
"content": chunk,
})
if err != nil {
errChan <- err
}

chunkID := generateChunkID(objectID, chunk).String()

chunkChan <- &pinecone.Vector{
Id: chunkID,
Values: vec,
Metadata: metadata,
}
}
}(i)
if len(inserts) > 0 {
_, err = s.index.UpsertVectors(ctx, inserts)
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}
}

go func() {
for _, chunk := range chunks {
chunkQueue <- chunk
if len(deletes) > 0 {
err = s.deleteVectors(ctx, deletes)
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}
close(chunkQueue)
}()
}

go func() {
wg.Wait()
changes := len(inserts) - len(deletes)

close(errChan)
close(chunkChan)
}()
return changes, nil
}

for err := range errChan {
if err != nil {
return err
}
func (c *pineconeSemdexer) Delete(ctx context.Context, object xid.ID) (int, error) {
prefix := object.String()
vectors, err := c.index.ListVectors(ctx, &pinecone.ListVectorsRequest{
Prefix: &prefix,
})
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}

var vecs []*pinecone.Vector
for vec := range chunkChan {
vecs = append(vecs, vec)
if len(vectors.VectorIds) == 0 {
return 0, nil
}

_, err := c.index.UpsertVectors(ctx, vecs)
ids := dt.Map(vectors.VectorIds, func(id *string) string { return *id })

err = c.index.DeleteVectorsById(ctx, ids)
if err != nil {
return 0, fault.Wrap(err, fctx.With(ctx))
}

return len(ids), nil
}

func (s *pineconeSemdexer) deleteVectors(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}

err := s.index.DeleteVectorsById(ctx, ids)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

return nil
}

func (c *pineconeSemdexer) Delete(ctx context.Context, object xid.ID) error {
filter, err := structpb.NewStruct(map[string]any{
"datagraph_id": object.String(),
func (s *pineconeSemdexer) buildIndexOps(ctx context.Context, object datagraph.Item) ([]*pinecone.Vector, []string, error) {
allChunks := chunksFor(object)
if len(allChunks) == 0 {
return nil, nil, nil
}
chunkIDs := dt.Map(allChunks, func(c chunk) string { return c.id })

objectID := object.GetID()

inputChunkTable := lo.SliceToMap(allChunks, func(c chunk) (string, chunk) {
return c.id, c
})

prefix := objectID.String()
indexedChunk, err := s.index.ListVectors(ctx, &pinecone.ListVectorsRequest{
Prefix: &prefix,
})
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
return nil, nil, fault.Wrap(err, fctx.With(ctx))
}

err = c.index.DeleteVectorsByFilter(ctx, filter)
vecids := dt.Map(indexedChunk.VectorIds, func(id *string) string { return *id })

indexedChunkTable, err := s.index.FetchVectors(ctx, vecids)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
return nil, nil, fault.Wrap(err, fctx.With(ctx))
}

return nil
pool := pond.NewResultPool[*pinecone.Vector](min(runtime.NumCPU(), len(chunkIDs)))
group := pool.NewGroupContext(ctx)

for id, chunk := range inputChunkTable {
_, exists := indexedChunkTable.Vectors[id]
if exists {
continue
}

group.SubmitErr(func() (*pinecone.Vector, error) {
vec, err := s.ef(ctx, chunk.content)
if err != nil {
return nil, err
}

metadata, err := structpb.NewStruct(map[string]any{
"datagraph_id": objectID.String(),
"datagraph_type": object.GetKind().String(),
"name": object.GetName(),
"content": chunk.content,
})
if err != nil {
return nil, err
}

return &pinecone.Vector{
Id: id,
Values: vec,
Metadata: metadata,
}, nil
})
}

inserts, err := group.Wait()
if err != nil {
return nil, nil, fault.Wrap(err, fctx.With(ctx))
}

// build a list of vectors to delete by yielding items that are indexed but
// not present in the input object chunk table.
deletes := []string{}
for id := range indexedChunkTable.Vectors {
_, exists := inputChunkTable[id]
if exists {
continue
}

deletes = append(deletes, id)
}

return inserts, deletes, nil
}
Loading

0 comments on commit d82a5e6

Please sign in to comment.