Skip to content

Commit

Permalink
[service/extensions] enforce order of start and shutdown of extension…
Browse files Browse the repository at this point in the history
…s according to configuration
  • Loading branch information
atoulme committed Oct 24, 2023
1 parent e97ceca commit 24dfe76
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 13 deletions.
25 changes: 25 additions & 0 deletions .chloggen/extension-lifecycle-order.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/extensions

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement strict ordering of startup, shutdown and notifications to extensions.

# One or more tracking issues or pull requests related to the change
issues: [8732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
35 changes: 22 additions & 13 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ const zExtensionName = "zextensionname"

// Extensions is a map of extensions created from extension configs.
type Extensions struct {
telemetry servicetelemetry.TelemetrySettings
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*component.InstanceID
telemetry servicetelemetry.TelemetrySettings
extensionIDs []component.ID
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*component.InstanceID
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
Expand All @@ -49,8 +51,9 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
func (bes *Extensions) Shutdown(ctx context.Context) error {
bes.telemetry.Logger.Info("Stopping extensions...")
var errs error
for extID, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
Expand All @@ -64,7 +67,8 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
}

func (bes *Extensions) NotifyPipelineReady() error {
for extID, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extension.PipelineWatcher); ok {
if err := pw.Ready(); err != nil {
return fmt.Errorf("failed to notify extension %q: %w", extID, err)
Expand All @@ -77,7 +81,8 @@ func (bes *Extensions) NotifyPipelineReady() error {
func (bes *Extensions) NotifyPipelineNotReady() error {
// Notify extensions in reverse order.
var errs error
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extension.PipelineWatcher); ok {
errs = multierr.Append(errs, pw.NotReady())
}
Expand All @@ -87,7 +92,8 @@ func (bes *Extensions) NotifyPipelineNotReady() error {

func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var errs error
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if cw, ok := ext.(extension.ConfigWatcher); ok {
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
Expand All @@ -97,7 +103,8 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err
}

func (bes *Extensions) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) {
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if sw, ok := ext.(extension.StatusWatcher); ok {
sw.ComponentStatusChanged(source, event)
}
Expand All @@ -120,7 +127,7 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) {
data := zpages.SummaryExtensionsTableData{}

data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(bes.extMap))
for id := range bes.extMap {
for _, id := range bes.extensionIDs {
row := zpages.SummaryExtensionsTableRowData{FullName: id.String()}
data.Rows = append(data.Rows, row)
}
Expand Down Expand Up @@ -150,9 +157,10 @@ type Settings struct {
// New creates a new Extensions from Config.
func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
exts := &Extensions{
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*component.InstanceID),
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*component.InstanceID),
extensionIDs: make([]component.ID, 0, len(cfg)),
}
for _, extID := range cfg {
instanceID := &component.InstanceID{
Expand All @@ -178,6 +186,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {

exts.extMap[extID] = ext
exts.instanceIDs[extID] = instanceID
exts.extensionIDs = append(exts.extensionIDs, extID)
}

return exts, nil
Expand Down
69 changes: 69 additions & 0 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,44 @@ func TestBuildExtensions(t *testing.T) {
}
}

func TestOrdering(t *testing.T) {
var startOrder []string
var shutdownOrder []string

recordingExtensionFactory := newRecordingExtensionFactory(func(set extension.CreateSettings, host component.Host) error {
startOrder = append(startOrder, set.ID.String())
return nil
}, func(set extension.CreateSettings) error {
shutdownOrder = append(shutdownOrder, set.ID.String())
return nil
})

exts, err := New(context.Background(), Settings{
Telemetry: servicetelemetry.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Extensions: extension.NewBuilder(
map[component.ID]component.Config{
component.NewID(recordingExtensionFactory.Type()): struct{}{},
component.NewIDWithName(recordingExtensionFactory.Type(), "foo"): struct{}{},
component.NewIDWithName(recordingExtensionFactory.Type(), "bar"): struct{}{},
},
map[component.Type]extension.Factory{
recordingExtensionFactory.Type(): recordingExtensionFactory,
}),
}, Config{
component.NewID(recordingExtensionFactory.Type()),
component.NewIDWithName(recordingExtensionFactory.Type(), "foo"),
component.NewIDWithName(recordingExtensionFactory.Type(), "bar"),
})
require.NoError(t, err)
err = exts.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
require.Equal(t, []string{"recording", "recording/foo", "recording/bar"}, startOrder)
err = exts.Shutdown(context.Background())
require.NoError(t, err)
require.Equal(t, []string{"recording", "recording/foo", "recording/bar"}, shutdownOrder)
}

func TestNotifyConfig(t *testing.T) {
notificationError := errors.New("Error processing config")
nopExtensionFactory := extensiontest.NewNopFactory()
Expand Down Expand Up @@ -362,3 +400,34 @@ func newStatusTestExtensionFactory(name component.Type, startErr, shutdownErr er
component.StabilityLevelDevelopment,
)
}

func newRecordingExtensionFactory(startCallback func(set extension.CreateSettings, host component.Host) error, shutdownCallback func(set extension.CreateSettings) error) extension.Factory {
return extension.NewFactory(
"recording",
func() component.Config {
return &struct{}{}
},
func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) {
return &recordingExtension{
createSettings: set,
startCallback: startCallback,
shutdownCallback: shutdownCallback,
}, nil
},
component.StabilityLevelDevelopment,
)
}

type recordingExtension struct {
startCallback func(set extension.CreateSettings, host component.Host) error
shutdownCallback func(set extension.CreateSettings) error
createSettings extension.CreateSettings
}

func (ext *recordingExtension) Start(_ context.Context, host component.Host) error {
return ext.startCallback(ext.createSettings, host)
}

func (ext *recordingExtension) Shutdown(_ context.Context) error {
return ext.shutdownCallback(ext.createSettings)
}

0 comments on commit 24dfe76

Please sign in to comment.