Skip to content

Commit

Permalink
move the plugin interface into scorch (#1507)
Browse files Browse the repository at this point in the history
* move the plugin interface into scorch

* rename scorch Plugin to SegmentPlugin

and related functions

* update to make all analysis take place in func() (#1508)

* update to make all analysis take place in func()

by making analysis take place inside a func(), a
closure can be used to encapsulate all the
index-specific details.  this means the index API does
not need to know about scorch vs upsidedown analysis structures

* rewrite variable declaration for clarity

also update comment, per review feedback

* update to latest versions

selects latest versions compatible with these changes
  • Loading branch information
mschoch authored Dec 2, 2020
1 parent 1460d32 commit cf6396d
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 74 deletions.
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ go 1.13

require (
github.com/RoaringBitmap/roaring v0.4.23
github.com/blevesearch/bleve_index_api v0.0.2
github.com/blevesearch/bleve_index_api v0.0.3
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/scorch_segment_api v0.0.3
github.com/blevesearch/scorch_segment_api v0.0.5
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
github.com/blevesearch/zapx/v11 v11.1.3
github.com/blevesearch/zapx/v12 v12.1.3
github.com/blevesearch/zapx/v13 v13.1.3
github.com/blevesearch/zapx/v14 v14.1.3
github.com/blevesearch/zapx/v15 v15.1.3
github.com/blevesearch/zapx/v11 v11.1.4
github.com/blevesearch/zapx/v12 v12.1.4
github.com/blevesearch/zapx/v13 v13.1.4
github.com/blevesearch/zapx/v14 v14.1.4
github.com/blevesearch/zapx/v15 v15.1.4
github.com/couchbase/moss v0.1.0
github.com/couchbase/vellum v1.0.2
github.com/golang/protobuf v1.3.2
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/blevesearch/bleve_index_api v0.0.2 h1:M6EPUf354pMQw0J9NOyJaeIKNAlwpbSU+ofP6wHVC0w=
github.com/blevesearch/bleve_index_api v0.0.2/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/bleve_index_api v0.0.3 h1:DNaKT/0F9r2WQA0EQwVTJSE12lZv11J6od0IV5Zzbvg=
github.com/blevesearch/bleve_index_api v0.0.3/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 h1:SjYVcfJVZoCfBlg+fkaq2eoZHTf5HaJfaTeTkOtyfHQ=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ=
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 h1:/4ikScMMYMqsRFWJjCyzd3CNWB0lxvqDkqa5nEv6NMc=
Expand All @@ -12,22 +12,22 @@ github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0=
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/scorch_segment_api v0.0.3 h1:DJ7aqcIr1JUT77B42uTGtCRw0vz1HxlvUfyv7CefR9I=
github.com/blevesearch/scorch_segment_api v0.0.3/go.mod h1:WpV76h6YHekU8CzseKfzJRoZfcVu+yWlwcEq+7N+Kp0=
github.com/blevesearch/scorch_segment_api v0.0.5 h1:LBndHXXi79/8Omh3h0tFBPs1hSf3MgjB7azkt4vAxWg=
github.com/blevesearch/scorch_segment_api v0.0.5/go.mod h1:Ilxtn7p82x+1djbDr+XLXATdYuvVYDlECSip1l/3Roo=
github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac=
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
github.com/blevesearch/zapx/v11 v11.1.3 h1:3pQN/ghHIZvF2sTZuAixWGfS2s/xFUh2HHg7XuGoAcA=
github.com/blevesearch/zapx/v11 v11.1.3/go.mod h1:F7a5ANvbZHt0D28ZCjo6ZT5E2HZdLXGnKd77TO6PfOs=
github.com/blevesearch/zapx/v12 v12.1.3 h1:9KYAqQURIIMVJ1+vHnq1snYvPPukX/LLJ+xpYj3bRG8=
github.com/blevesearch/zapx/v12 v12.1.3/go.mod h1:ykFX5dk1FfDh9qWRxtoRoFgh1VoT4syFSqKa31t3rGg=
github.com/blevesearch/zapx/v13 v13.1.3 h1:34nYfEz0v9BfDIoZPERX781fCEvQWR15agUdZRLpu3Y=
github.com/blevesearch/zapx/v13 v13.1.3/go.mod h1:coKC574yL3KQ0ciIBdVYtxufN+Ikh7z5HbPMO9yH18M=
github.com/blevesearch/zapx/v14 v14.1.3 h1:Rx4Bjni8JW1wfTw9VGw1NUbFyA5wl+x7OJBaaqp2ypE=
github.com/blevesearch/zapx/v14 v14.1.3/go.mod h1:4MIfa+zZsiL57ve1aYZZ6S+hihYjtApo6DCizjM/eoE=
github.com/blevesearch/zapx/v15 v15.1.3 h1:gUvMSGfnAlhdjtNszD8kaA7SNLoIB7Hb55uAuQJZEVQ=
github.com/blevesearch/zapx/v15 v15.1.3/go.mod h1:TRWvTUYjESzoCzjfDckPmjXHuTyTLt5uKBJEf4C/8G4=
github.com/blevesearch/zapx/v11 v11.1.4 h1:p4FhcBPDB/lhBctXGAB1r7AxMbg5uta5oRKoCuMtiRA=
github.com/blevesearch/zapx/v11 v11.1.4/go.mod h1:MTlzXennnLzspvhHYEbZupmr823icJiSj41042mcV6c=
github.com/blevesearch/zapx/v12 v12.1.4 h1:5zB6BmQYtIEEAXrY8iZh09rkpuTCTVOHXqwsF0/4Yxw=
github.com/blevesearch/zapx/v12 v12.1.4/go.mod h1:wM4SGTOyF8SUkv36C3NUy82oHAEF+nKhua31Z9IT2Bk=
github.com/blevesearch/zapx/v13 v13.1.4 h1:e7caDqGHzZEQwLoUYHzY4CXJSlEy2PsabaSup4W5GNI=
github.com/blevesearch/zapx/v13 v13.1.4/go.mod h1:tpvyPg3yhbColAlAcQQSDAi/4vKzQl46jOcUSA4RUag=
github.com/blevesearch/zapx/v14 v14.1.4 h1:jWnVstovnq81b38nP3Y0T0rTa+dWXjPSWJmnrtkZBxY=
github.com/blevesearch/zapx/v14 v14.1.4/go.mod h1:VCHYImrCnhiSC/4NOlM1W3k4G+hjD2VsDD3YzZbUImY=
github.com/blevesearch/zapx/v15 v15.1.4 h1:x2vZerNVhjilFxkG89+eIPX4hywVxwPPR8WDeX3DuXA=
github.com/blevesearch/zapx/v15 v15.1.4/go.mod h1:Q0iQzBh6k4FKqj7zqlIsTIrVBUkQD7HG2p+Qp6XjmKo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
8 changes: 4 additions & 4 deletions index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder struct {
mergeMax int
batch *index.Batch
internal map[string][]byte
segPlugin segment.Plugin
segPlugin SegmentPlugin
}

func NewBuilder(config map[string]interface{}) (*Builder, error) {
Expand Down Expand Up @@ -134,14 +134,14 @@ func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
}

func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
analysisResults := make([]index.Document, 0, len(batch.IndexOps))
for _, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddIDField()
// perform analysis directly
analysisResult := analyze(doc)
analysisResults = append(analysisResults, analysisResult)
analyze(doc)
analysisResults = append(analysisResults, doc)
}
}

Expand Down
2 changes: 1 addition & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
}

