Skip to content

Commit

Permalink
add pinecone semdexer
Browse files Browse the repository at this point in the history
  • Loading branch information
Southclaws committed Jan 1, 2025
1 parent 5d5faf0 commit bc262f0
Show file tree
Hide file tree
Showing 13 changed files with 680 additions and 2 deletions.
2 changes: 1 addition & 1 deletion app/services/search/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func New(
semdexSearcher semdex.Searcher,
) searcher.Searcher {
switch cfg.SemdexProvider {
case "chromem", "weaviate":
case "chromem", "weaviate", "pinecone":
return semdexSearcher

default:
Expand Down
111 changes: 111 additions & 0 deletions app/services/semdex/semdexer/pinecone_semdexer/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package pinecone_semdexer

import (
"context"
"runtime"
"sync"

"github.com/Southclaws/fault"
"github.com/Southclaws/fault/fctx"
"github.com/rs/xid"
"google.golang.org/protobuf/types/known/structpb"

"github.com/Southclaws/storyden/app/resources/datagraph"
"github.com/Southclaws/storyden/internal/infrastructure/vector/pinecone"
)

func (c *pineconeSemdexer) Index(ctx context.Context, object datagraph.Item) error {
chunks := object.GetContent().Split()

if len(chunks) == 0 {
return fault.New("no text chunks to index", fctx.With(ctx))
}

numWorkers := min(runtime.NumCPU(), len(chunks))
chunkQueue := make(chan string, len(chunks))
errChan := make(chan error, len(chunks))
chunkChan := make(chan *pinecone.Vector, len(chunks))

var wg sync.WaitGroup

for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for chunk := range chunkQueue {
vec, err := c.ef(ctx, chunk)
if err != nil {
errChan <- err
}

objectID := object.GetID()

metadata, err := structpb.NewStruct(map[string]any{
"datagraph_id": objectID.String(),
"datagraph_type": object.GetKind().String(),
"name": object.GetName(),
"content": chunk,
})
if err != nil {
errChan <- err
}

chunkID := generateChunkID(objectID, chunk).String()

chunkChan <- &pinecone.Vector{
Id: chunkID,
Values: vec,
Metadata: metadata,
}
}
}(i)
}

go func() {
for _, chunk := range chunks {
chunkQueue <- chunk
}
close(chunkQueue)
}()

go func() {
wg.Wait()

close(errChan)
close(chunkChan)
}()

for err := range errChan {
if err != nil {
return err
}
}

var vecs []*pinecone.Vector
for vec := range chunkChan {
vecs = append(vecs, vec)
}

_, err := c.index.UpsertVectors(ctx, vecs)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

return nil
}

func (c *pineconeSemdexer) Delete(ctx context.Context, object xid.ID) error {
filter, err := structpb.NewStruct(map[string]any{
"datagraph_id": object.String(),
})
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

err = c.index.DeleteVectorsByFilter(ctx, filter)
if err != nil {
return fault.Wrap(err, fctx.With(ctx))
}

return nil
}
122 changes: 122 additions & 0 deletions app/services/semdex/semdexer/pinecone_semdexer/object.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package pinecone_semdexer

import (
"fmt"
"net/url"

"github.com/Southclaws/dt"
"github.com/Southclaws/fault"
"github.com/rs/xid"

"github.com/Southclaws/storyden/app/resources/datagraph"
"github.com/Southclaws/storyden/app/services/semdex"
"github.com/Southclaws/storyden/internal/infrastructure/vector/pinecone"
)

type Object struct {
ID xid.ID
Kind datagraph.Kind
Relevance float64
URL url.URL
Content string
}

type Objects []*Object

func (o *Object) ToChunk() *semdex.Chunk {
return &semdex.Chunk{
ID: o.ID,
Kind: o.Kind,
URL: o.URL,
Content: o.Content,
}
}

func (o *Object) ToRef() *datagraph.Ref {
return &datagraph.Ref{
ID: o.ID,
Kind: o.Kind,
Relevance: o.Relevance,
}
}

func (o Objects) ToChunks() []*semdex.Chunk {
chunks := make([]*semdex.Chunk, len(o))
for i, object := range o {
chunks[i] = object.ToChunk()
}
return chunks
}

