Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move the plugin interface into scorch #1507

Merged
merged 3 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ go 1.13

require (
github.com/RoaringBitmap/roaring v0.4.23
github.com/blevesearch/bleve_index_api v0.0.2
github.com/blevesearch/bleve_index_api v0.0.3
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/scorch_segment_api v0.0.3
github.com/blevesearch/scorch_segment_api v0.0.5
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
github.com/blevesearch/zapx/v11 v11.1.3
github.com/blevesearch/zapx/v12 v12.1.3
github.com/blevesearch/zapx/v13 v13.1.3
github.com/blevesearch/zapx/v14 v14.1.3
github.com/blevesearch/zapx/v15 v15.1.3
github.com/blevesearch/zapx/v11 v11.1.4
github.com/blevesearch/zapx/v12 v12.1.4
github.com/blevesearch/zapx/v13 v13.1.4
github.com/blevesearch/zapx/v14 v14.1.4
github.com/blevesearch/zapx/v15 v15.1.4
github.com/couchbase/moss v0.1.0
github.com/couchbase/vellum v1.0.2
github.com/golang/protobuf v1.3.2
Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/RoaringBitmap/roaring v0.4.23 h1:gpyfd12QohbqhFO4NVDUdoPOCXsyahYRQhINmlHxKeo=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/blevesearch/bleve_index_api v0.0.2 h1:M6EPUf354pMQw0J9NOyJaeIKNAlwpbSU+ofP6wHVC0w=
github.com/blevesearch/bleve_index_api v0.0.2/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/bleve_index_api v0.0.3 h1:DNaKT/0F9r2WQA0EQwVTJSE12lZv11J6od0IV5Zzbvg=
github.com/blevesearch/bleve_index_api v0.0.3/go.mod h1:yq9YIbuvEQdUB0h+UiLbpDdLLnvcD5ZkqO0LS1wuSvY=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040 h1:SjYVcfJVZoCfBlg+fkaq2eoZHTf5HaJfaTeTkOtyfHQ=
github.com/blevesearch/blevex v0.0.0-20190916190636-152f0fe5c040/go.mod h1:WH+MU2F4T0VmSdaPX+Wu5GYoZBrYWdOZWSjzvYcDmqQ=
github.com/blevesearch/cld2 v0.0.0-20200327141045-8b5f551d37f5 h1:/4ikScMMYMqsRFWJjCyzd3CNWB0lxvqDkqa5nEv6NMc=
Expand All @@ -12,22 +12,22 @@ github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/mmap-go v1.0.2 h1:JtMHb+FgQCTTYIhtMvimw15dJwu1Y5lrZDMOFXVWPk0=
github.com/blevesearch/mmap-go v1.0.2/go.mod h1:ol2qBqYaOUsGdm7aRMRrYGgPvnwLe6Y+7LMvAB5IbSA=
github.com/blevesearch/scorch_segment_api v0.0.3 h1:DJ7aqcIr1JUT77B42uTGtCRw0vz1HxlvUfyv7CefR9I=
github.com/blevesearch/scorch_segment_api v0.0.3/go.mod h1:WpV76h6YHekU8CzseKfzJRoZfcVu+yWlwcEq+7N+Kp0=
github.com/blevesearch/scorch_segment_api v0.0.5 h1:LBndHXXi79/8Omh3h0tFBPs1hSf3MgjB7azkt4vAxWg=
github.com/blevesearch/scorch_segment_api v0.0.5/go.mod h1:Ilxtn7p82x+1djbDr+XLXATdYuvVYDlECSip1l/3Roo=
github.com/blevesearch/segment v0.9.0 h1:5lG7yBCx98or7gK2cHMKPukPZ/31Kag7nONpoBt22Ac=
github.com/blevesearch/segment v0.9.0/go.mod h1:9PfHYUdQCgHktBgvtUOF4x+pc4/l8rdH0u5spnW85UQ=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
github.com/blevesearch/zapx/v11 v11.1.3 h1:3pQN/ghHIZvF2sTZuAixWGfS2s/xFUh2HHg7XuGoAcA=
github.com/blevesearch/zapx/v11 v11.1.3/go.mod h1:F7a5ANvbZHt0D28ZCjo6ZT5E2HZdLXGnKd77TO6PfOs=
github.com/blevesearch/zapx/v12 v12.1.3 h1:9KYAqQURIIMVJ1+vHnq1snYvPPukX/LLJ+xpYj3bRG8=
github.com/blevesearch/zapx/v12 v12.1.3/go.mod h1:ykFX5dk1FfDh9qWRxtoRoFgh1VoT4syFSqKa31t3rGg=
github.com/blevesearch/zapx/v13 v13.1.3 h1:34nYfEz0v9BfDIoZPERX781fCEvQWR15agUdZRLpu3Y=
github.com/blevesearch/zapx/v13 v13.1.3/go.mod h1:coKC574yL3KQ0ciIBdVYtxufN+Ikh7z5HbPMO9yH18M=
github.com/blevesearch/zapx/v14 v14.1.3 h1:Rx4Bjni8JW1wfTw9VGw1NUbFyA5wl+x7OJBaaqp2ypE=
github.com/blevesearch/zapx/v14 v14.1.3/go.mod h1:4MIfa+zZsiL57ve1aYZZ6S+hihYjtApo6DCizjM/eoE=
github.com/blevesearch/zapx/v15 v15.1.3 h1:gUvMSGfnAlhdjtNszD8kaA7SNLoIB7Hb55uAuQJZEVQ=
github.com/blevesearch/zapx/v15 v15.1.3/go.mod h1:TRWvTUYjESzoCzjfDckPmjXHuTyTLt5uKBJEf4C/8G4=
github.com/blevesearch/zapx/v11 v11.1.4 h1:p4FhcBPDB/lhBctXGAB1r7AxMbg5uta5oRKoCuMtiRA=
github.com/blevesearch/zapx/v11 v11.1.4/go.mod h1:MTlzXennnLzspvhHYEbZupmr823icJiSj41042mcV6c=
github.com/blevesearch/zapx/v12 v12.1.4 h1:5zB6BmQYtIEEAXrY8iZh09rkpuTCTVOHXqwsF0/4Yxw=
github.com/blevesearch/zapx/v12 v12.1.4/go.mod h1:wM4SGTOyF8SUkv36C3NUy82oHAEF+nKhua31Z9IT2Bk=
github.com/blevesearch/zapx/v13 v13.1.4 h1:e7caDqGHzZEQwLoUYHzY4CXJSlEy2PsabaSup4W5GNI=
github.com/blevesearch/zapx/v13 v13.1.4/go.mod h1:tpvyPg3yhbColAlAcQQSDAi/4vKzQl46jOcUSA4RUag=
github.com/blevesearch/zapx/v14 v14.1.4 h1:jWnVstovnq81b38nP3Y0T0rTa+dWXjPSWJmnrtkZBxY=
github.com/blevesearch/zapx/v14 v14.1.4/go.mod h1:VCHYImrCnhiSC/4NOlM1W3k4G+hjD2VsDD3YzZbUImY=
github.com/blevesearch/zapx/v15 v15.1.4 h1:x2vZerNVhjilFxkG89+eIPX4hywVxwPPR8WDeX3DuXA=
github.com/blevesearch/zapx/v15 v15.1.4/go.mod h1:Q0iQzBh6k4FKqj7zqlIsTIrVBUkQD7HG2p+Qp6XjmKo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
8 changes: 4 additions & 4 deletions index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder struct {
mergeMax int
batch *index.Batch
internal map[string][]byte
segPlugin segment.Plugin
segPlugin SegmentPlugin
}

func NewBuilder(config map[string]interface{}) (*Builder, error) {
Expand Down Expand Up @@ -134,14 +134,14 @@ func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
}

func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
analysisResults := make([]index.Document, 0, len(batch.IndexOps))
for _, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddIDField()
// perform analysis directly
analysisResult := analyze(doc)
analysisResults = append(analysisResults, analysisResult)
analyze(doc)
analysisResults = append(analysisResults, doc)
}
}

