Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to make all analysis take place in func() #1508

Merged
merged 3 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ go 1.13

require (
github.com/RoaringBitmap/roaring v0.4.23
github.com/blevesearch/bleve_index_api v0.0.2
github.com/blevesearch/bleve_index_api v0.0.3
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/scorch_segment_api v0.0.3
github.com/blevesearch/scorch_segment_api v0.0.5
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
github.com/blevesearch/zapx/v11 v11.1.3
github.com/blevesearch/zapx/v12 v12.1.3
github.com/blevesearch/zapx/v13 v13.1.3
github.com/blevesearch/zapx/v14 v14.1.3
github.com/blevesearch/zapx/v15 v15.1.3
github.com/blevesearch/zapx/v11 v11.1.4
github.com/blevesearch/zapx/v12 v12.1.4
github.com/blevesearch/zapx/v13 v13.1.4
github.com/blevesearch/zapx/v14 v14.1.4
github.com/blevesearch/zapx/v15 v15.1.4
github.com/couchbase/moss v0.1.0
github.com/couchbase/vellum v1.0.2
github.com/golang/protobuf v1.3.2
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/blevesearch/bleve_index_api v0.0.2 h1:M6EPUf354pMQw0J9NOyJaeIKNAlwpbSU+ofP6wHVC0w=
github.com/blevesearch/bleve_index_api v0.0.2/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/bleve_index_api v0.0.3 h1:DNaKT/0F9r2WQA0EQwVTJSE12lZv11J6od0IV5Zzbvg=
github.com/blevesearch/bleve_index_api v0.0.3/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 h1:SjYVcfJVZoCfBlg+fkaq2eoZHTf5HaJfaTeTkOtyfHQ=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ=
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 h1:/4ikScMMYMqsRFWJjCyzd3CNWB0lxvqDkqa5nEv6NMc=
Expand All @@ -12,22 +12,22 @@ github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0=
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/scorch_segment_api v0.0.3 h1:DJ7aqcIr1JUT77B42uTGtCRw0vz1HxlvUfyv7CefR9I=
github.com/blevesearch/scorch_segment_api v0.0.3/go.mod h1:WpV76h6YHekU8CzseKfzJRoZfcVu+yWlwcEq+7N+Kp0=
github.com/blevesearch/scorch_segment_api v0.0.5 h1:LBndHXXi79/8Omh3h0tFBPs1hSf3MgjB7azkt4vAxWg=
github.com/blevesearch/scorch_segment_api v0.0.5/go.mod h1:Ilxtn7p82x+1djbDr+XLXATdYuvVYDlECSip1l/3Roo=
github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac=
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
github.com/blevesearch/zapx/v11 v11.1.3 h1:3pQN/ghHIZvF2sTZuAixWGfS2s/xFUh2HHg7XuGoAcA=
github.com/blevesearch/zapx/v11 v11.1.3/go.mod h1:F7a5ANvbZHt0D28ZCjo6ZT5E2HZdLXGnKd77TO6PfOs=
github.com/blevesearch/zapx/v12 v12.1.3 h1:9KYAqQURIIMVJ1+vHnq1snYvPPukX/LLJ+xpYj3bRG8=
github.com/blevesearch/zapx/v12 v12.1.3/go.mod h1:ykFX5dk1FfDh9qWRxtoRoFgh1VoT4syFSqKa31t3rGg=
github.com/blevesearch/zapx/v13 v13.1.3 h1:34nYfEz0v9BfDIoZPERX781fCEvQWR15agUdZRLpu3Y=
github.com/blevesearch/zapx/v13 v13.1.3/go.mod h1:coKC574yL3KQ0ciIBdVYtxufN+Ikh7z5HbPMO9yH18M=
github.com/blevesearch/zapx/v14 v14.1.3 h1:Rx4Bjni8JW1wfTw9VGw1NUbFyA5wl+x7OJBaaqp2ypE=
github.com/blevesearch/zapx/v14 v14.1.3/go.mod h1:4MIfa+zZsiL57ve1aYZZ6S+hihYjtApo6DCizjM/eoE=
github.com/blevesearch/zapx/v15 v15.1.3 h1:gUvMSGfnAlhdjtNszD8kaA7SNLoIB7Hb55uAuQJZEVQ=
github.com/blevesearch/zapx/v15 v15.1.3/go.mod h1:TRWvTUYjESzoCzjfDckPmjXHuTyTLt5uKBJEf4C/8G4=
github.com/blevesearch/zapx/v11 v11.1.4 h1:p4FhcBPDB/lhBctXGAB1r7AxMbg5uta5oRKoCuMtiRA=
github.com/blevesearch/zapx/v11 v11.1.4/go.mod h1:MTlzXennnLzspvhHYEbZupmr823icJiSj41042mcV6c=
github.com/blevesearch/zapx/v12 v12.1.4 h1:5zB6BmQYtIEEAXrY8iZh09rkpuTCTVOHXqwsF0/4Yxw=
github.com/blevesearch/zapx/v12 v12.1.4/go.mod h1:wM4SGTOyF8SUkv36C3NUy82oHAEF+nKhua31Z9IT2Bk=
github.com/blevesearch/zapx/v13 v13.1.4 h1:e7caDqGHzZEQwLoUYHzY4CXJSlEy2PsabaSup4W5GNI=
github.com/blevesearch/zapx/v13 v13.1.4/go.mod h1:tpvyPg3yhbColAlAcQQSDAi/4vKzQl46jOcUSA4RUag=
github.com/blevesearch/zapx/v14 v14.1.4 h1:jWnVstovnq81b38nP3Y0T0rTa+dWXjPSWJmnrtkZBxY=
github.com/blevesearch/zapx/v14 v14.1.4/go.mod h1:VCHYImrCnhiSC/4NOlM1W3k4G+hjD2VsDD3YzZbUImY=
github.com/blevesearch/zapx/v15 v15.1.4 h1:x2vZerNVhjilFxkG89+eIPX4hywVxwPPR8WDeX3DuXA=
github.com/blevesearch/zapx/v15 v15.1.4/go.mod h1:Q0iQzBh6k4FKqj7zqlIsTIrVBUkQD7HG2p+Qp6XjmKo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
6 changes: 3 additions & 3 deletions index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
}