func (o Objects) ToRefs() datagraph.RefList {
refs := make(datagraph.RefList, len(o))
for i, object := range o {
refs[i] = object.ToRef()
}
return refs
}

func mapObject(v *pinecone.ScoredVector) (*Object, error) {
meta := v.Vector.Metadata.AsMap()

idRaw, ok := meta["datagraph_id"]
if !ok {
return nil, fault.New("missing datagraph_id in metadata")
}

typeRaw, ok := meta["datagraph_type"]
if !ok {
return nil, fault.New("missing datagraph_type in metadata")
}

contentRaw, ok := meta["content"]
if !ok {
return nil, fault.New("missing content in metadata")
}

//

idString, ok := idRaw.(string)
if !ok {
return nil, fault.New("datagraph_id in metadata is not a string")
}

typeString, ok := typeRaw.(string)
if !ok {
return nil, fault.New("datagraph_type in metadata is not a string")
}

content, ok := contentRaw.(string)
if !ok {
return nil, fault.New("content in metadata is not a string")
}

//

id, err := xid.FromString(idString)
if err != nil {
return nil, fault.Wrap(err)
}

dk, err := datagraph.NewKind(typeString)
if err != nil {
return nil, fault.Wrap(err)
}

sdr, err := url.Parse(fmt.Sprintf("%s:%s/%s", datagraph.RefScheme, dk.String(), id.String()))
if err != nil {
return nil, err
}

return &Object{
ID: id,
Kind: dk,
Relevance: float64((v.Score + 1) / 2),
URL: *sdr,
Content: content,
}, nil
}

func mapObjects(objects []*pinecone.ScoredVector) (Objects, error) {
return dt.MapErr(objects, mapObject)
}
67 changes: 67 additions & 0 deletions app/services/semdex/semdexer/pinecone_semdexer/pinecone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package pinecone_semdexer

import (
"context"
"hash/fnv"

"github.com/Southclaws/dt"
"github.com/Southclaws/fault"
"github.com/google/uuid"
"github.com/rs/xid"

"github.com/Southclaws/storyden/app/resources/datagraph"
"github.com/Southclaws/storyden/app/resources/datagraph/hydrate"
"github.com/Southclaws/storyden/app/services/semdex"
"github.com/Southclaws/storyden/internal/config"
"github.com/Southclaws/storyden/internal/infrastructure/ai"
"github.com/Southclaws/storyden/internal/infrastructure/vector/pinecone"
)

type pineconeSemdexer struct {
client *pinecone.Client
index *pinecone.Index
hydrator *hydrate.Hydrator
ef ai.Embedder
}

func New(ctx context.Context, cfg config.Config, pc *pinecone.Client, rh *hydrate.Hydrator, aip ai.Prompter) (semdex.Semdexer, error) {
if _, ok := aip.(*ai.Disabled); ok {
return nil, fault.New("a language model provider must be enabled for the pinecone semdexer to be enabled")
}

ef := aip.EmbeddingFunc()

index, err := pc.GetOrCreateIndex(ctx, cfg.PineconeIndex)
if err != nil {
return nil, err
}

return &pineconeSemdexer{
client: pc,
index: index,
hydrator: rh,
ef: ef,
}, nil
}

func generateChunkID(id xid.ID, chunk string) uuid.UUID {
// We don't currently support sharing chunks across content nodes, so append
// the object's ID to the chunk's hash, to ensure it's unique to the object.
payload := []byte(append(id.Bytes(), chunk...))

return uuid.NewHash(fnv.New128(), uuid.NameSpaceOID, payload, 4)
}

func chunkIDsFor(id xid.ID) func(chunk string) string {
return func(chunk string) string {
return generateChunkID(id, chunk).String()
}
}

func chunkIDsForItem(object datagraph.Item) []string {
return dt.Map(object.GetContent().Split(), chunkIDsFor(object.GetID()))
}

func (c *pineconeSemdexer) GetMany(ctx context.Context, limit uint, ids ...xid.ID) (datagraph.RefList, error) {
return nil, nil
}
Loading

0 comments on commit bc262f0

Please sign in to comment.