Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Introduce a Global Registry #7392

Merged
merged 9 commits into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ The list below covers the major changes between 6.3.0 and master only.
- Fix permissions of generated Filebeat filesets. {pull}7140[7140]

==== Added

- Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392]
45 changes: 45 additions & 0 deletions libbeat/feature/bundle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package feature

// Bundleable merges featurable and bundle interface together.
type bundleable interface {
Features() []Featurable
}

// Bundle defines a list of features available in the current beat.
type Bundle struct {
features []Featurable
}

// NewBundle creates a new Bundle of feature to be registered.
func NewBundle(features ...Featurable) *Bundle {
return &Bundle{features: features}
}

// Filter creates a new bundle with only the feature matching the requested stability.
func (b *Bundle) Filter(stabilities ...Stability) *Bundle {
var filtered []Featurable

for _, feature := range b.features {
for _, stability := range stabilities {
if feature.Stability() == stability {
filtered = append(filtered, feature)
break
}
}
}
return NewBundle(filtered...)
}

// Features returns the interface features slice so
func (b *Bundle) Features() []Featurable {
return b.features
}

// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features.
func MustBundle(bundle ...bundleable) *Bundle {
var merged []Featurable
for _, feature := range bundle {
merged = append(merged, feature.Features()...)
}
return NewBundle(merged...)
}
54 changes: 54 additions & 0 deletions libbeat/feature/bundle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package feature

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBundle(t *testing.T) {
factory := func() {}
features := []Featurable{
New("libbeat.outputs", "elasticsearch", factory, Stable),
New("libbeat.outputs", "edge", factory, Experimental),
New("libbeat.input", "tcp", factory, Beta),
}

t.Run("Creates a new Bundle", func(t *testing.T) {
b := NewBundle(features...)
assert.Equal(t, 3, len(b.Features()))
})

t.Run("Filters feature based on stability", func(t *testing.T) {
b := NewBundle(features...)
new := b.Filter(Experimental)
assert.Equal(t, 1, len(new.Features()))
})

t.Run("Filters feature based on multiple different stability", func(t *testing.T) {
b := NewBundle(features...)
new := b.Filter(Experimental, Stable)
assert.Equal(t, 2, len(new.Features()))
})

t.Run("Creates a new Bundle from specified feature", func(t *testing.T) {
f1 := New("libbeat.outputs", "elasticsearch", factory, Stable)
b := MustBundle(f1)
assert.Equal(t, 1, len(b.Features()))
})

t.Run("Creates a new Bundle with grouped features", func(t *testing.T) {
f1 := New("libbeat.outputs", "elasticsearch", factory, Stable)
f2 := New("libbeat.outputs", "edge", factory, Experimental)
f3 := New("libbeat.input", "tcp", factory, Beta)
f4 := New("libbeat.input", "udp", factory, Beta)

b := MustBundle(
MustBundle(f1),
MustBundle(f2),
MustBundle(f3, f4),
)

assert.Equal(t, 4, len(b.Features()))
})
}
103 changes: 103 additions & 0 deletions libbeat/feature/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package feature

import (
"fmt"
)

// Registry is the global plugin registry, this variable is meant to be temporary to move all the
// internal factory to receive a context that include the current beat registry.
var Registry = newRegistry()

// Featurable implements the description of a feature.
type Featurable interface {
bundleable

// Namespace is the kind of plugin or functionality we want to expose as a feature.
// Examples: Autodiscover's provider, processors, outputs.
Namespace() string

// Name is the name of the feature, the name must unique by namespace and be a description of the
// actual functionality, it is usually the name of the package.
// Examples: dissect, elasticsearch, redis
Name() string

// Factory returns the function used to create an instance of the Feature, the signature
// of the method is type checked by the 'FindFactory' of each namespace.
Factory() interface{}

// Stability is the stability of the Feature, this allow the user to filter embedded functionality
// by their maturity at runtime.
// Example: Beta, Experimental, Stable or Undefined.
Stability() Stability

String() string
}

// Feature contains the information for a specific feature
type Feature struct {
namespace string
name string
factory interface{}
stability Stability
}

// Namespace return the namespace of the feature.
func (f *Feature) Namespace() string {
return f.namespace
}

// Name returns the name of the feature.
func (f *Feature) Name() string {
return f.name
}

// Factory returns the factory for the feature.
func (f *Feature) Factory() interface{} {
return f.factory
}

