-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathextension.go
163 lines (134 loc) · 5.73 KB
/
extension.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package extension // import "go.opentelemetry.io/collector/extension"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)
// Extension is the interface for objects hosted by the OpenTelemetry Collector that
// don't participate directly in data pipelines but provide some functionality
// to the service. Examples: health check endpoint, z-pages, etc.
//
// It is declared as a type alias of component.Component, so the two names
// denote the same type.
type Extension = component.Component
// PipelineWatcher is an extra interface for an Extension hosted by the OpenTelemetry
// Collector. It is meant to be implemented by extensions interested in changes to
// pipeline states. Typically this will be used by extensions that change their behavior
// depending on whether data is being ingested or not, e.g. a k8s readiness probe.
type PipelineWatcher interface {
// Ready notifies the Extension that all pipelines were built and the
// receivers were started, i.e. the service is ready to receive data
// (note that it may already have received data when this method is called).
Ready() error
// NotReady notifies the Extension that all receivers are about to be stopped,
// i.e. pipeline receivers will not accept new data.
// This is sent before receivers are stopped, so the Extension can take any
// appropriate actions before that happens.
NotReady() error
}
// ConfigWatcher is an interface that should be implemented by an extension that
// wishes to be notified of the Collector's effective configuration.
type ConfigWatcher interface {
// NotifyConfig notifies the extension of the Collector's current effective configuration.
// conf carries that configuration; a non-nil error reports that the extension
// could not handle the notification.
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
}
// StatusWatcher is an extra interface for an Extension hosted by the OpenTelemetry
// Collector. It is meant to be implemented by extensions interested in changes to
// component status.
type StatusWatcher interface {
// ComponentStatusChanged notifies about a change in the source component status.
// source identifies the component whose status changed and event describes the change.
// Extensions that implement this interface must be prepared for ComponentStatusChanged
// to be called before, after or concurrently with calls to Component.Start() and
// Component.Shutdown(). The function may also be called concurrently with itself.
ComponentStatusChanged(source *component.InstanceID, event *component.StatusEvent)
}
// CreateSettings is passed to the Factory.CreateExtension(...) function.
type CreateSettings struct {
// ID is the ID of the component that will be created.
ID component.ID
// TelemetrySettings is embedded, so its fields are promoted onto CreateSettings.
// NOTE(review): Builder.Create logs through set.Logger, which presumably comes
// from this embedded struct — confirm against the component package.
component.TelemetrySettings
// BuildInfo can be used by components for informational purposes.
BuildInfo component.BuildInfo
}
// CreateFunc is the function-typed equivalent of Factory.CreateExtension(...):
// it takes the same arguments and returns the same results.
type CreateFunc func(context.Context, CreateSettings, component.Config) (Extension, error)
// CreateExtension implements Factory.CreateExtension by invoking the
// underlying function with the same arguments and handing back its results
// unchanged.
func (f CreateFunc) CreateExtension(ctx context.Context, set CreateSettings, cfg component.Config) (Extension, error) {
	ext, err := f(ctx, set, cfg)
	return ext, err
}
// Factory is the factory interface for extensions. It extends component.Factory
// with the ability to create an Extension and to report the stability level of
// that extension. Use NewFactory to obtain an implementation.
type Factory interface {
component.Factory
// CreateExtension creates an extension based on the given config.
CreateExtension(ctx context.Context, set CreateSettings, cfg component.Config) (Extension, error)
// ExtensionStability gets the stability level of the Extension.
ExtensionStability() component.StabilityLevel
// unexportedFactoryFunc is unexported, so a type declared outside this package
// cannot satisfy Factory by defining its own methods.
unexportedFactoryFunc()
}
// factory is the Factory implementation returned by NewFactory.
type factory struct {
// cfgType is the component type reported by Type().
cfgType component.Type
// CreateDefaultConfigFunc is embedded; NewFactory sets it from its
// createDefaultConfig argument.
component.CreateDefaultConfigFunc
// CreateFunc is embedded, which promotes its CreateExtension method onto factory.
CreateFunc
// extensionStability is the stability level reported by ExtensionStability().
extensionStability component.StabilityLevel
}
// Type returns the component type stored in the factory.
func (f *factory) Type() component.Type {
return f.cfgType
}
// unexportedFactoryFunc is a no-op; it exists only so that *factory provides
// the unexported method required by the Factory interface.
func (f *factory) unexportedFactoryFunc() {}
// ExtensionStability returns the stability level stored in the factory.
func (f *factory) ExtensionStability() component.StabilityLevel {
return f.extensionStability
}
// NewFactory builds a Factory for extensions of type cfgType.
//
// createDefaultConfig and createServiceExtension are stored as the factory's
// default-config and extension-creation functions, and sl is the stability
// level the factory will report from ExtensionStability.
func NewFactory(
	cfgType component.Type,
	createDefaultConfig component.CreateDefaultConfigFunc,
	createServiceExtension CreateFunc,
	sl component.StabilityLevel) Factory {
	f := new(factory)
	f.cfgType = cfgType
	f.CreateDefaultConfigFunc = createDefaultConfig
	f.CreateFunc = createServiceExtension
	f.extensionStability = sl
	return f
}
// MakeFactoryMap takes a list of factories and returns a map keyed by each
// Factory's type.
//
// It returns a non-nil error as soon as two factories report the same type.
// In that case the returned map holds the factories registered before the
// duplicate was encountered.
func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) {
	// The number of entries is bounded by len(factories), so size the map once
	// instead of letting it grow while it is filled.
	fMap := make(map[component.Type]Factory, len(factories))
	for _, f := range factories {
		// Ask the factory for its type a single time and reuse the value for
		// the lookup, the error message and the insertion.
		typ := f.Type()
		if _, ok := fMap[typ]; ok {
			return fMap, fmt.Errorf("duplicate extension factory %q", typ)
		}
		fMap[typ] = f
	}
	return fMap, nil
}
// Builder is a helper struct that, given a set of Configs and Factories, helps with creating extensions.
type Builder struct {
// cfgs maps a component ID to the config used when creating that extension.
cfgs map[component.ID]component.Config
// factories maps a component type to the Factory that creates extensions of that type.
factories map[component.Type]Factory
}
// NewBuilder creates a new extension.Builder to help with creating components from a set of configs and factories.
// The given maps are stored as-is; they are not copied.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
	b := new(Builder)
	b.cfgs = cfgs
	b.factories = factories
	return b
}
// Create creates the extension identified by set.ID.
//
// It returns an error when no config is registered for set.ID, or when no
// factory is registered for the type of set.ID. Otherwise it logs the
// factory's stability message and delegates to the factory's CreateExtension,
// returning whatever that call returns.
func (b *Builder) Create(ctx context.Context, set CreateSettings) (Extension, error) {
	extCfg, ok := b.cfgs[set.ID]
	if !ok {
		return nil, fmt.Errorf("extension %q is not configured", set.ID)
	}

	extFactory, ok := b.factories[set.ID.Type()]
	if !ok {
		return nil, fmt.Errorf("extension factory not available for: %q", set.ID)
	}

	// Stability levels of alpha and above are logged at debug level;
	// anything below alpha is logged at info level.
	switch stability := extFactory.ExtensionStability(); {
	case stability >= component.StabilityLevelAlpha:
		set.Logger.Debug(stability.LogMessage())
	default:
		set.Logger.Info(stability.LogMessage())
	}

	return extFactory.CreateExtension(ctx, set, extCfg)
}
// Factory returns the factory registered for componentType, or nil when the
// builder has no factory for that type.
func (b *Builder) Factory(componentType component.Type) component.Factory {
	registered, found := b.factories[componentType]
	if !found {
		return nil
	}
	return registered
}