From b7971f739f5df0f409a0a9ee7010432b04953666 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Thu, 21 Jun 2018 10:59:19 -0400 Subject: [PATCH 1/9] Introduce a Global Registry *Motivation:* In the current libbeat implementation and also inside the actual beat we are defining registries for each types of feature that we want to expose. This add duplication to the project, the global registry is a way to keep the flexibility of multiple features and reduce the duplication code only to take care of the type satefy. Also all features now use an init function to make the plugin register with their specific registry. This PR is a step forward to remove that pattern and use a global variable in the package to identify the feature. This change will allow a beat author to build a beat with only a specific set of feature. Example: Build with only ES out and not Logstash and kafka, this could reduce the size of some beats. This PR is written in a backward compatible way, to make the init and the new feature works both at the same time. Instead of using an init function you will the following to expose the feature. ```golang // Feature exposes a spooling to disk queue. var Feature = queue.Feature("spool", create, feature.Beta) ``` Each new type of feature require to implement two things for type satefy: - A factory method to assert the correct type at runtime. - A sugar method like the `queue.Feature`, for type satefy at compile time. *Example:* ```golang // Feature creates a new type of queue. func Feature(name string, factory Factory, stability feature.Stability) *feature.Feature { return feature.New(Namespace, name, factory, stability) } // FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown func FindFactory(name string) Factory { f, err := feature.Registry.Find(Namespace, name) if err != nil { return nil } factory, ok := f.Factory().(Factory) if !ok { return nil } return factory } ``` How it will look like for building beats with a minimal set of plugins: ``` b := MustBundle( MustBundle(docker.Feature), MustBundle(dissect.Feature), MustBundle(elasticsearch.Feature, logstash.Feature), ) feature.RegisterBundle(b) ``` *Caveats:* we still expose the methods and the registry as global, but this is a step to to isolate a registry per beat. --- libbeat/feature/bundle.go | 52 ++++++++++ libbeat/feature/bundle_test.go | 48 ++++++++++ libbeat/feature/feature.go | 120 +++++++++++++++++++++++ libbeat/feature/registry.go | 141 ++++++++++++++++++++++++++++ libbeat/feature/registry_test.go | 135 ++++++++++++++++++++++++++ libbeat/feature/stability_string.go | 16 ++++ 6 files changed, 512 insertions(+) create mode 100644 libbeat/feature/bundle.go create mode 100644 libbeat/feature/bundle_test.go create mode 100644 libbeat/feature/feature.go create mode 100644 libbeat/feature/registry.go create mode 100644 libbeat/feature/registry_test.go create mode 100644 libbeat/feature/stability_string.go diff --git a/libbeat/feature/bundle.go b/libbeat/feature/bundle.go new file mode 100644 index 00000000000..651f4a4acd8 --- /dev/null +++ b/libbeat/feature/bundle.go @@ -0,0 +1,52 @@ +package feature + +import "fmt" + +// Bundle defines a list of features available in the current beat. +type Bundle struct { + Features []Featurable +} + +// NewBundle creates a new Bundle of feature to be registered. +func NewBundle(features []Featurable) *Bundle { + return &Bundle{Features: features} +} + +// Filter creates a new bundle with only the feature matching the requested stability. +func (b *Bundle) Filter(stability Stability) *Bundle { + var filtered []Featurable + + for _, feature := range b.Features { + if feature.Stability() == stability { + filtered = append(filtered, feature) + } + } + return NewBundle(filtered) +} + +// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features, +// will panic on errors. +func MustBundle(features ...interface{}) *Bundle { + b, err := BundleFeature(features...) + if err != nil { + panic(err) + } + return b +} + +// BundleFeature takes existing bundle or features and create a new Bundle with all the merged +// Features, +func BundleFeature(features ...interface{}) (*Bundle, error) { + var merged []Featurable + for _, feature := range features { + switch v := feature.(type) { + case Featurable: + merged = append(merged, v) + case *Bundle: + merged = append(merged, v.Features...) + default: + return nil, fmt.Errorf("unknown type, expecting 'Featurable' or 'Bundle' and received '%T'", v) + } + } + return NewBundle(merged), nil +} diff --git a/libbeat/feature/bundle_test.go b/libbeat/feature/bundle_test.go new file mode 100644 index 00000000000..2602f08cc6d --- /dev/null +++ b/libbeat/feature/bundle_test.go @@ -0,0 +1,48 @@ +package feature + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBundle(t *testing.T) { + factory := func() {} + features := []Featurable{ + New("libbeat.outputs", "elasticsearch", factory, Stable), + New("libbeat.outputs", "edge", factory, Experimental), + New("libbeat.input", "tcp", factory, Beta), + } + + t.Run("Creates a new Bundle", func(t *testing.T) { + b := NewBundle(features) + assert.Equal(t, 3, len(b.Features)) + }) + + t.Run("Filters feature based on stability", func(t *testing.T) { + b := NewBundle(features) + new := b.Filter(Experimental) + assert.Equal(t, 1, len(new.Features)) + }) + + t.Run("Creates a new Bundle from specified feature", func(t *testing.T) { + f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) + b := MustBundle(f1) + assert.Equal(t, 1, len(b.Features)) + }) + + t.Run("Creates a new Bundle with grouped features", func(t *testing.T) { + f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) + f2 := New("libbeat.outputs", "edge", factory, Experimental) + f3 := New("libbeat.input", "tcp", factory, Beta) + f4 := New("libbeat.input", "udp", factory, Beta) + + b := MustBundle( + MustBundle(f1), + MustBundle(f2), + MustBundle(f3, f4), + ) + + assert.Equal(t, 4, len(b.Features)) + }) +} diff --git a/libbeat/feature/feature.go b/libbeat/feature/feature.go new file mode 100644 index 00000000000..b2770139300 --- /dev/null +++ b/libbeat/feature/feature.go @@ -0,0 +1,120 @@ +package feature + +import ( + "fmt" + "reflect" +) + +//go:generate stringer -type=Stability + +// Registry is the global plugin registry, this variable is mean to be temporary to move all the +// internal factory to receive a context that include the current beat registry. +var Registry = newRegistry() + +// Featurable implements the description of a feature. +type Featurable interface { + // Namespace returns the namespace of the Feature. + Namespace() string + + // Name returns the name of the feature. The name must be unique for each namespace. + Name() string + + // Factory returns the factory func. + Factory() interface{} + + // Stability returns the stability of the feature. + Stability() Stability + + // Equal returns true if the two object are equal. + Equal(other Featurable) bool + + String() string +} + +// Feature contains the information for a specific feature +type Feature struct { + namespace string + name string + factory interface{} + stability Stability +} + +// Namespace return the namespace of the feature. +func (f *Feature) Namespace() string { + return f.namespace +} + +// Name returns the name of the feature. +func (f *Feature) Name() string { + return f.name +} + +// Factory returns the factory for the feature. +func (f *Feature) Factory() interface{} { + return f.factory +} + +// Stability returns the stability level of the feature, current: stable, beta, experimental. +func (f *Feature) Stability() Stability { + return f.stability +} + +// Equal return true if both object are equals. +func (f *Feature) Equal(other Featurable) bool { + // There is no safe way to compare function in go, + // but since the method are global it should be stable. + if f.Name() == other.Name() && + f.Namespace() == other.Namespace() && + reflect.ValueOf(f.Factory()).Pointer() == reflect.ValueOf(other.Factory()).Pointer() { + return true + } + + return false +} + +// String return the debug information +func (f *Feature) String() string { + return fmt.Sprintf("%s/%s (stability: %s)", f.namespace, f.name, f.stability) +} + +// Stability defines the stability of the feature, this value can be used to filter a bundler. +type Stability int + +// List all the available stability for a feature. +const ( + Stable Stability = iota + Beta + Experimental + Undefined +) + +// New returns a new Feature. +func New(namespace, name string, factory interface{}, stability Stability) *Feature { + return &Feature{ + namespace: namespace, + name: name, + factory: factory, + stability: stability, + } +} + +// RegisterBundle registers a bundle of features. +func RegisterBundle(bundle Bundle) error { + for _, f := range bundle.Features { + Registry.Register(f) + } + return nil +} + +// Register register a new feature on the global registry. +func Register(feature Featurable) error { + return Registry.Register(feature) +} + +// MustRegister register a new Feature on the global registry and panic on error. +func MustRegister(feature Featurable) { + err := Register(feature) + if err != nil { + panic(err) + } +} diff --git a/libbeat/feature/registry.go b/libbeat/feature/registry.go new file mode 100644 index 00000000000..276d8d244ed --- /dev/null +++ b/libbeat/feature/registry.go @@ -0,0 +1,141 @@ +package feature + +import ( + "fmt" + "sync" + + "github.com/elastic/beats/libbeat/logp" +) + +type mapper map[string]map[string]Featurable + +// Registry implements a global registry for any kind of feature in beats. +// feature are grouped by namespace, a namespace is a kind of plugin like outputs, inputs, or queue. +// The feature name must be unique. +type registry struct { + sync.RWMutex + namespaces mapper + log *logp.Logger +} + +// NewRegistry returns a new registry. +func newRegistry() *registry { + return ®istry{ + namespaces: make(mapper), + log: logp.NewLogger("registry"), + } +} + +// Register registers a new feature into a specific namespace, namespace are lazy created. +// Feature name must be unique. +func (r *registry) Register(feature Featurable) error { + r.Lock() + defer r.Unlock() + + // Lazy create namespaces + _, found := r.namespaces[feature.Namespace()] + if !found { + r.namespaces[feature.Namespace()] = make(map[string]Featurable) + } + + f, found := r.namespaces[feature.Namespace()][feature.Name()] + if found { + if feature.Equal(f) { + // Allow both old style and new style of plugin to work together. + r.log.Debugw( + "ignoring, feature '%s' is already registered in the namespace '%s'", + feature.Name(), + feature.Namespace(), + ) + return nil + } + + return fmt.Errorf( + "could not register new feature '%s' in namespace '%s', feature name must be unique", + feature.Name(), + feature.Namespace(), + ) + } + + r.log.Debugw( + "registering new feature", + "namespace", + feature.Namespace(), + "name", + feature.Name(), + ) + + r.namespaces[feature.Namespace()][feature.Name()] = feature + + return nil +} + +// Unregister removes a feature from the registry. +func (r *registry) Unregister(namespace, name string) error { + r.Lock() + defer r.Unlock() + + v, found := r.namespaces[namespace] + if !found { + return fmt.Errorf("unknown namespace named '%s'", namespace) + } + + _, found = v[name] + if !found { + return fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace) + } + + delete(r.namespaces[namespace], name) + return nil +} + +// Find returns a specific Find from a namespace or an error if not found. +func (r *registry) Find(namespace, name string) (Featurable, error) { + r.RLock() + defer r.RUnlock() + + v, found := r.namespaces[namespace] + if !found { + return nil, fmt.Errorf("unknown namespace named '%s'", namespace) + } + + m, found := v[name] + if !found { + return nil, fmt.Errorf("unknown feature '%s' in namespace '%s'", name, namespace) + } + + return m, nil +} + +// FindAll returns all the features for a specific namespace. +func (r *registry) FindAll(namespace string) ([]Featurable, error) { + r.RLock() + defer r.RUnlock() + + v, found := r.namespaces[namespace] + if !found { + return nil, fmt.Errorf("unknown namespace named '%s'", namespace) + } + + list := make([]Featurable, len(v)) + c := 0 + for _, feature := range v { + list[c] = feature + c++ + } + + return list, nil +} + +// Size returns the number of registered features in the registry. +func (r *registry) Size() int { + r.RLock() + defer r.RUnlock() + + c := 0 + for _, namespace := range r.namespaces { + c += len(namespace) + } + + return c +} diff --git a/libbeat/feature/registry_test.go b/libbeat/feature/registry_test.go new file mode 100644 index 00000000000..8b7e4fa55cf --- /dev/null +++ b/libbeat/feature/registry_test.go @@ -0,0 +1,135 @@ +package feature + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRegister(t *testing.T) { + f := func() {} + + t.Run("namespace and feature doesn't exist", func(t *testing.T) { + r := newRegistry() + err := r.Register(New("outputs", "null", f, Stable)) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, 1, r.Size()) + }) + + t.Run("namespace exists and feature doesn't exist", func(t *testing.T) { + r := newRegistry() + r.Register(New("processor", "bar", f, Stable)) + err := r.Register(New("processor", "foo", f, Stable)) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, 2, r.Size()) + }) + + t.Run("namespace exists and feature exists and not the same factory", func(t *testing.T) { + r := newRegistry() + r.Register(New("processor", "foo", func() {}, Stable)) + err := r.Register(New("processor", "foo", f, Stable)) + if !assert.Error(t, err) { + return + } + assert.Equal(t, 1, r.Size()) + }) + + t.Run("when the exact feature is already registered", func(t *testing.T) { + feature := New("processor", "foo", f, Stable) + r := newRegistry() + r.Register(feature) + err := r.Register(feature) + if !assert.NoError(t, err) { + return + } + assert.Equal(t, 1, r.Size()) + }) +} + +func TestFeature(t *testing.T) { + f := func() {} + + r := newRegistry() + r.Register(New("processor", "foo", f, Stable)) + + t.Run("when namespace and feature are present", func(t *testing.T) { + feature, err := r.Find("processor", "foo") + if !assert.NotNil(t, feature.Factory()) { + return + } + assert.NoError(t, err) + }) + + t.Run("when namespace doesn't exist", func(t *testing.T) { + _, err := r.Find("hello", "foo") + if !assert.Error(t, err) { + return + } + }) +} + +func TestFeatures(t *testing.T) { + f := func() {} + + r := newRegistry() + r.Register(New("processor", "foo", f, Stable)) + r.Register(New("processor", "foo2", f, Stable)) + + t.Run("when namespace and feature are present", func(t *testing.T) { + features, err := r.FindAll("processor") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, 2, len(features)) + }) + + t.Run("when namespace is not present", func(t *testing.T) { + _, err := r.FindAll("foobar") + if !assert.Error(t, err) { + return + } + }) +} + +func TestUnregister(t *testing.T) { + f := func() {} + + t.Run("when the namespace and the feature exists", func(t *testing.T) { + r := newRegistry() + r.Register(New("processor", "foo", f, Stable)) + assert.Equal(t, 1, r.Size()) + err := r.Unregister("processor", "foo") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, 0, r.Size()) + }) + + t.Run("when the namespace exist and the feature doesn't", func(t *testing.T) { + r := newRegistry() + r.Register(New("processor", "foo", f, Stable)) + assert.Equal(t, 1, r.Size()) + err := r.Unregister("processor", "bar") + if assert.Error(t, err) { + return + } + assert.Equal(t, 0, r.Size()) + }) + + t.Run("when the namespace doesn't exists", func(t *testing.T) { + r := newRegistry() + r.Register(New("processor", "foo", f, Stable)) + assert.Equal(t, 1, r.Size()) + err := r.Unregister("outputs", "bar") + if assert.Error(t, err) { + return + } + assert.Equal(t, 0, r.Size()) + }) +} diff --git a/libbeat/feature/stability_string.go b/libbeat/feature/stability_string.go new file mode 100644 index 00000000000..67a7b06e882 --- /dev/null +++ b/libbeat/feature/stability_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=Stability"; DO NOT EDIT. + +package feature + +import "strconv" + +const _Stability_name = "StableBetaExperimentalUndefined" + +var _Stability_index = [...]uint8{0, 6, 10, 22, 31} + +func (i Stability) String() string { + if i < 0 || i >= Stability(len(_Stability_index)-1) { + return "Stability(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Stability_name[_Stability_index[i]:_Stability_index[i+1]] +} From ad6adc0a0d2e3e925a6f00e3689d3bf47468300f Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Thu, 21 Jun 2018 11:47:43 -0400 Subject: [PATCH 2/9] Example of backward compatibility for the mem/spool queues --- libbeat/publisher/queue/memqueue/broker.go | 4 ++++ libbeat/publisher/queue/queue_reg.go | 28 +++++++++++++++++----- libbeat/publisher/queue/spool/module.go | 4 ++++ 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 4f02e38ae9d..ad7332aebf5 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -5,10 +5,14 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher/queue" ) +// Feature exposes a memory queue. +var Feature = queue.Feature("mem", create, feature.Stable) + type Broker struct { done chan struct{} diff --git a/libbeat/publisher/queue/queue_reg.go b/libbeat/publisher/queue/queue_reg.go index b445c6424bf..67b324c6323 100644 --- a/libbeat/publisher/queue/queue_reg.go +++ b/libbeat/publisher/queue/queue_reg.go @@ -4,23 +4,34 @@ import ( "fmt" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" ) +// Namespace is the feature namespace for queue definition. +var Namespace = "libbeat.queue" + // Global queue type registry for configuring and loading a queue instance // via common.Config var queueReg = map[string]Factory{} // RegisterType registers a new queue type. -func RegisterType(name string, f Factory) { - if queueReg[name] != nil { - panic(fmt.Errorf("queue type '%v' exists already", name)) - } - queueReg[name] = f +func RegisterType(name string, fn Factory) { + f := feature.New(Namespace, name, fn, feature.Undefined) + feature.MustRegister(f) } // FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown func FindFactory(name string) Factory { - return queueReg[name] + f, err := feature.Registry.Find(Namespace, name) + if err != nil { + return nil + } + factory, ok := f.Factory().(Factory) + if !ok { + return nil + } + + return factory } // Load instantiates a new queue. @@ -36,3 +47,8 @@ func Load(eventer Eventer, config common.ConfigNamespace) (Queue, error) { } return factory(eventer, cfg) } + +// Feature creates a new type of queue. +func Feature(name string, factory Factory, stability feature.Stability) *feature.Feature { + return feature.New(Namespace, name, factory, stability) +} diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go index 78930f8b1c3..03da613fb7a 100644 --- a/libbeat/publisher/queue/spool/module.go +++ b/libbeat/publisher/queue/spool/module.go @@ -3,11 +3,15 @@ package spool import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/go-txfile" ) +// Feature exposes a spooling to disk queue. +var Feature = queue.Feature("spool", create, feature.Beta) + func init() { queue.RegisterType("spool", create) } From 2f46cf32968b69d181b9d50257efb14d88faf108 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Thu, 21 Jun 2018 14:41:33 -0400 Subject: [PATCH 3/9] allow to filter on multiple stabilities --- libbeat/feature/bundle.go | 9 ++++++--- libbeat/feature/bundle_test.go | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/libbeat/feature/bundle.go b/libbeat/feature/bundle.go index 651f4a4acd8..174c2f973b0 100644 --- a/libbeat/feature/bundle.go +++ b/libbeat/feature/bundle.go @@ -13,12 +13,15 @@ func NewBundle(features []Featurable) *Bundle { } // Filter creates a new bundle with only the feature matching the requested stability. -func (b *Bundle) Filter(stability Stability) *Bundle { +func (b *Bundle) Filter(stabilities ...Stability) *Bundle { var filtered []Featurable for _, feature := range b.Features { - if feature.Stability() == stability { - filtered = append(filtered, feature) + for _, stability := range stabilities { + if feature.Stability() == stability { + filtered = append(filtered, feature) + break + } } } return NewBundle(filtered) diff --git a/libbeat/feature/bundle_test.go b/libbeat/feature/bundle_test.go index 2602f08cc6d..df293cc4b14 100644 --- a/libbeat/feature/bundle_test.go +++ b/libbeat/feature/bundle_test.go @@ -25,6 +25,12 @@ func TestBundle(t *testing.T) { assert.Equal(t, 1, len(new.Features)) }) + t.Run("Filters feature based on multiple different stability", func(t *testing.T) { + b := NewBundle(features) + new := b.Filter(Experimental, Stable) + assert.Equal(t, 2, len(new.Features)) + }) + t.Run("Creates a new Bundle from specified feature", func(t *testing.T) { f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) b := MustBundle(f1) From 2dfac4fdbdd62491cccd675881b08f947c5bfb43 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Thu, 21 Jun 2018 16:14:56 -0400 Subject: [PATCH 4/9] use a reference --- libbeat/feature/feature.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/feature/feature.go b/libbeat/feature/feature.go index b2770139300..0931bbe5364 100644 --- a/libbeat/feature/feature.go +++ b/libbeat/feature/feature.go @@ -99,7 +99,7 @@ func New(namespace, name string, factory interface{}, stability Stability) *Feat } // RegisterBundle registers a bundle of features. -func RegisterBundle(bundle Bundle) error { +func RegisterBundle(bundle *Bundle) error { for _, f := range bundle.Features { Registry.Register(f) } From fdb524344fdd10a7b9dfa14ac7faeafb7755433b Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 26 Jun 2018 09:34:45 -0400 Subject: [PATCH 5/9] Stability extracts into his own file. --- libbeat/feature/feature.go | 13 ------------- libbeat/feature/stability.go | 14 ++++++++++++++ libbeat/feature/stability_string.go | 4 ++-- 3 files changed, 16 insertions(+), 15 deletions(-) create mode 100644 libbeat/feature/stability.go diff --git a/libbeat/feature/feature.go b/libbeat/feature/feature.go index 0931bbe5364..8dd567a9677 100644 --- a/libbeat/feature/feature.go +++ b/libbeat/feature/feature.go @@ -5,8 +5,6 @@ import ( "reflect" ) -//go:generate stringer -type=Stability - // Registry is the global plugin registry, this variable is mean to be temporary to move all the // internal factory to receive a context that include the current beat registry. var Registry = newRegistry() @@ -77,17 +75,6 @@ func (f *Feature) String() string { return fmt.Sprintf("%s/%s (stability: %s)", f.namespace, f.name, f.stability) } -// Stability defines the stability of the feature, this value can be used to filter a bundler. -type Stability int - -// List all the available stability for a feature. -const ( - Stable Stability = iota - Beta - Experimental - Undefined -) - // New returns a new Feature. func New(namespace, name string, factory interface{}, stability Stability) *Feature { return &Feature{ diff --git a/libbeat/feature/stability.go b/libbeat/feature/stability.go new file mode 100644 index 00000000000..7903cee89cc --- /dev/null +++ b/libbeat/feature/stability.go @@ -0,0 +1,14 @@ +package feature + +//go:generate stringer -type=Stability + +// Stability defines the stability of the feature, this value can be used to filter a bundler. +type Stability int + +// List all the available stability for a feature. +const ( + Undefined Stability = iota + Stable + Beta + Experimental +) diff --git a/libbeat/feature/stability_string.go b/libbeat/feature/stability_string.go index 67a7b06e882..130cabef2bd 100644 --- a/libbeat/feature/stability_string.go +++ b/libbeat/feature/stability_string.go @@ -4,9 +4,9 @@ package feature import "strconv" -const _Stability_name = "StableBetaExperimentalUndefined" +const _Stability_name = "UndefinedStableBetaExperimental" -var _Stability_index = [...]uint8{0, 6, 10, 22, 31} +var _Stability_index = [...]uint8{0, 9, 15, 19, 31} func (i Stability) String() string { if i < 0 || i >= Stability(len(_Stability_index)-1) { From a565de2cb8ffa104abb632a933f70d7f4de21e20 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 26 Jun 2018 10:09:41 -0400 Subject: [PATCH 6/9] Review first round. --- libbeat/feature/bundle.go | 40 +++++++++++++--------------------- libbeat/feature/bundle_test.go | 10 ++++----- libbeat/feature/feature.go | 38 +++++++++++++++----------------- libbeat/feature/registry.go | 38 ++++++++++++++++++++++---------- 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/libbeat/feature/bundle.go b/libbeat/feature/bundle.go index 174c2f973b0..7e32c583c45 100644 --- a/libbeat/feature/bundle.go +++ b/libbeat/feature/bundle.go @@ -1,22 +1,25 @@ package feature -import "fmt" +// Bundleable merges featurable and bundle interface together. +type bundleable interface { + Features() []Featurable +} // Bundle defines a list of features available in the current beat. type Bundle struct { - Features []Featurable + features []Featurable } // NewBundle creates a new Bundle of feature to be registered. func NewBundle(features []Featurable) *Bundle { - return &Bundle{Features: features} + return &Bundle{features: features} } // Filter creates a new bundle with only the feature matching the requested stability. func (b *Bundle) Filter(stabilities ...Stability) *Bundle { var filtered []Featurable - for _, feature := range b.Features { + for _, feature := range b.features { for _, stability := range stabilities { if feature.Stability() == stability { filtered = append(filtered, feature) @@ -27,29 +30,16 @@ func (b *Bundle) Filter(stabilities ...Stability) *Bundle { return NewBundle(filtered) } -// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features, -// will panic on errors. -func MustBundle(features ...interface{}) *Bundle { - b, err := BundleFeature(features...) - if err != nil { - panic(err) - } - return b +// Features returns the interface features slice so +func (b *Bundle) Features() []Featurable { + return b.features } -// BundleFeature takes existing bundle or features and create a new Bundle with all the merged -// Features, -func BundleFeature(features ...interface{}) (*Bundle, error) { +// MustBundle takes existing bundle or features and create a new Bundle with all the merged Features. +func MustBundle(bundle ...bundleable) *Bundle { var merged []Featurable - for _, feature := range features { - switch v := feature.(type) { - case Featurable: - merged = append(merged, v) - case *Bundle: - merged = append(merged, v.Features...) - default: - return nil, fmt.Errorf("unknown type, expecting 'Featurable' or 'Bundle' and received '%T'", v) - } + for _, feature := range bundle { + merged = append(merged, feature.Features()...) } - return NewBundle(merged), nil + return NewBundle(merged) } diff --git a/libbeat/feature/bundle_test.go b/libbeat/feature/bundle_test.go index df293cc4b14..e675a7f0941 100644 --- a/libbeat/feature/bundle_test.go +++ b/libbeat/feature/bundle_test.go @@ -16,25 +16,25 @@ func TestBundle(t *testing.T) { t.Run("Creates a new Bundle", func(t *testing.T) { b := NewBundle(features) - assert.Equal(t, 3, len(b.Features)) + assert.Equal(t, 3, len(b.Features())) }) t.Run("Filters feature based on stability", func(t *testing.T) { b := NewBundle(features) new := b.Filter(Experimental) - assert.Equal(t, 1, len(new.Features)) + assert.Equal(t, 1, len(new.Features())) }) t.Run("Filters feature based on multiple different stability", func(t *testing.T) { b := NewBundle(features) new := b.Filter(Experimental, Stable) - assert.Equal(t, 2, len(new.Features)) + assert.Equal(t, 2, len(new.Features())) }) t.Run("Creates a new Bundle from specified feature", func(t *testing.T) { f1 := New("libbeat.outputs", "elasticsearch", factory, Stable) b := MustBundle(f1) - assert.Equal(t, 1, len(b.Features)) + assert.Equal(t, 1, len(b.Features())) }) t.Run("Creates a new Bundle with grouped features", func(t *testing.T) { @@ -49,6 +49,6 @@ func TestBundle(t *testing.T) { MustBundle(f3, f4), ) - assert.Equal(t, 4, len(b.Features)) + assert.Equal(t, 4, len(b.Features())) }) } diff --git a/libbeat/feature/feature.go b/libbeat/feature/feature.go index 8dd567a9677..9e9c53bf8eb 100644 --- a/libbeat/feature/feature.go +++ b/libbeat/feature/feature.go @@ -2,30 +2,34 @@ package feature import ( "fmt" - "reflect" ) -// Registry is the global plugin registry, this variable is mean to be temporary to move all the +// Registry is the global plugin registry, this variable is meant to be temporary to move all the // internal factory to receive a context that include the current beat registry. var Registry = newRegistry() // Featurable implements the description of a feature. type Featurable interface { - // Namespace returns the namespace of the Feature. + bundleable + + // Namespace is the kind of plugin or functionality we want to expose as a feature. + // Examples: Autodiscover's provider, processors, outputs. Namespace() string - // Name returns the name of the feature. The name must be unique for each namespace. + // Name is the name of the feature, the name must unique by namespace and be a description of the + // actual functionality, it is usually the name of the package. + // Examples: dissect, elasticsearch, redis Name() string - // Factory returns the factory func. + // Factory returns the function used to create an instance of the Feature, the signature + // of the method is type checked by the 'FindFactory' of each namespace. Factory() interface{} - // Stability returns the stability of the feature. + // Stability is the stability of the Feature, this allow the user to filter embedded functionality + // by their maturity at runtime. + // Example: Beta, Experimental, Stable or Undefined. Stability() Stability - // Equal returns true if the two object are equal. - Equal(other Featurable) bool - String() string } @@ -57,17 +61,9 @@ func (f *Feature) Stability() Stability { return f.stability } -// Equal return true if both object are equals. -func (f *Feature) Equal(other Featurable) bool { - // There is no safe way to compare function in go, - // but since the method are global it should be stable. - if f.Name() == other.Name() && - f.Namespace() == other.Namespace() && - reflect.ValueOf(f.Factory()).Pointer() == reflect.ValueOf(other.Factory()).Pointer() { - return true - } - - return false +// Features return the current feature as a slice to be compatible with Bundle merging and filtering. +func (f *Feature) Features() []Featurable { + return []Featurable{f} } // String return the debug information @@ -87,7 +83,7 @@ func New(namespace, name string, factory interface{}, stability Stability) *Feat // RegisterBundle registers a bundle of features. func RegisterBundle(bundle *Bundle) error { - for _, f := range bundle.Features { + for _, f := range bundle.Features() { Registry.Register(f) } return nil diff --git a/libbeat/feature/registry.go b/libbeat/feature/registry.go index 276d8d244ed..f514963e6cb 100644 --- a/libbeat/feature/registry.go +++ b/libbeat/feature/registry.go @@ -2,6 +2,7 @@ package feature import ( "fmt" + "reflect" "sync" "github.com/elastic/beats/libbeat/logp" @@ -32,40 +33,43 @@ func (r *registry) Register(feature Featurable) error { r.Lock() defer r.Unlock() + ns := feature.Namespace() + name := feature.Name() + // Lazy create namespaces - _, found := r.namespaces[feature.Namespace()] + _, found := r.namespaces[ns] if !found { - r.namespaces[feature.Namespace()] = make(map[string]Featurable) + r.namespaces[ns] = make(map[string]Featurable) } - f, found := r.namespaces[feature.Namespace()][feature.Name()] + f, found := r.namespaces[ns][name] if found { - if feature.Equal(f) { + if featuresEqual(feature, f) { // Allow both old style and new style of plugin to work together. r.log.Debugw( "ignoring, feature '%s' is already registered in the namespace '%s'", - feature.Name(), - feature.Namespace(), + name, + ns, ) return nil } return fmt.Errorf( "could not register new feature '%s' in namespace '%s', feature name must be unique", - feature.Name(), - feature.Namespace(), + name, + ns, ) } r.log.Debugw( "registering new feature", "namespace", - feature.Namespace(), + ns, "name", - feature.Name(), + name, ) - r.namespaces[feature.Namespace()][feature.Name()] = feature + r.namespaces[ns][name] = feature return nil } @@ -139,3 +143,15 @@ func (r *registry) Size() int { return c } + +func featuresEqual(f1, f2 Featurable) bool { + // There is no safe way to compare function in go, + // but since the function pointers are global it should be stable. + if f1.Name() == f2.Name() && + f1.Namespace() == f2.Namespace() && + reflect.ValueOf(f1.Factory()).Pointer() == reflect.ValueOf(f2.Factory()).Pointer() { + return true + } + + return false +} From d3bea986d3943ffb33975ce065bcf12fb05b1865 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 26 Jun 2018 10:13:58 -0400 Subject: [PATCH 7/9] uses lookup and lookupAll instead --- libbeat/feature/registry.go | 8 ++++---- libbeat/feature/registry_test.go | 8 ++++---- libbeat/publisher/queue/queue_reg.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/libbeat/feature/registry.go b/libbeat/feature/registry.go index f514963e6cb..40dea98c275 100644 --- a/libbeat/feature/registry.go +++ b/libbeat/feature/registry.go @@ -93,8 +93,8 @@ func (r *registry) Unregister(namespace, name string) error { return nil } -// Find returns a specific Find from a namespace or an error if not found. -func (r *registry) Find(namespace, name string) (Featurable, error) { +// Lookup searches for a Feature by the namespace-name pair. +func (r *registry) Lookup(namespace, name string) (Featurable, error) { r.RLock() defer r.RUnlock() @@ -111,8 +111,8 @@ func (r *registry) Find(namespace, name string) (Featurable, error) { return m, nil } -// FindAll returns all the features for a specific namespace. -func (r *registry) FindAll(namespace string) ([]Featurable, error) { +// LookupAll returns all the features for a specific namespace. +func (r *registry) LookupAll(namespace string) ([]Featurable, error) { r.RLock() defer r.RUnlock() diff --git a/libbeat/feature/registry_test.go b/libbeat/feature/registry_test.go index 8b7e4fa55cf..984b248e079 100644 --- a/libbeat/feature/registry_test.go +++ b/libbeat/feature/registry_test.go @@ -59,7 +59,7 @@ func TestFeature(t *testing.T) { r.Register(New("processor", "foo", f, Stable)) t.Run("when namespace and feature are present", func(t *testing.T) { - feature, err := r.Find("processor", "foo") + feature, err := r.Lookup("processor", "foo") if !assert.NotNil(t, feature.Factory()) { return } @@ -67,7 +67,7 @@ func TestFeature(t *testing.T) { }) t.Run("when namespace doesn't exist", func(t *testing.T) { - _, err := r.Find("hello", "foo") + _, err := r.Lookup("hello", "foo") if !assert.Error(t, err) { return } @@ -82,7 +82,7 @@ func TestFeatures(t *testing.T) { r.Register(New("processor", "foo2", f, Stable)) t.Run("when namespace and feature are present", func(t *testing.T) { - features, err := r.FindAll("processor") + features, err := r.LookupAll("processor") if !assert.NoError(t, err) { return } @@ -90,7 +90,7 @@ func TestFeatures(t *testing.T) { }) t.Run("when namespace is not present", func(t *testing.T) { - _, err := r.FindAll("foobar") + _, err := r.LookupAll("foobar") if !assert.Error(t, err) { return } diff --git a/libbeat/publisher/queue/queue_reg.go b/libbeat/publisher/queue/queue_reg.go index 67b324c6323..38dfb9e2b62 100644 --- a/libbeat/publisher/queue/queue_reg.go +++ b/libbeat/publisher/queue/queue_reg.go @@ -22,7 +22,7 @@ func RegisterType(name string, fn Factory) { // FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown func FindFactory(name string) Factory { - f, err := feature.Registry.Find(Namespace, name) + f, err := feature.Registry.Lookup(Namespace, name) if err != nil { return nil } From 10c0f861edbf433f3ee975a00e1e75483a457552 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 26 Jun 2018 10:16:27 -0400 Subject: [PATCH 8/9] adding developer changelog --- CHANGELOG-developer.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index c12038d0c64..a4c10b360ae 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -29,3 +29,5 @@ The list below covers the major changes between 6.3.0 and master only. - Fix permissions of generated Filebeat filesets. {pull}7140[7140] ==== Added + +- Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392] From 0665439c6f7f0757c790743de5ddb294dff8af44 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Tue, 26 Jun 2018 10:31:16 -0400 Subject: [PATCH 9/9] use a variadic function --- libbeat/feature/bundle.go | 6 +++--- libbeat/feature/bundle_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/feature/bundle.go b/libbeat/feature/bundle.go index 7e32c583c45..e52bae74cb5 100644 --- a/libbeat/feature/bundle.go +++ b/libbeat/feature/bundle.go @@ -11,7 +11,7 @@ type Bundle struct { } // NewBundle creates a new Bundle of feature to be registered. -func NewBundle(features []Featurable) *Bundle { +func NewBundle(features ...Featurable) *Bundle { return &Bundle{features: features} } @@ -27,7 +27,7 @@ func (b *Bundle) Filter(stabilities ...Stability) *Bundle { } } } - return NewBundle(filtered) + return NewBundle(filtered...) } // Features returns the interface features slice so @@ -41,5 +41,5 @@ func MustBundle(bundle ...bundleable) *Bundle { for _, feature := range bundle { merged = append(merged, feature.Features()...) } - return NewBundle(merged) + return NewBundle(merged...) } diff --git a/libbeat/feature/bundle_test.go b/libbeat/feature/bundle_test.go index e675a7f0941..2dda7c20d1b 100644 --- a/libbeat/feature/bundle_test.go +++ b/libbeat/feature/bundle_test.go @@ -15,18 +15,18 @@ func TestBundle(t *testing.T) { } t.Run("Creates a new Bundle", func(t *testing.T) { - b := NewBundle(features) + b := NewBundle(features...) assert.Equal(t, 3, len(b.Features())) }) t.Run("Filters feature based on stability", func(t *testing.T) { - b := NewBundle(features) + b := NewBundle(features...) new := b.Filter(Experimental) assert.Equal(t, 1, len(new.Features())) }) t.Run("Filters feature based on multiple different stability", func(t *testing.T) { - b := NewBundle(features) + b := NewBundle(features...) new := b.Filter(Experimental, Stable) assert.Equal(t, 2, len(new.Features())) })