diff --git a/go.mod b/go.mod index 1fc0c73ab..8572d7688 100644 --- a/go.mod +++ b/go.mod @@ -4,18 +4,18 @@ go 1.13 require ( github.com/RoaringBitmap/roaring v0.4.23 - github.com/blevesearch/bleve_index_api v0.0.2 + github.com/blevesearch/bleve_index_api v0.0.3 github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 // indirect github.com/blevesearch/go-porterstemmer v1.0.3 - github.com/blevesearch/scorch_segment_api v0.0.3 + github.com/blevesearch/scorch_segment_api v0.0.5 github.com/blevesearch/segment v0.9.0 github.com/blevesearch/snowballstem v0.9.0 - github.com/blevesearch/zapx/v11 v11.1.3 - github.com/blevesearch/zapx/v12 v12.1.3 - github.com/blevesearch/zapx/v13 v13.1.3 - github.com/blevesearch/zapx/v14 v14.1.3 - github.com/blevesearch/zapx/v15 v15.1.3 + github.com/blevesearch/zapx/v11 v11.1.4 + github.com/blevesearch/zapx/v12 v12.1.4 + github.com/blevesearch/zapx/v13 v13.1.4 + github.com/blevesearch/zapx/v14 v14.1.4 + github.com/blevesearch/zapx/v15 v15.1.4 github.com/couchbase/moss v0.1.0 github.com/couchbase/vellum v1.0.2 github.com/golang/protobuf v1.3.2 diff --git a/go.sum b/go.sum index c0d45e668..cef95b67e 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/blevesearch/bleve_index_api v0.0.2 h1:M6EPUf354pMQw0J9NOyJaeIKNAlwpbSU+ofP6wHVC0w= -github.com/blevesearch/bleve_index_api v0.0.2/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY= +github.com/blevesearch/bleve_index_api v0.0.3 h1:DNaKT/0F9r2WQA0EQwVTJSE12lZv11J6od0IV5Zzbvg= +github.com/blevesearch/bleve_index_api v0.0.3/go.mod 
h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY= github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 h1:SjYVcfJVZoCfBlg+fkaq2eoZHTf5HaJfaTeTkOtyfHQ= github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ= github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 h1:/4ikScMMYMqsRFWJjCyzd3CNWB0lxvqDkqa5nEv6NMc= @@ -12,22 +12,22 @@ github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+ github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0= github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA= -github.com/blevesearch/scorch_segment_api v0.0.3 h1:DJ7aqcIr1JUT77B42uTGtCRw0vz1HxlvUfyv7CefR9I= -github.com/blevesearch/scorch_segment_api v0.0.3/go.mod h1:WpV76h6YHekU8CzseKfzJRoZfcVu+yWlwcEq+7N+Kp0= +github.com/blevesearch/scorch_segment_api v0.0.5 h1:LBndHXXi79/8Omh3h0tFBPs1hSf3MgjB7azkt4vAxWg= +github.com/blevesearch/scorch_segment_api v0.0.5/go.mod h1:Ilxtn7p82x+1djbDr+XLXATdYuvVYDlECSip1l/3Roo= github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac= github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ= github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs= -github.com/blevesearch/zapx/v11 v11.1.3 h1:3pQN/ghHIZvF2sTZuAixWGfS2s/xFUh2HHg7XuGoAcA= -github.com/blevesearch/zapx/v11 v11.1.3/go.mod h1:F7a5ANvbZHt0D28ZCjo6ZT5E2HZdLXGnKd77TO6PfOs= -github.com/blevesearch/zapx/v12 v12.1.3 h1:9KYAqQURIIMVJ1+vHnq1snYvPPukX/LLJ+xpYj3bRG8= -github.com/blevesearch/zapx/v12 v12.1.3/go.mod h1:ykFX5dk1FfDh9qWRxtoRoFgh1VoT4syFSqKa31t3rGg= -github.com/blevesearch/zapx/v13 v13.1.3 
h1:34nYfEz0v9BfDIoZPERX781fCEvQWR15agUdZRLpu3Y= -github.com/blevesearch/zapx/v13 v13.1.3/go.mod h1:coKC574yL3KQ0ciIBdVYtxufN+Ikh7z5HbPMO9yH18M= -github.com/blevesearch/zapx/v14 v14.1.3 h1:Rx4Bjni8JW1wfTw9VGw1NUbFyA5wl+x7OJBaaqp2ypE= -github.com/blevesearch/zapx/v14 v14.1.3/go.mod h1:4MIfa+zZsiL57ve1aYZZ6S+hihYjtApo6DCizjM/eoE= -github.com/blevesearch/zapx/v15 v15.1.3 h1:gUvMSGfnAlhdjtNszD8kaA7SNLoIB7Hb55uAuQJZEVQ= -github.com/blevesearch/zapx/v15 v15.1.3/go.mod h1:TRWvTUYjESzoCzjfDckPmjXHuTyTLt5uKBJEf4C/8G4= +github.com/blevesearch/zapx/v11 v11.1.4 h1:p4FhcBPDB/lhBctXGAB1r7AxMbg5uta5oRKoCuMtiRA= +github.com/blevesearch/zapx/v11 v11.1.4/go.mod h1:MTlzXennnLzspvhHYEbZupmr823icJiSj41042mcV6c= +github.com/blevesearch/zapx/v12 v12.1.4 h1:5zB6BmQYtIEEAXrY8iZh09rkpuTCTVOHXqwsF0/4Yxw= +github.com/blevesearch/zapx/v12 v12.1.4/go.mod h1:wM4SGTOyF8SUkv36C3NUy82oHAEF+nKhua31Z9IT2Bk= +github.com/blevesearch/zapx/v13 v13.1.4 h1:e7caDqGHzZEQwLoUYHzY4CXJSlEy2PsabaSup4W5GNI= +github.com/blevesearch/zapx/v13 v13.1.4/go.mod h1:tpvyPg3yhbColAlAcQQSDAi/4vKzQl46jOcUSA4RUag= +github.com/blevesearch/zapx/v14 v14.1.4 h1:jWnVstovnq81b38nP3Y0T0rTa+dWXjPSWJmnrtkZBxY= +github.com/blevesearch/zapx/v14 v14.1.4/go.mod h1:VCHYImrCnhiSC/4NOlM1W3k4G+hjD2VsDD3YzZbUImY= +github.com/blevesearch/zapx/v15 v15.1.4 h1:x2vZerNVhjilFxkG89+eIPX4hywVxwPPR8WDeX3DuXA= +github.com/blevesearch/zapx/v15 v15.1.4/go.mod h1:Q0iQzBh6k4FKqj7zqlIsTIrVBUkQD7HG2p+Qp6XjmKo= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/index/scorch/builder.go b/index/scorch/builder.go index fbe05d4f7..455c53578 100644 --- a/index/scorch/builder.go +++ b/index/scorch/builder.go @@ -39,7 +39,7 @@ type Builder struct { mergeMax int batch *index.Batch internal map[string][]byte - segPlugin 
segment.Plugin + segPlugin SegmentPlugin } func NewBuilder(config map[string]interface{}) (*Builder, error) { @@ -134,14 +134,14 @@ func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error { } func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) { - analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps)) + analysisResults := make([]index.Document, 0, len(batch.IndexOps)) for _, doc := range batch.IndexOps { if doc != nil { // insert _id field doc.AddIDField() // perform analysis directly - analysisResult := analyze(doc) - analysisResults = append(analysisResults, analysisResult) + analyze(doc) + analysisResults = append(analysisResults, doc) } } diff --git a/index/scorch/persister.go b/index/scorch/persister.go index d50f46e8d..d58a4b8e7 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -429,7 +429,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) ( } func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, - segPlugin segment.Plugin) ([]string, map[uint64]string, error) { + segPlugin SegmentPlugin) ([]string, map[uint64]string, error) { snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket) if err != nil { return nil, nil, err diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 909fb43e8..68988e7d1 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -73,7 +73,7 @@ type Scorch struct { forceMergeRequestCh chan *mergerCtrl - segPlugin segment.Plugin + segPlugin SegmentPlugin } type internalStats struct { @@ -311,7 +311,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { s.fireEvent(EventKindBatchIntroduction, time.Since(start)) }() - resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) + resultChan := make(chan index.Document, len(batch.IndexOps)) var numUpdates uint64 var numDeletes uint64 @@ -333,18 +333,21 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) { if numUpdates > 0 { go 
func() { - for _, doc := range batch.IndexOps { + for k := range batch.IndexOps { + doc := batch.IndexOps[k] if doc != nil { - aw := index.NewAnalysisWork(s, doc, resultChan) // put the work on the queue - s.analysisQueue.Queue(aw) + s.analysisQueue.Queue(func() { + analyze(doc) + resultChan <- doc + }) } } }() } // wait for analysis result - analysisResults := make([]*index.AnalysisResult, int(numUpdates)) + analysisResults := make([]index.Document, int(numUpdates)) var itemsDeQueued uint64 var totalAnalysisSize int for itemsDeQueued < numUpdates { @@ -564,15 +567,7 @@ func (s *Scorch) StatsMap() map[string]interface{} { return m } -func (s *Scorch) Analyze(d index.Document) *index.AnalysisResult { - return analyze(d) -} - -func analyze(d index.Document) *index.AnalysisResult { - rv := &index.AnalysisResult{ - Document: d, - } - +func analyze(d index.Document) { d.VisitFields(func(field index.Field) { if field.IsIndexed() { field.Analyze() @@ -585,8 +580,6 @@ func analyze(d index.Document) *index.AnalysisResult { } } }) - - return rv } func (s *Scorch) Advanced() (store.KVStore, error) { diff --git a/index/scorch/segment_plugin.go b/index/scorch/segment_plugin.go index c87e35a62..94b4cee40 100644 --- a/index/scorch/segment_plugin.go +++ b/index/scorch/segment_plugin.go @@ -16,6 +16,8 @@ package scorch import ( "fmt" + "github.com/RoaringBitmap/roaring" + index "github.com/blevesearch/bleve_index_api" segment "github.com/blevesearch/scorch_segment_api" @@ -26,25 +28,63 @@ import ( zapv15 "github.com/blevesearch/zapx/v15" ) -var supportedSegmentPlugins map[string]map[uint32]segment.Plugin -var defaultSegmentPlugin segment.Plugin +// SegmentPlugin represents the essential functions required by a package to plug in +// its segment implementation +type SegmentPlugin interface { + + // Type is the name for this segment plugin + Type() string + + // Version is a numeric value identifying a specific version of this type. 
+ // When incompatible changes are made to a particular type of plugin, the + // version must be incremented. + Version() uint32 + + // New takes a set of Documents and turns them into a new Segment + New(results []index.Document) (segment.Segment, uint64, error) + + // Open attempts to open the file at the specified path and + // return the corresponding Segment + Open(path string) (segment.Segment, error) + + // Merge takes a set of Segments, and creates a new segment on disk at + // the specified path. + // Drops is a set of bitmaps (one for each segment) indicating which + // documents can be dropped from the segments during the merge. + // If the closeCh channel is closed, Merge will cease doing work at + // the next opportunity, and return an error (closed). + // StatsReporter can optionally be provided, in which case progress + // made during the merge is reported while operation continues. + // Returns: + // A slice of new document numbers (one for each input segment), + // this allows the caller to know a particular document's new + // document number in the newly merged segment. + // The number of bytes written to the new segment file. + // An error, if any occurred. 
+ Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string, + closeCh chan struct{}, s segment.StatsReporter) ( + [][]uint64, uint64, error) +} + +var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin +var defaultSegmentPlugin SegmentPlugin func init() { - ResetPlugins() - RegisterPlugin(zapv15.Plugin(), false) - RegisterPlugin(zapv14.Plugin(), false) - RegisterPlugin(zapv13.Plugin(), false) - RegisterPlugin(zapv12.Plugin(), false) - RegisterPlugin(zapv11.Plugin(), true) + ResetSegmentPlugins() + RegisterSegmentPlugin(&zapv15.ZapPlugin{}, false) + RegisterSegmentPlugin(&zapv14.ZapPlugin{}, false) + RegisterSegmentPlugin(&zapv13.ZapPlugin{}, false) + RegisterSegmentPlugin(&zapv12.ZapPlugin{}, false) + RegisterSegmentPlugin(&zapv11.ZapPlugin{}, true) } -func ResetPlugins() { - supportedSegmentPlugins = map[string]map[uint32]segment.Plugin{} +func ResetSegmentPlugins() { + supportedSegmentPlugins = map[string]map[uint32]SegmentPlugin{} } -func RegisterPlugin(plugin segment.Plugin, makeDefault bool) { +func RegisterSegmentPlugin(plugin SegmentPlugin, makeDefault bool) { if _, ok := supportedSegmentPlugins[plugin.Type()]; !ok { - supportedSegmentPlugins[plugin.Type()] = map[uint32]segment.Plugin{} + supportedSegmentPlugins[plugin.Type()] = map[uint32]SegmentPlugin{} } supportedSegmentPlugins[plugin.Type()][plugin.Version()] = plugin if makeDefault { @@ -67,7 +107,7 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) { } func chooseSegmentPlugin(forcedSegmentType string, - forcedSegmentVersion uint32) (segment.Plugin, error) { + forcedSegmentVersion uint32) (SegmentPlugin, error) { if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok { if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok { return segPlugin, nil diff --git a/index/upsidedown/analysis.go b/index/upsidedown/analysis.go index bec5dd930..2b6727e62 100644 --- a/index/upsidedown/analysis.go +++ b/index/upsidedown/analysis.go @@ -18,10 +18,25 @@ import ( 
index "github.com/blevesearch/bleve_index_api" ) -func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult { - rv := &index.AnalysisResult{ +type IndexRow interface { + KeySize() int + KeyTo([]byte) (int, error) + Key() []byte + + ValueSize() int + ValueTo([]byte) (int, error) + Value() []byte +} + +type AnalysisResult struct { + DocID string + Rows []IndexRow +} + +func (udc *UpsideDownCouch) analyze(d index.Document) *AnalysisResult { + rv := &AnalysisResult{ DocID: d.ID(), - Rows: make([]index.IndexRow, 0, 100), + Rows: make([]IndexRow, 0, 100), } docIDBytes := []byte(d.ID()) @@ -88,7 +103,7 @@ func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult { rowsCapNeeded += len(tokenFreqs) } - rv.Rows = append(make([]index.IndexRow, 0, rowsCapNeeded), rv.Rows...) + rv.Rows = append(make([]IndexRow, 0, rowsCapNeeded), rv.Rows...) backIndexTermsEntries := make([]*BackIndexTermsEntry, 0, len(fieldTermFreqs)) diff --git a/index/upsidedown/analysis_test.go b/index/upsidedown/analysis_test.go index 26b96a9c8..f90ecd95d 100644 --- a/index/upsidedown/analysis_test.go +++ b/index/upsidedown/analysis_test.go @@ -45,7 +45,7 @@ func TestAnalysisBug328(t *testing.T) { cf := document.NewCompositeFieldWithIndexingOptions("_all", true, []string{}, []string{}, document.IndexField|document.IncludeTermVectors) d.AddField(cf) - rv := idx.Analyze(d) + rv := idx.(*UpsideDownCouch).analyze(d) fieldIndexes := make(map[uint16]string) for _, row := range rv.Rows { if row, ok := row.(*FieldRow); ok { @@ -84,7 +84,7 @@ func BenchmarkAnalyze(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - rv := idx.Analyze(d) + rv := idx.(*UpsideDownCouch).analyze(d) if len(rv.Rows) < 92 || len(rv.Rows) > 93 { b.Fatalf("expected 512-13 rows, got %d", len(rv.Rows)) } diff --git a/index/upsidedown/upsidedown.go b/index/upsidedown/upsidedown.go index 6d107b263..499203bf6 100644 --- a/index/upsidedown/upsidedown.go +++ b/index/upsidedown/upsidedown.go @@ -414,11 
+414,13 @@ func (udc *UpsideDownCouch) Close() error { func (udc *UpsideDownCouch) Update(doc index.Document) (err error) { // do analysis before acquiring write lock analysisStart := time.Now() - resultChan := make(chan *index.AnalysisResult) - aw := index.NewAnalysisWork(udc, doc, resultChan) + resultChan := make(chan *AnalysisResult) // put the work on the queue - udc.analysisQueue.Queue(aw) + udc.analysisQueue.Queue(func() { + ar := udc.analyze(doc) + resultChan <- ar + }) // wait for the result result := <-resultChan @@ -454,7 +456,7 @@ func (udc *UpsideDownCouch) Update(doc index.Document) (err error) { } func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document, - result *index.AnalysisResult, backIndexRow *BackIndexRow) (err error) { + result *AnalysisResult, backIndexRow *BackIndexRow) (err error) { // start a writer for this update indexStart := time.Now() var kvwriter store.KVWriter @@ -500,7 +502,7 @@ func (udc *UpsideDownCouch) UpdateWithAnalysis(doc index.Document, return } -func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []index.IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) { +func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []IndexRow) (addRows []UpsideDownCouchRow, updateRows []UpsideDownCouchRow, deleteRows []UpsideDownCouchRow) { addRows = make([]UpsideDownCouchRow, 0, len(rows)) if backIndexRow == nil { @@ -586,7 +588,7 @@ func (udc *UpsideDownCouch) mergeOldAndNew(backIndexRow *BackIndexRow, rows []in return addRows, updateRows, deleteRows } -func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []index.IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]index.IndexRow, []*BackIndexStoreEntry) { +func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldIndex uint16, rows []IndexRow, backIndexStoredEntries []*BackIndexStoreEntry) ([]IndexRow, 
[]*BackIndexStoreEntry) { fieldType := field.EncodedFieldType() storedRow := NewStoredRow(docID, fieldIndex, field.ArrayPositions(), fieldType, field.Value()) @@ -596,7 +598,7 @@ func (udc *UpsideDownCouch) storeField(docID []byte, field index.Field, fieldInd return append(rows, storedRow), append(backIndexStoredEntries, &backIndexStoredEntry) } -func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []index.IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]index.IndexRow, []*BackIndexTermsEntry) { +func (udc *UpsideDownCouch) indexField(docID []byte, includeTermVectors bool, fieldIndex uint16, fieldLength int, tokenFreqs index.TokenFrequencies, rows []IndexRow, backIndexTermsEntries []*BackIndexTermsEntry) ([]IndexRow, []*BackIndexTermsEntry) { fieldNorm := float32(1.0 / math.Sqrt(float64(fieldLength))) termFreqRows := make([]TermFrequencyRow, len(tokenFreqs)) @@ -731,7 +733,7 @@ func frequencyFromTokenFreq(tf *index.TokenFreq) int { return tf.Frequency() } -func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []index.IndexRow) ([]*TermVector, []index.IndexRow) { +func (udc *UpsideDownCouch) termVectorsFromTokenFreq(field uint16, tf *index.TokenFreq, rows []IndexRow) ([]*TermVector, []IndexRow) { a := make([]TermVector, len(tf.Locations)) rv := make([]*TermVector, len(tf.Locations)) @@ -787,7 +789,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { } analysisStart := time.Now() - resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps)) + resultChan := make(chan *AnalysisResult, len(batch.IndexOps)) var numUpdates uint64 var numPlainTextBytes uint64 @@ -803,9 +805,11 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { for k := range batch.IndexOps { doc := batch.IndexOps[k] if doc != nil { - aw := index.NewAnalysisWork(udc, doc, resultChan) // put the work on the queue 
- udc.analysisQueue.Queue(aw) + udc.analysisQueue.Queue(func() { + ar := udc.analyze(doc) + resultChan <- ar + }) } } }() @@ -846,7 +850,7 @@ func (udc *UpsideDownCouch) Batch(batch *index.Batch) (err error) { }() // wait for analysis result - newRowsMap := make(map[string][]index.IndexRow) + newRowsMap := make(map[string][]IndexRow) var itemsDeQueued uint64 for itemsDeQueued < numUpdates { result := <-resultChan