// Stability returns the stability level of the feature, current: stable, beta, experimental.
func (f *Feature) Stability() Stability {
return f.stability
}

// Features return the current feature as a slice to be compatible with Bundle merging and filtering.
func (f *Feature) Features() []Featurable {
return []Featurable{f}
}

// String return the debug information
func (f *Feature) String() string {
return fmt.Sprintf("%s/%s (stability: %s)", f.namespace, f.name, f.stability)
}

// New returns a new Feature.
func New(namespace, name string, factory interface{}, stability Stability) *Feature {
return &Feature{
namespace: namespace,
name: name,
factory: factory,
stability: stability,
}
}

// RegisterBundle registers a bundle of features.
func RegisterBundle(bundle *Bundle) error {
for _, f := range bundle.Features() {
Registry.Register(f)
}
return nil
}

// Register register a new feature on the global registry.
func Register(feature Featurable) error {
return Registry.Register(feature)
}

// MustRegister register a new Feature on the global registry and panic on error.
func MustRegister(feature Featurable) {
err := Register(feature)
if err != nil {
panic(err)
}
}
157 changes: 157 additions & 0 deletions libbeat/feature/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package feature

import (
"fmt"
"reflect"
"sync"

"github.com/elastic/beats/libbeat/logp"
)

type mapper map[string]map[string]Featurable

// Registry implements a global registry for any kind of feature in beats.
// feature are grouped by namespace, a namespace is a kind of plugin like outputs, inputs, or queue.
// The feature name must be unique.
type registry struct {
sync.RWMutex
namespaces mapper
log *logp.Logger
}

// NewRegistry returns a new registry.
func newRegistry() *registry {
return &registry{
namespaces: make(mapper),
log: logp.NewLogger("registry"),
}
}

// Register registers a new feature into a specific namespace, namespace are lazy created.
// Feature name must be unique.
func (r *registry) Register(feature Featurable) error {
r.Lock()
defer r.Unlock()

ns := feature.Namespace()
name := feature.Name()

// Lazy create namespaces
_, found := r.namespaces[ns]
if !found {
r.namespaces[ns] = make(map[string]Featurable)
}

f, found := r.namespaces[ns][name]
if found {
if featuresEqual(feature, f) {
// Allow both old style and new style of plugin to work together.
r.log.Debugw(
"ignoring, feature '%s' is already registered in the namespace '%s'",
name,
ns,
)
return nil
}

return fmt.Errorf(
"could not register new feature '%s' in namespace '%s', feature name must be unique",
name,
ns,
)
}

r.log.Debugw(
"registering new feature",
"namespace",
ns,
"name",
name,
)

r.namespaces[ns][name] = feature

return nil
}

// Unregister removes a feature from the registry.
func (r *registry) Unregister(namespace, name string) error {
r.Lock()
defer r.Unlock()

v, found := r.namespaces[namespace]
if !found {
return fmt.Errorf("unknown namespace named '%s'", namespace)
}

_, found = v[name]
if !found {
return fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace)
}

delete(r.namespaces[namespace], name)
return nil
}

// Lookup searches for a Feature by the namespace-name pair.
func (r *registry) Lookup(namespace, name string) (Featurable, error) {
r.RLock()
defer r.RUnlock()

v, found := r.namespaces[namespace]
if !found {
return nil, fmt.Errorf("unknown namespace named '%s'", namespace)
}

m, found := v[name]
if !found {
return nil, fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace)
}

return m, nil
}

// LookupAll returns all the features for a specific namespace.
func (r *registry) LookupAll(namespace string) ([]Featurable, error) {
r.RLock()
defer r.RUnlock()

v, found := r.namespaces[namespace]
if !found {
return nil, fmt.Errorf("unknown namespace named '%s'", namespace)
}

list := make([]Featurable, len(v))
c := 0
for _, feature := range v {
list[c] = feature
c++
}

return list, nil
}

// Size returns the number of registered features in the registry.
func (r *registry) Size() int {
r.RLock()
defer r.RUnlock()

c := 0
for _, namespace := range r.namespaces {
c += len(namespace)
}

return c
}

func featuresEqual(f1, f2 Featurable) bool {
// There is no safe way to compare function in go,
// but since the function pointers are global it should be stable.
if f1.Name() == f2.Name() &&
f1.Namespace() == f2.Namespace() &&
reflect.ValueOf(f1.Factory()).Pointer() == reflect.ValueOf(f2.Factory()).Pointer() {
return true
}

return false
}
Loading