Expand Down
2 changes: 1 addition & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
}

func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin segment.Plugin) ([]string, map[uint64]string, error) {
segPlugin SegmentPlugin) ([]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return nil, nil, err
Expand Down
27 changes: 10 additions & 17 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Scorch struct {

forceMergeRequestCh chan *mergerCtrl

segPlugin segment.Plugin
segPlugin SegmentPlugin
}

type internalStats struct {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
s.fireEvent(EventKindBatchIntroduction, time.Since(start))
}()

resultChan := make(chan *index.AnalysisResult, len(batch.IndexOps))
resultChan := make(chan index.Document, len(batch.IndexOps))

var numUpdates uint64
var numDeletes uint64
Expand All @@ -333,18 +333,21 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {

if numUpdates > 0 {
go func() {
for _, doc := range batch.IndexOps {
for k := range batch.IndexOps {
doc := batch.IndexOps[k]
if doc != nil {
aw := index.NewAnalysisWork(s, doc, resultChan)
// put the work on the queue
s.analysisQueue.Queue(aw)
s.analysisQueue.Queue(func() {
analyze(doc)
resultChan <- doc
})
}
}
}()
}

// wait for analysis result
analysisResults := make([]*index.AnalysisResult, int(numUpdates))
analysisResults := make([]index.Document, int(numUpdates))
var itemsDeQueued uint64
var totalAnalysisSize int
for itemsDeQueued < numUpdates {
Expand Down Expand Up @@ -564,15 +567,7 @@ func (s *Scorch) StatsMap() map[string]interface{} {
return m
}

func (s *Scorch) Analyze(d index.Document) *index.AnalysisResult {
return analyze(d)
}

func analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
}

func analyze(d index.Document) {
d.VisitFields(func(field index.Field) {
if field.IsIndexed() {
field.Analyze()
Expand All @@ -585,8 +580,6 @@ func analyze(d index.Document) *index.AnalysisResult {
}
}
})

return rv
}

func (s *Scorch) Advanced() (store.KVStore, error) {
Expand Down
66 changes: 53 additions & 13 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package scorch

import (
"fmt"
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"

segment "github.com/blevesearch/scorch_segment_api"

Expand All @@ -26,25 +28,63 @@ import (
zapv15 "github.com/blevesearch/zapx/v15"
)

var supportedSegmentPlugins map[string]map[uint32]segment.Plugin
var defaultSegmentPlugin segment.Plugin
// SegmentPlugin represents the essential functions required by a package to plug in
// it's segment implementation
type SegmentPlugin interface {

// Type is the name for this segment plugin
Type() string

// Version is a numeric value identifying a specific version of this type.
// When incompatible changes are made to a particular type of plugin, the
// version must be incremented.
Version() uint32

// New takes a set of Documents and turns them into a new Segment
New(results []index.Document) (segment.Segment, uint64, error)

// Open attempts to open the file at the specified path and
// return the corresponding Segment
Open(path string) (segment.Segment, error)

// Merge takes a set of Segments, and creates a new segment on disk at
// the specified path.
// Drops is a set of bitmaps (one for each segment) indicating which
// documents can be dropped from the segments during the merge.
// If the closeCh channel is closed, Merge will cease doing work at
// the next opportunity, and return an error (closed).
// StatsReporter can optionally be provided, in which case progress
// made during the merge is reported while operation continues.
// Returns:
// A slice of new document numbers (one for each input segment),
// this allows the caller to know a particular document's new
// document number in the newly merged segment.
// The number of bytes written to the new segment file.
// An error, if any occurred.
Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter) (
[][]uint64, uint64, error)
}

var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin
var defaultSegmentPlugin SegmentPlugin

func init() {
ResetPlugins()
RegisterPlugin(zapv15.Plugin(), false)
RegisterPlugin(zapv14.Plugin(), false)
RegisterPlugin(zapv13.Plugin(), false)
RegisterPlugin(zapv12.Plugin(), false)
RegisterPlugin(zapv11.Plugin(), true)
ResetSegmentPlugins()
RegisterSegmentPlugin(&zapv15.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv14.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv13.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv12.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv11.ZapPlugin{}, true)
}

func ResetPlugins() {
supportedSegmentPlugins = map[string]map[uint32]segment.Plugin{}
func ResetSegmentPlugins() {
supportedSegmentPlugins = map[string]map[uint32]SegmentPlugin{}
}

func RegisterPlugin(plugin segment.Plugin, makeDefault bool) {
func RegisterSegmentPlugin(plugin SegmentPlugin, makeDefault bool) {
if _, ok := supportedSegmentPlugins[plugin.Type()]; !ok {
supportedSegmentPlugins[plugin.Type()] = map[uint32]segment.Plugin{}
supportedSegmentPlugins[plugin.Type()] = map[uint32]SegmentPlugin{}
}
supportedSegmentPlugins[plugin.Type()][plugin.Version()] = plugin
if makeDefault {
Expand All @@ -67,7 +107,7 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) {
}

func chooseSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) (segment.Plugin, error) {
forcedSegmentVersion uint32) (SegmentPlugin, error) {
if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok {
if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok {
return segPlugin, nil
Expand Down
23 changes: 19 additions & 4 deletions index/upsidedown/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ import (
index "github.com/blevesearch/bleve_index_api"
)

func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
type IndexRow interface {
KeySize() int
KeyTo([]byte) (int, error)
Key() []byte

ValueSize() int
ValueTo([]byte) (int, error)
Value() []byte
}

type AnalysisResult struct {
DocID string
Rows []IndexRow
}

func (udc *UpsideDownCouch) analyze(d index.Document) *AnalysisResult {
rv := &AnalysisResult{
DocID: d.ID(),
Rows: make([]index.IndexRow, 0, 100),
Rows: make([]IndexRow, 0, 100),
}

docIDBytes := []byte(d.ID())
Expand Down Expand Up @@ -88,7 +103,7 @@ func (udc *UpsideDownCouch) Analyze(d index.Document) *index.AnalysisResult {
rowsCapNeeded += len(tokenFreqs)
}

rv.Rows = append(make([]index.IndexRow, 0, rowsCapNeeded), rv.Rows...)
rv.Rows = append(make([]IndexRow, 0, rowsCapNeeded), rv.Rows...)

backIndexTermsEntries := make([]*BackIndexTermsEntry, 0, len(fieldTermFreqs))

Expand Down
4 changes: 2 additions & 2 deletions index/upsidedown/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAnalysisBug328(t *testing.T) {
cf := document.NewCompositeFieldWithIndexingOptions("_all", true, []string{}, []string{}, document.IndexField|document.IncludeTermVectors)
d.AddField(cf)

rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
fieldIndexes := make(map[uint16]string)
for _, row := range rv.Rows {
if row, ok := row.(*FieldRow); ok {
Expand Down Expand Up @@ -84,7 +84,7 @@ func BenchmarkAnalyze(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
rv := idx.Analyze(d)
rv := idx.(*UpsideDownCouch).analyze(d)
if len(rv.Rows) < 92 || len(rv.Rows) > 93 {
b.Fatalf("expected 512-13 rows, got %d", len(rv.Rows))
}
Expand Down
Loading