func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin segment.Plugin) ([]string, map[uint64]string, error) {
segPlugin SegmentPlugin) ([]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return nil, nil, err
Expand Down
27 changes: 10 additions & 17 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Scorch struct {

forceMergeRequestCh chan *mergerCtrl

segPlugin segment.Plugin
segPlugin SegmentPlugin
}

type internalStats struct {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()

resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
resultChan := make(chan index.Document, len(batch.IndexOps))

var numUpdates uint64
var numDeletes uint64
Expand All @@ -333,18 +333,21 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

if numUpdates > 0 {
go func() {
for _, doc := range batch.IndexOps {
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
s.analysisQueue.Queue(aw)
s.analysisQueue.Queue(func() {
analyze(doc)
resultChan <- doc
})
}
}
}()
}

// wait for analysis result
analysisResults := make([]*index.AnalysisResult, int(numUpdates))
analysisResults := make([]index.Document, int(numUpdates))
var itemsDeQueued uint64
var totalAnalysisSize int
for itemsDeQueued < numUpdates {
Expand Down Expand Up @@ -564,15 +567,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
return m
}

func (s *Scorch) Analyze(d index.Document) *index.AnalysisResult {
return analyze(d)
}

func analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
}

func analyze(d index.Document) {
d.VisitFields(func(field index.Field) {
if field.IsIndexed() {
field.Analyze()
Expand All @@ -585,8 +580,6 @@ func analyze(d index.Document) *index.AnalysisResult {
}
}
})

return rv
}

