Commit
update to make all analysis take place in func() (#1508)
* update to make all analysis take place in func()

By making analysis take place inside a func(), a closure can be used to
encapsulate all the index-specific details. This allows the index API to
avoid knowing about scorch vs. upsidedown analysis structures.

* rewrite variable declaration for clarity

also update comment, per review feedback

* update to latest versions

selects latest versions compatible with these changes
mschoch authored Dec 2, 2020
1 parent 4d33a05 commit c5e42bd
Showing 8 changed files with 72 additions and 60 deletions.
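As the commit message describes, each index implementation now queues a closure that captures the document and its result channel, so the shared index API no longer has to carry scorch- or upsidedown-specific analysis result types. The following standalone Go sketch illustrates only that calling pattern; the Document type and analyze function here are simplified stand-ins, not bleve's actual types.

package main

import "fmt"

// Document stands in for bleve's index.Document. After this change the
// result channel carries documents themselves; scorch no longer wraps
// them in a separate *index.AnalysisResult.
type Document struct {
	ID       string
	analyzed bool
}

// analyze mirrors the shape of the new scorch analyze(): it works on the
// document in place and returns nothing.
func analyze(d *Document) {
	d.analyzed = true
}

func main() {
	batch := []*Document{{ID: "a"}, {ID: "b"}, {ID: "c"}}
	resultChan := make(chan *Document, len(batch))

	// Queue one closure per document. The closure captures the document
	// and the channel, so whatever executes it needs no knowledge of
	// index-specific analysis structures.
	for _, doc := range batch {
		doc := doc // capture a per-iteration copy, as the commit does via batch.IndexOps[k]
		go func() {
			analyze(doc)
			resultChan <- doc
		}()
	}

	// Drain exactly len(batch) results, much like Scorch.Batch does with numUpdates.
	for i := 0; i < len(batch); i++ {
		fmt.Println("analyzed:", (<-resultChan).ID)
	}
}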
14 changes: 7 additions & 7 deletions go.mod
@@ -4,18 +4,18 @@ go 1.13

require (
github.com/RoaringBitmap/roaring v0.4.23
-github.com/blevesearch/bleve_index_api v0.0.2
+github.com/blevesearch/bleve_index_api v0.0.3
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3
-github.com/blevesearch/scorch_segment_api v0.0.3
+github.com/blevesearch/scorch_segment_api v0.0.5
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
-github.com/blevesearch/zapx/v11 v11.1.3
-github.com/blevesearch/zapx/v12 v12.1.3
-github.com/blevesearch/zapx/v13 v13.1.3
-github.com/blevesearch/zapx/v14 v14.1.3
-github.com/blevesearch/zapx/v15 v15.1.3
+github.com/blevesearch/zapx/v11 v11.1.4
+github.com/blevesearch/zapx/v12 v12.1.4
+github.com/blevesearch/zapx/v13 v13.1.4
+github.com/blevesearch/zapx/v14 v14.1.4
+github.com/blevesearch/zapx/v15 v15.1.4
github.com/couchbase/moss v0.1.0
github.com/couchbase/vellum v1.0.2
github.com/golang/protobuf v1.3.2
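The dependency bumps above pull in bleve_index_api v0.0.3, whose analysis queue accepts a plain func() — the updated call sites below rely on exactly that. The sketch that follows shows one possible shape for such a func()-based worker queue; it is an illustration under that assumption, not the actual bleve_index_api implementation.

package main

import (
	"fmt"
	"sync"
)

// AnalysisQueue is a hypothetical func()-based work queue: callers hand it
// closures, workers simply run them. The real queue lives in bleve_index_api;
// this only sketches the shape implied by the Queue(func(){...}) call sites
// in this commit.
type AnalysisQueue struct {
	work chan func()
	wg   sync.WaitGroup
}

func NewAnalysisQueue(numWorkers int) *AnalysisQueue {
	q := &AnalysisQueue{work: make(chan func())}
	for i := 0; i < numWorkers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for fn := range q.work {
				fn() // the closure carries everything it needs
			}
		}()
	}
	return q
}

// Queue hands one unit of analysis work to the workers.
func (q *AnalysisQueue) Queue(fn func()) {
	q.work <- fn
}

// Close stops accepting work and waits for in-flight closures to finish.
func (q *AnalysisQueue) Close() {
	close(q.work)
	q.wg.Wait()
}

func main() {
	q := NewAnalysisQueue(4)
	results := make(chan string, 3)
	for _, id := range []string{"a", "b", "c"} {
		id := id
		q.Queue(func() { results <- "analyzed " + id })
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-results)
	}
	q.Close()
}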
28 changes: 14 additions & 14 deletions go.sum
@@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
-github.com/blevesearch/bleve_index_api v0.0.2 h1:M6EPUf354pMQw0J9NOyJaeIKNAlwpbSU+ofP6wHVC0w=
-github.com/blevesearch/bleve_index_api v0.0.2/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
+github.com/blevesearch/bleve_index_api v0.0.3 h1:DNaKT/0F9r2WQA0EQwVTJSE12lZv11J6od0IV5Zzbvg=
+github.com/blevesearch/bleve_index_api v0.0.3/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 h1:SjYVcfJVZoCfBlg+fkaq2eoZHTf5HaJfaTeTkOtyfHQ=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ=
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 h1:/4ikScMMYMqsRFWJjCyzd3CNWB0lxvqDkqa5nEv6NMc=
@@ -12,22 +12,22 @@ github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0=
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
-github.com/blevesearch/scorch_segment_api v0.0.3 h1:DJ7aqcIr1JUT77B42uTGtCRw0vz1HxlvUfyv7CefR9I=
-github.com/blevesearch/scorch_segment_api v0.0.3/go.mod h1:WpV76h6YHekU8CzseKfzJRoZfcVu+yWlwcEq+7N+Kp0=
+github.com/blevesearch/scorch_segment_api v0.0.5 h1:LBndHXXi79/8Omh3h0tFBPs1hSf3MgjB7azkt4vAxWg=
+github.com/blevesearch/scorch_segment_api v0.0.5/go.mod h1:Ilxtn7p82x+1djbDr+XLXATdYuvVYDlECSip1l/3Roo=
github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac=
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
-github.com/blevesearch/zapx/v11 v11.1.3 h1:3pQN/ghHIZvF2sTZuAixWGfS2s/xFUh2HHg7XuGoAcA=
-github.com/blevesearch/zapx/v11 v11.1.3/go.mod h1:F7a5ANvbZHt0D28ZCjo6ZT5E2HZdLXGnKd77TO6PfOs=
-github.com/blevesearch/zapx/v12 v12.1.3 h1:9KYAqQURIIMVJ1+vHnq1snYvPPukX/LLJ+xpYj3bRG8=
-github.com/blevesearch/zapx/v12 v12.1.3/go.mod h1:ykFX5dk1FfDh9qWRxtoRoFgh1VoT4syFSqKa31t3rGg=
-github.com/blevesearch/zapx/v13 v13.1.3 h1:34nYfEz0v9BfDIoZPERX781fCEvQWR15agUdZRLpu3Y=
-github.com/blevesearch/zapx/v13 v13.1.3/go.mod h1:coKC574yL3KQ0ciIBdVYtxufN+Ikh7z5HbPMO9yH18M=
-github.com/blevesearch/zapx/v14 v14.1.3 h1:Rx4Bjni8JW1wfTw9VGw1NUbFyA5wl+x7OJBaaqp2ypE=
-github.com/blevesearch/zapx/v14 v14.1.3/go.mod h1:4MIfa+zZsiL57ve1aYZZ6S+hihYjtApo6DCizjM/eoE=
-github.com/blevesearch/zapx/v15 v15.1.3 h1:gUvMSGfnAlhdjtNszD8kaA7SNLoIB7Hb55uAuQJZEVQ=
-github.com/blevesearch/zapx/v15 v15.1.3/go.mod h1:TRWvTUYjESzoCzjfDckPmjXHuTyTLt5uKBJEf4C/8G4=
+github.com/blevesearch/zapx/v11 v11.1.4 h1:p4FhcBPDB/lhBctXGAB1r7AxMbg5uta5oRKoCuMtiRA=
+github.com/blevesearch/zapx/v11 v11.1.4/go.mod h1:MTlzXennnLzspvhHYEbZupmr823icJiSj41042mcV6c=
+github.com/blevesearch/zapx/v12 v12.1.4 h1:5zB6BmQYtIEEAXrY8iZh09rkpuTCTVOHXqwsF0/4Yxw=
+github.com/blevesearch/zapx/v12 v12.1.4/go.mod h1:wM4SGTOyF8SUkv36C3NUy82oHAEF+nKhua31Z9IT2Bk=
+github.com/blevesearch/zapx/v13 v13.1.4 h1:e7caDqGHzZEQwLoUYHzY4CXJSlEy2PsabaSup4W5GNI=
+github.com/blevesearch/zapx/v13 v13.1.4/go.mod h1:tpvyPg3yhbColAlAcQQSDAi/4vKzQl46jOcUSA4RUag=
+github.com/blevesearch/zapx/v14 v14.1.4 h1:jWnVstovnq81b38nP3Y0T0rTa+dWXjPSWJmnrtkZBxY=
+github.com/blevesearch/zapx/v14 v14.1.4/go.mod h1:VCHYImrCnhiSC/4NOlM1W3k4G+hjD2VsDD3YzZbUImY=
+github.com/blevesearch/zapx/v15 v15.1.4 h1:x2vZerNVhjilFxkG89+eIPX4hywVxwPPR8WDeX3DuXA=
+github.com/blevesearch/zapx/v15 v15.1.4/go.mod h1:Q0iQzBh6k4FKqj7zqlIsTIrVBUkQD7HG2p+Qp6XjmKo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
6 changes: 3 additions & 3 deletions index/scorch/builder.go
@@ -134,14 +134,14 @@ func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
}

func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
-analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
+analysisResults := make([]index.Document, 0, len(batch.IndexOps))
for _, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddIDField()
// perform analysis directly
-analysisResult := analyze(doc)
-analysisResults = append(analysisResults, analysisResult)
+analyze(doc)
+analysisResults = append(analysisResults, doc)
}
}

25 changes: 9 additions & 16 deletions index/scorch/scorch.go
@@ -311,7 +311,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()

-resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
+resultChan := make(chan index.Document, len(batch.IndexOps))

var numUpdates uint64
var numDeletes uint64
@@ -333,18 +333,21 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

if numUpdates > 0 {
go func() {
-for _, doc := range batch.IndexOps {
+for k := range batch.IndexOps {
+doc := batch.IndexOps[k]
if doc != nil {
-aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
-s.analysisQueue.Queue(aw)
+s.analysisQueue.Queue(func() {
+analyze(doc)
+resultChan <- doc
+})
}
}
}()
}

// wait for analysis result
-analysisResults := make([]*index.AnalysisResult, int(numUpdates))
+analysisResults := make([]index.Document, int(numUpdates))
var itemsDeQueued uint64
var totalAnalysisSize int
for itemsDeQueued < numUpdates {
@@ -564,15 +567,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
return m
}

-func (s *Scorch) Analyze(d index.Document) *index.AnalysisResult {
-return analyze(d)
-}
-
-func analyze(d index.Document) *index.AnalysisResult {
-rv := &index.AnalysisResult{
-Document: d,
-}
-
+func analyze(d index.Document) {
d.VisitFields(func(field index.Field) {
if field.IsIndexed() {
field.Analyze()
@@ -585,8 +580,6 @@ func analyze(d index.Document) *index.AnalysisResult {
}
}
})
-
-return rv
}

func (s *Scorch) Advanced() (store.KVStore, error) {
4 changes: 2 additions & 2 deletions index/scorch/segment_plugin.go
@@ -40,8 +40,8 @@ type SegmentPlugin interface {
// version must be incremented.
Version() uint32

-// New takes a set of AnalysisResults and turns them into a new Segment
-New(results []*index.AnalysisResult) (segment.Segment, uint64, error)
+// New takes a set of Documents and turns them into a new Segment
+New(results []index.Document) (segment.Segment, uint64, error)

// Open attempts to open the file at the specified path and
// return the corresponding Segment
23 changes: 19 additions & 4 deletions index/upsidedown/analysis.go
@@ -18,10 +18,25 @@ import (
index "github.com/blevesearch/bleve_index_api"
)

-func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
-rv := &index.AnalysisResult{
+type IndexRow interface {
+KeySize() int
+KeyTo([]byte) (int, error)
+Key() []byte
+
+ValueSize() int
+ValueTo([]byte) (int, error)
+Value() []byte
+}
+
+type AnalysisResult struct {
+DocID string
+Rows []IndexRow
+}
+
+func (udc *UpsideDownCouch) analyze(d index.Document) *AnalysisResult {
+rv := &AnalysisResult{
DocID: d.ID(),
-Rows: make([]index.IndexRow, 0, 100),
+Rows: make([]IndexRow, 0, 100),
}

docIDBytes := []byte(d.ID())
@@ -88,7 +103,7 @@ func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rowsCapNeeded += len(tokenFreqs)
}

-rv.Rows = append(make([]index.IndexRow, 0, rowsCapNeeded), rv.Rows...)
+rv.Rows = append(make([]IndexRow, 0, rowsCapNeeded), rv.Rows...)

backIndexTermsEntries := make([]*BackIndexTermsEntry, 0, len(fieldTermFreqs))

4 changes: 2 additions & 2 deletions index/upsidedown/analysis_test.go
@@ -45,7 +45,7 @@ func TestAnalysisBug328(t *testing.T) {
cf := document.NewCompositeFieldWithIndexingOptions("_all", true, []string{}, []string{}, document.IndexField|document.IncludeTermVectors)
d.AddField(cf)

-rv := idx.Analyze(d)
+rv := idx.(*UpsideDownCouch).analyze(d)
fieldIndexes := make(map[uint16]string)
for _, row := range rv.Rows {
if row, ok := row.(*FieldRow); ok {
@@ -84,7 +84,7 @@ func BenchmarkAnalyze(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
-rv := idx.Analyze(d)
+rv := idx.(*UpsideDownCouch).analyze(d)
if len(rv.Rows) < 92 || len(rv.Rows) > 93 {
b.Fatalf("expected 92-93 rows, got %d", len(rv.Rows))
}
28 changes: 16 additions & 12 deletions index/upsidedown/upsidedown.go
@@ -414,11 +414,13 @@ func (udc *UpsideDownCouch) Close() error {
func (udc *UpsideDownCouch) Update(doc index.Document) (err error) {
// do analysis before acquiring write lock
analysisStart := time.Now()
-resultChan := make(chan *index.AnalysisResult)
-aw := index.NewAnalysisWork(udc, doc, resultChan)
+resultChan := make(chan *AnalysisResult)

// put the work on the queue
-udc.analysisQueue.Queue(aw)
+udc.analysisQueue.Queue(func() {
+ar := udc.analyze(doc)
+resultChan <- ar
+})

// wait for the result
result := <-resultChan
@@ -454,7 +456,7 @@ func (udc *UpsideDownCouch) Update(doc index.Document) (err error) {
}

func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document,
-result *index.AnalysisResult, backIndexRow *BackIndexRow) (err error) {
+result *AnalysisResult, backIndexRow *BackIndexRow) (err error) {
// start a writer for this update
indexStart := time.Now()
var kvwriter store.KVWriter
@@ -500,7 +502,7 @@ func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document,
return
}

-func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
+func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
addRows = make([]UpsideDownCouchRow, 0, len(rows))

if backIndexRow == nil {
@@ -586,7 +588,7 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in
return addRows, updateRows, deleteRows
}

-func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []index.IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]index.IndexRow, []*BackIndexStoreEntry) {
+func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]IndexRow, []*BackIndexStoreEntry) {
fieldType := field.EncodedFieldType()
storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())

@@ -596,7 +598,7 @@ func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldInd
return append(rows, storedRow), append(backIndexStoredEntries, &backIndexStoredEntry)
}

-func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []index.IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]index.IndexRow, []*BackIndexTermsEntry) {
+func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]IndexRow, []*BackIndexTermsEntry) {
fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))

termFreqRows := make([]TermFrequencyRow, len(tokenFreqs))
@@ -731,7 +733,7 @@ func frequencyFromTokenFreq(tf *index.TokenFreq) int {
return tf.Frequency()
}

-func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) {
+func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []IndexRow) ([]*TermVector, []IndexRow) {
a := make([]TermVector, len(tf.Locations))
rv := make([]*TermVector, len(tf.Locations))

@@ -787,7 +789,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}
analysisStart := time.Now()

-resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
+resultChan := make(chan *AnalysisResult, len(batch.IndexOps))

var numUpdates uint64
var numPlainTextBytes uint64
@@ -803,9 +805,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
-aw := index.NewAnalysisWork(udc, doc, resultChan)
// put the work on the queue
-udc.analysisQueue.Queue(aw)
+udc.analysisQueue.Queue(func() {
+ar := udc.analyze(doc)
+resultChan <- ar
+})
}
}
}()
@@ -846,7 +850,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}()

// wait for analysis result
-newRowsMap := make(map[string][]index.IndexRow)
+newRowsMap := make(map[string][]IndexRow)
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