func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
analysisResults := make([]index.Document, 0, len(batch.IndexOps))
for _, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddIDField()
// perform analysis directly
analysisResult := analyze(doc)
analysisResults = append(analysisResults, analysisResult)
analyze(doc)
analysisResults = append(analysisResults, doc)
}
}

Expand Down
25 changes: 9 additions & 16 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()

resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
resultChan := make(chan index.Document, len(batch.IndexOps))

var numUpdates uint64
var numDeletes uint64
Expand All @@ -333,18 +333,21 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

if numUpdates > 0 {
go func() {
for _, doc := range batch.IndexOps {
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
s.analysisQueue.Queue(aw)
s.analysisQueue.Queue(func() {
analyze(doc)
resultChan <- doc
})
}
}
}()
}

// wait for analysis result
analysisResults := make([]*index.AnalysisResult, int(numUpdates))
analysisResults := make([]index.Document, int(numUpdates))
var itemsDeQueued uint64
var totalAnalysisSize int
for itemsDeQueued < numUpdates {
Expand Down Expand Up @@ -564,15 +567,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
return m
}

func (s *Scorch) Analyze(d index.Document) *index.AnalysisResult {
return analyze(d)
}

func analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
}

func analyze(d index.Document) {
d.VisitFields(func(field index.Field) {
if field.IsIndexed() {
field.Analyze()
Expand All @@ -585,8 +580,6 @@ func analyze(d index.Document) *index.AnalysisResult {
}
}
})

return rv
}