func (s *Scorch) Advanced() (store.KVStore, error) {
Expand Down
66 changes: 53 additions & 13 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package scorch

import (
"fmt"
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"

segment "github.com/blevesearch/scorch_segment_api"

Expand All @@ -26,25 +28,63 @@ import (
zapv15 "github.com/blevesearch/zapx/v15"
)

var supportedSegmentPlugins map[string]map[uint32]segment.Plugin
var defaultSegmentPlugin segment.Plugin
// SegmentPlugin represents the essential functions required by a package to
// plug in its segment implementation. Plugins are registered by Type and
// Version, so each (Type, Version) pair identifies one implementation.
type SegmentPlugin interface {

// Type is the name for this segment plugin
Type() string

// Version is a numeric value identifying a specific version of this type.
// When incompatible changes are made to a particular type of plugin, the
// version must be incremented.
Version() uint32

// New takes a set of Documents and turns them into a new Segment.
// NOTE(review): the uint64 result is not described here; presumably it is
// the size in bytes of the new segment - confirm against implementations.
New(results []index.Document) (segment.Segment, uint64, error)

// Open attempts to open the file at the specified path and
// return the corresponding Segment
Open(path string) (segment.Segment, error)

// Merge takes a set of Segments, and creates a new segment on disk at
// the specified path.
// Drops is a set of bitmaps (one for each segment) indicating which
// documents can be dropped from the segments during the merge.
// If the closeCh channel is closed, Merge will cease doing work at
// the next opportunity, and return an error (closed).
// StatsReporter can optionally be provided, in which case progress
// made during the merge is reported while operation continues.
// Returns:
// A slice of new document numbers (one for each input segment),
// this allows the caller to know a particular document's new
// document number in the newly merged segment.
// The number of bytes written to the new segment file.
// An error, if any occurred.
Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter) (
[][]uint64, uint64, error)
}

var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin
var defaultSegmentPlugin SegmentPlugin

func init() {
ResetPlugins()
RegisterPlugin(zapv15.Plugin(), false)
RegisterPlugin(zapv14.Plugin(), false)
RegisterPlugin(zapv13.Plugin(), false)
RegisterPlugin(zapv12.Plugin(), false)
RegisterPlugin(zapv11.Plugin(), true)
ResetSegmentPlugins()
RegisterSegmentPlugin(&zapv15.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv14.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv13.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv12.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv11.ZapPlugin{}, true)
}

func ResetPlugins() {
supportedSegmentPlugins = map[string]map[uint32]segment.Plugin{}
func ResetSegmentPlugins() {
supportedSegmentPlugins = map[string]map[uint32]SegmentPlugin{}
}

func RegisterPlugin(plugin segment.Plugin, makeDefault bool) {
func RegisterSegmentPlugin(plugin SegmentPlugin, makeDefault bool) {
if _, ok := supportedSegmentPlugins[plugin.Type()]; !ok {
supportedSegmentPlugins[plugin.Type()] = map[uint32]segment.Plugin{}
supportedSegmentPlugins[plugin.Type()] = map[uint32]SegmentPlugin{}
}
supportedSegmentPlugins[plugin.Type()][plugin.Version()] = plugin
if makeDefault {
Expand All @@ -67,7 +107,7 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) {
}

func chooseSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) (segment.Plugin, error) {
forcedSegmentVersion uint32) (SegmentPlugin, error) {
if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok {
if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok {
return segPlugin, nil
Expand Down
23 changes: 19 additions & 4 deletions index/upsidedown/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ import (
index "github.com/blevesearch/bleve_index_api"
)

func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
// IndexRow is the interface satisfied by the rows collected in
// AnalysisResult.Rows. Each row exposes a key and a value, and offers
// three accessors for each: a size, a write into a caller-supplied buffer,
// and a returned byte slice.
type IndexRow interface {
// KeySize reports a size for the key.
// NOTE(review): presumably the number of bytes KeyTo needs - confirm.
KeySize() int
// KeyTo takes a caller-supplied buffer and returns an int and an error.
// NOTE(review): presumably it serializes the key into the buffer and
// returns the number of bytes written - confirm against implementations.
KeyTo([]byte) (int, error)
// Key returns the key as a byte slice.
Key() []byte

// ValueSize, ValueTo and Value mirror the key accessors for the value.
ValueSize() int
ValueTo([]byte) (int, error)
Value() []byte
}

// AnalysisResult is the value returned by UpsideDownCouch.analyze for a
// single document.
type AnalysisResult struct {
// DocID is the ID of the analyzed document (set from d.ID()).
DocID string
// Rows holds the index rows generated while analyzing the document.
Rows []IndexRow
}

func (udc *UpsideDownCouch) analyze(d index.Document) *AnalysisResult {
rv := &AnalysisResult{
DocID: d.ID(),
Rows: make([]index.IndexRow, 0, 100),
Rows: make([]IndexRow, 0, 100),
}

docIDBytes := []byte(d.ID())
Expand Down Expand Up @@ -88,7 +103,7 @@ func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rowsCapNeeded += len(tokenFreqs)
}

rv.Rows = append(make([]index.IndexRow, 0, rowsCapNeeded), rv.Rows...)
rv.Rows = append(make([]IndexRow, 0, rowsCapNeeded), rv.Rows...)

backIndexTermsEntries := make([]*BackIndexTermsEntry, 0, len(fieldTermFreqs))

Expand Down
4 changes: 2 additions & 2 deletions index/upsidedown/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAnalysisBug328(t *testing.T) {
cf := document.NewCompositeFieldWithIndexingOptions("_all", true, []string{}, []string{}, document.IndexField|document.IncludeTermVectors)
d.AddField(cf)

rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
fieldIndexes := make(map[uint16]string)
for _, row := range rv.Rows {
if row, ok := row.(*FieldRow); ok {
Expand Down Expand Up @@ -84,7 +84,7 @@ func BenchmarkAnalyze(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
if len(rv.Rows) < 92 || len(rv.Rows) > 93 {
b.Fatalf("expected 512-13 rows, got %d", len(rv.Rows))
}
Expand Down
Loading

0 comments on commit cf6396d

Please sign in to comment.