Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move the plugin interface into scorch #1507

Merged
merged 3 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion index/scorch/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder struct {
mergeMax int
batch *index.Batch
internal map[string][]byte
segPlugin segment.Plugin
segPlugin SegmentPlugin
}

func NewBuilder(config map[string]interface{}) (*Builder, error) {
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
}

func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin segment.Plugin) ([]string, map[uint64]string, error) {
segPlugin SegmentPlugin) ([]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Scorch struct {

forceMergeRequestCh chan *mergerCtrl

segPlugin segment.Plugin
segPlugin SegmentPlugin
}

type internalStats struct {
Expand Down
66 changes: 53 additions & 13 deletions index/scorch/segment_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package scorch

import (
"fmt"
"github.com/RoaringBitmap/roaring"
index "github.com/blevesearch/bleve_index_api"

segment "github.com/blevesearch/scorch_segment_api"

Expand All @@ -26,25 +28,63 @@ import (
zapv15 "github.com/blevesearch/zapx/v15"
)

var supportedSegmentPlugins map[string]map[uint32]segment.Plugin
var defaultSegmentPlugin segment.Plugin
// SegmentPlugin represents the essential functions required by a package to plug in
// it's segment implementation
type SegmentPlugin interface {

// Type is the name for this segment plugin
Type() string

// Version is a numeric value identifying a specific version of this type.
// When incompatible changes are made to a particular type of plugin, the
// version must be incremented.
Version() uint32

// New takes a set of AnalysisResults and turns them into a new Segment
New(results []*index.AnalysisResult) (segment.Segment, uint64, error)

// Open attempts to open the file at the specified path and
// return the corresponding Segment
Open(path string) (segment.Segment, error)

// Merge takes a set of Segments, and creates a new segment on disk at
// the specified path.
// Drops is a set of bitmaps (one for each segment) indicating which
// documents can be dropped from the segments during the merge.
// If the closeCh channel is closed, Merge will cease doing work at
// the next opportunity, and return an error (closed).
// StatsReporter can optionally be provided, in which case progress
// made during the merge is reported while operation continues.
// Returns:
// A slice of new document numbers (one for each input segment),
// this allows the caller to know a particular document's new
// document number in the newly merged segment.
// The number of bytes written to the new segment file.
// An error, if any occurred.
Merge(segments []segment.Segment, drops []*roaring.Bitmap, path string,
closeCh chan struct{}, s segment.StatsReporter) (
[][]uint64, uint64, error)
}

var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin
var defaultSegmentPlugin SegmentPlugin

func init() {
ResetPlugins()
RegisterPlugin(zapv15.Plugin(), false)
RegisterPlugin(zapv14.Plugin(), false)
RegisterPlugin(zapv13.Plugin(), false)
RegisterPlugin(zapv12.Plugin(), false)
RegisterPlugin(zapv11.Plugin(), true)
ResetSegmentPlugins()
RegisterSegmentPlugin(&zapv15.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv14.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv13.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv12.ZapPlugin{}, false)
RegisterSegmentPlugin(&zapv11.ZapPlugin{}, true)
}

func ResetPlugins() {
supportedSegmentPlugins = map[string]map[uint32]segment.Plugin{}
func ResetSegmentPlugins() {
supportedSegmentPlugins = map[string]map[uint32]SegmentPlugin{}
}

func RegisterPlugin(plugin segment.Plugin, makeDefault bool) {
func RegisterSegmentPlugin(plugin SegmentPlugin, makeDefault bool) {
if _, ok := supportedSegmentPlugins[plugin.Type()]; !ok {
supportedSegmentPlugins[plugin.Type()] = map[uint32]segment.Plugin{}
supportedSegmentPlugins[plugin.Type()] = map[uint32]SegmentPlugin{}
}
supportedSegmentPlugins[plugin.Type()][plugin.Version()] = plugin
if makeDefault {
Expand All @@ -67,7 +107,7 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) {
}

func chooseSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) (segment.Plugin, error) {
forcedSegmentVersion uint32) (SegmentPlugin, error) {
if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok {
if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok {
return segPlugin, nil
Expand Down