func (s *Scorch) Advanced() (store.KVStore, error) {
Expand Down
4 changes: 2 additions & 2 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type Plugin interface {
// version must be incremented.
Version() uint32

// New takes a set of AnalysisResults and turns them into a new Segment
New(results []*index.AnalysisResult) (segment.Segment, uint64, error)
// New takes a set of Documents and turns them into a new Segment
New(results []index.Document) (segment.Segment, uint64, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps the above comment needs updates too?


// Open attempts to open the file at the specified path and
// return the corresponding Segment
Expand Down
23 changes: 19 additions & 4 deletions index/upsidedown/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ import (
index "github.com/blevesearch/bleve_index_api"
)

func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
type IndexRow interface {
KeySize() int
KeyTo([]byte) (int, error)
Key() []byte

ValueSize() int
ValueTo([]byte) (int, error)
Value() []byte
}

type AnalysisResult struct {
DocID string
Rows []IndexRow
}

func (udc *UpsideDownCouch) analyze(d index.Document) *AnalysisResult {
rv := &AnalysisResult{
DocID: d.ID(),
Rows: make([]index.IndexRow, 0, 100),
Rows: make([]IndexRow, 0, 100),
}

docIDBytes := []byte(d.ID())
Expand Down Expand Up @@ -88,7 +103,7 @@ func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rowsCapNeeded += len(tokenFreqs)
}

rv.Rows = append(make([]index.IndexRow, 0, rowsCapNeeded), rv.Rows...)
rv.Rows = append(make([]IndexRow, 0, rowsCapNeeded), rv.Rows...)

backIndexTermsEntries := make([]*BackIndexTermsEntry, 0, len(fieldTermFreqs))

Expand Down
4 changes: 2 additions & 2 deletions index/upsidedown/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAnalysisBug328(t *testing.T) {
cf := document.NewCompositeFieldWithIndexingOptions("_all", true, []string{}, []string{}, document.IndexField|document.IncludeTermVectors)
d.AddField(cf)

rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
fieldIndexes := make(map[uint16]string)
for _, row := range rv.Rows {
if row, ok := row.(*FieldRow); ok {
Expand Down Expand Up @@ -84,7 +84,7 @@ func BenchmarkAnalyze(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
if len(rv.Rows) < 92 || len(rv.Rows) > 93 {
b.Fatalf("expected 512-13 rows, got %d", len(rv.Rows))
}
Expand Down
28 changes: 16 additions & 12 deletions index/upsidedown/upsidedown.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,13 @@ func (udc *UpsideDownCouch) Close() error {
func (udc *UpsideDownCouch) Update(doc index.Document) (err error) {
// do analysis before acquiring write lock
analysisStart := time.Now()
resultChan := make(chan *index.AnalysisResult)
aw := index.NewAnalysisWork(udc, doc, resultChan)
resultChan := make(chan *AnalysisResult)

// put the work on the queue
udc.analysisQueue.Queue(aw)
udc.analysisQueue.Queue(func() {
ar := udc.analyze(doc)
resultChan <- ar
})

// wait for the result
result := <-resultChan
Expand Down Expand Up @@ -454,7 +456,7 @@ func (udc *UpsideDownCouch) Update(doc index.Document) (err error) {
}

func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document,
result *index.AnalysisResult, backIndexRow *BackIndexRow) (err error) {
result *AnalysisResult, backIndexRow *BackIndexRow) (err error) {
// start a writer for this update
indexStart := time.Now()
var kvwriter store.KVWriter
Expand Down Expand Up @@ -500,7 +502,7 @@ func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document,
return
}

func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) {
addRows = make([]UpsideDownCouchRow, 0, len(rows))

if backIndexRow == nil {
Expand Down Expand Up @@ -586,7 +588,7 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in
return addRows, updateRows, deleteRows
}

func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []index.IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]index.IndexRow, []*BackIndexStoreEntry) {
func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]IndexRow, []*BackIndexStoreEntry) {
fieldType := field.EncodedFieldType()
storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value())

Expand All @@ -596,7 +598,7 @@ func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldInd
return append(rows, storedRow), append(backIndexStoredEntries, &backIndexStoredEntry)
}

func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []index.IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]index.IndexRow, []*BackIndexTermsEntry) {
func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]IndexRow, []*BackIndexTermsEntry) {
fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength)))

termFreqRows := make([]TermFrequencyRow, len(tokenFreqs))
Expand Down Expand Up @@ -731,7 +733,7 @@ func frequencyFromTokenFreq(tf *index.TokenFreq) int {
return tf.Frequency()
}

func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) {
func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []IndexRow) ([]*TermVector, []IndexRow) {
a := make([]TermVector, len(tf.Locations))
rv := make([]*TermVector, len(tf.Locations))

Expand Down Expand Up @@ -787,7 +789,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}
analysisStart := time.Now()

resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
resultChan := make(chan *AnalysisResult, len(batch.IndexOps))

var numUpdates uint64
var numPlainTextBytes uint64
Expand All @@ -803,9 +805,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
aw := index.NewAnalysisWork(udc, doc, resultChan)
// put the work on the queue
udc.analysisQueue.Queue(aw)
udc.analysisQueue.Queue(func() {
ar := udc.analyze(doc)
resultChan <- ar
})
}
}
}()
Expand Down Expand Up @@ -846,7 +850,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) {
}()

// wait for analysis result
newRowsMap := make(map[string][]index.IndexRow)
newRowsMap := make(map[string][]IndexRow)
var itemsDeQueued uint64
for itemsDeQueued < numUpdates {
result := <-resultChan
Expand Down