From fba8df0befffade915d7d07f7bad077593a7ad2c Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Thu, 22 Feb 2024 18:34:48 -0500 Subject: [PATCH 1/6] POC for http settings --- extension/opampextension/config.go | 77 ++++++++++++++++++++++--- extension/opampextension/factory.go | 4 +- extension/opampextension/opamp_agent.go | 14 ++--- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index 825c52a9ab1b..c19eb9f36017 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -5,11 +5,14 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" + "net/url" "github.com/oklog/ulid/v2" + "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/protobufs" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" + "go.uber.org/zap" ) // Config contains the configuration for the opamp extension. Trying to mirror @@ -41,22 +44,80 @@ func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities { return agentCapabilities } +type commonFields struct { + Endpoint string `mapstructure:"endpoint"` + TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` + Headers map[string]configopaque.String `mapstructure:"headers,omitempty"` +} + // OpAMPServer contains the OpAMP transport configuration. type OpAMPServer struct { - WS *OpAMPWebsocket `mapstructure:"ws"` + WS commonFields `mapstructure:"ws,omitempty"` + Http commonFields `mapstructure:"http,omitempty"` } -// OpAMPWebsocket contains the OpAMP websocket transport configuration. -type OpAMPWebsocket struct { - Endpoint string `mapstructure:"endpoint"` - TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` - Headers map[string]configopaque.String `mapstructure:"headers,omitempty"` +func (c commonFields) Scheme() string { + uri, err := url.ParseRequestURI(c.Endpoint) + if err != nil { + return "" + } + return uri.Scheme +} + +func (c commonFields) Validate() error { + if c.Endpoint == "" { + return errors.New("opamp server endpoint must be provided") + } + return nil +} + +func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { + scheme := s.WS.Scheme() + if len(scheme) > 0 { + return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws")))) + } else { + return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http")))) + } +} + +func (s OpAMPServer) GetHeaders() map[string]configopaque.String { + if len(s.WS.Endpoint) > 0 { + return s.WS.Headers + } else { + return s.Http.Headers + } +} + +func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { + if len(s.WS.Endpoint) > 0 { + return s.WS.TLSSetting + } else { + return s.Http.TLSSetting + } +} + +func (s OpAMPServer) GetEndpoint() string { + if len(s.WS.Endpoint) > 0 { + return s.WS.Endpoint + } else { + return s.Http.Endpoint + } } // Validate checks if the extension configuration is valid func (cfg *Config) Validate() error { - if cfg.Server.WS.Endpoint == "" { - return errors.New("opamp server websocket endpoint must be provided") + if len(cfg.Server.WS.Endpoint) == 0 && len(cfg.Server.Http.Endpoint) == 0 { + return errors.New("opamp server must have at least ws or http set") + } else if len(cfg.Server.WS.Endpoint) > 0 && len(cfg.Server.Http.Endpoint) > 0 { + return errors.New("opamp server must have only ws or http set") + } else if len(cfg.Server.WS.Endpoint) != 0 { + if err := cfg.Server.WS.Validate(); err != nil { + return err + } + } else if len(cfg.Server.Http.Endpoint) != 0 { + if err := cfg.Server.Http.Validate(); err != nil { + return err + } } if cfg.InstanceUID != "" { diff --git a/extension/opampextension/factory.go b/extension/opampextension/factory.go index 91c39e9a011b..0974399752c4 100644 --- a/extension/opampextension/factory.go +++ b/extension/opampextension/factory.go @@ -23,9 +23,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - Server: &OpAMPServer{ - WS: &OpAMPWebsocket{}, - }, + Server: &OpAMPServer{}, Capabilities: Capabilities{ ReportsEffectiveConfig: true, }, diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 9b4d666ab67c..5f80d46ea1d9 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -43,15 +43,12 @@ type opampAgent struct { } func (o *opampAgent) Start(_ context.Context, _ component.Host) error { - // TODO: Add OpAMP HTTP transport support. - o.opampClient = client.NewWebSocket(newLoggerFromZap(o.logger)) - header := http.Header{} - for k, v := range o.cfg.Server.WS.Headers { + for k, v := range o.cfg.Server.GetHeaders() { header.Set(k, string(v)) } - tls, err := o.cfg.Server.WS.TLSSetting.LoadTLSConfig() + tls, err := o.cfg.Server.GetTLSSetting().LoadTLSConfig() if err != nil { return err } @@ -59,7 +56,7 @@ func (o *opampAgent) Start(_ context.Context, _ component.Host) error { settings := types.StartSettings{ Header: header, TLSConfig: tls, - OpAMPServerURL: o.cfg.Server.WS.Endpoint, + OpAMPServerURL: o.cfg.Server.GetEndpoint(), InstanceUid: o.instanceID.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { @@ -148,11 +145,11 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r } else { sid, ok := res.Attributes().Get(semconv.AttributeServiceInstanceID) if ok { - uuid, err := uuid.Parse(sid.AsString()) + parsedUuid, err := uuid.Parse(sid.AsString()) if err != nil { return nil, err } - uid = ulid.ULID(uuid) + uid = ulid.ULID(parsedUuid) } } @@ -163,6 +160,7 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r agentVersion: agentVersion, instanceID: uid, capabilities: cfg.Capabilities, + opampClient: cfg.Server.GetClient(logger), } return agent, nil From 250af4bee915bc05b78f01298ad06d5c3217c615 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 26 Feb 2024 17:21:44 -0500 Subject: [PATCH 2/6] added some tests --- .chloggen/opamp-http.yaml | 27 ++++ extension/opampextension/config.go | 25 ++- extension/opampextension/config_test.go | 152 ++++++++++++++++-- .../opampextension/testdata/config_http.yaml | 4 + 4 files changed, 180 insertions(+), 28 deletions(-) create mode 100755 .chloggen/opamp-http.yaml create mode 100644 extension/opampextension/testdata/config_http.yaml diff --git a/.chloggen/opamp-http.yaml b/.chloggen/opamp-http.yaml new file mode 100755 index 000000000000..9d5857815b30 --- /dev/null +++ b/.chloggen/opamp-http.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: enables creating and using an http client + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31389] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index c19eb9f36017..09bc84d78a30 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -52,11 +52,11 @@ type commonFields struct { // OpAMPServer contains the OpAMP transport configuration. type OpAMPServer struct { - WS commonFields `mapstructure:"ws,omitempty"` - Http commonFields `mapstructure:"http,omitempty"` + WS *commonFields `mapstructure:"ws,omitempty"` + Http *commonFields `mapstructure:"http,omitempty"` } -func (c commonFields) Scheme() string { +func (c *commonFields) Scheme() string { uri, err := url.ParseRequestURI(c.Endpoint) if err != nil { return "" @@ -64,7 +64,7 @@ func (c commonFields) Scheme() string { return uri.Scheme } -func (c commonFields) Validate() error { +func (c *commonFields) Validate() error { if c.Endpoint == "" { return errors.New("opamp server endpoint must be provided") } @@ -72,8 +72,7 @@ func (c commonFields) Validate() error { } func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { - scheme := s.WS.Scheme() - if len(scheme) > 0 { + if s.WS != nil { return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws")))) } else { return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http")))) @@ -81,7 +80,7 @@ func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { } func (s OpAMPServer) GetHeaders() map[string]configopaque.String { - if len(s.WS.Endpoint) > 0 { + if s.WS != nil { return s.WS.Headers } else { return s.Http.Headers @@ -89,7 +88,7 @@ func (s OpAMPServer) GetHeaders() map[string]configopaque.String { } func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { - if len(s.WS.Endpoint) > 0 { + if s.WS != nil { return s.WS.TLSSetting } else { return s.Http.TLSSetting @@ -97,7 +96,7 @@ func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { } func (s OpAMPServer) GetEndpoint() string { - if len(s.WS.Endpoint) > 0 { + if s.WS != nil { return s.WS.Endpoint } else { return s.Http.Endpoint @@ -106,15 +105,15 @@ func (s OpAMPServer) GetEndpoint() string { // Validate checks if the extension configuration is valid func (cfg *Config) Validate() error { - if len(cfg.Server.WS.Endpoint) == 0 && len(cfg.Server.Http.Endpoint) == 0 { + if cfg.Server.WS == nil && cfg.Server.Http == nil { return errors.New("opamp server must have at least ws or http set") - } else if len(cfg.Server.WS.Endpoint) > 0 && len(cfg.Server.Http.Endpoint) > 0 { + } else if cfg.Server.WS != nil && cfg.Server.Http != nil { return errors.New("opamp server must have only ws or http set") - } else if len(cfg.Server.WS.Endpoint) != 0 { + } else if cfg.Server.WS != nil { if err := cfg.Server.WS.Validate(); err != nil { return err } - } else if len(cfg.Server.Http.Endpoint) != 0 { + } else if cfg.Server.Http != nil { if err := cfg.Server.Http.Validate(); err != nil { return err } diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 184c97b387d1..0d0db77de840 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -4,6 +4,7 @@ package opampextension import ( + "fmt" "path/filepath" "testing" @@ -30,7 +31,7 @@ func TestUnmarshalConfig(t *testing.T) { assert.Equal(t, &Config{ Server: &OpAMPServer{ - WS: &OpAMPWebsocket{ + WS: &commonFields{ Endpoint: "wss://127.0.0.1:4320/v1/opamp", }, }, @@ -41,20 +42,141 @@ func TestUnmarshalConfig(t *testing.T) { }, cfg) } -func TestConfigValidate(t *testing.T) { - cfg := &Config{ - Server: &OpAMPServer{ - WS: &OpAMPWebsocket{}, +func TestUnmarshalHttpConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_http.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, component.UnmarshalConfig(cm, cfg)) + assert.Equal(t, + &Config{ + Server: &OpAMPServer{ + Http: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + Capabilities: Capabilities{ + ReportsEffectiveConfig: true, + }, + }, cfg) +} + +func TestConfig_Validate(t *testing.T) { + type fields struct { + Server *OpAMPServer + InstanceUID string + Capabilities Capabilities + } + tests := []struct { + name string + fields fields + wantErr assert.ErrorAssertionFunc + }{ + { + name: "WS must have endpoint", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) + }, + }, + { + name: "WS valid endpoint and invalid instance id", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) + }, + }, + { + name: "WS valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + }, + wantErr: assert.NoError, + }, + { + name: "HTTP must have endpoint", + fields: fields{ + Server: &OpAMPServer{ + Http: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) + }, + }, + { + name: "HTTP valid endpoint and invalid instance id", + fields: fields{ + Server: &OpAMPServer{ + Http: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) + }, }, + { + name: "HTTP valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + Http: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + }, + wantErr: assert.NoError, + }, + { + name: "neither config set", + fields: fields{ + Server: &OpAMPServer{}, + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp server must have at least ws or http set", err.Error()) + }, + }, + { + name: "both config set", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{}, + Http: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return assert.Equal(t, "opamp server must have only ws or http set", err.Error()) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + Server: tt.fields.Server, + InstanceUID: tt.fields.InstanceUID, + Capabilities: tt.fields.Capabilities, + } + tt.wantErr(t, cfg.Validate(), fmt.Sprintf("Validate()")) + }) } - err := cfg.Validate() - assert.Equal(t, "opamp server websocket endpoint must be provided", err.Error()) - cfg.Server.WS.Endpoint = "wss://127.0.0.1:4320/v1/opamp" - assert.NoError(t, cfg.Validate()) - cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL" - err = cfg.Validate() - require.Error(t, err) - assert.Equal(t, "opamp instance_uid is invalid", err.Error()) - cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZ" - require.NoError(t, cfg.Validate()) } diff --git a/extension/opampextension/testdata/config_http.yaml b/extension/opampextension/testdata/config_http.yaml new file mode 100644 index 000000000000..c97da589a392 --- /dev/null +++ b/extension/opampextension/testdata/config_http.yaml @@ -0,0 +1,4 @@ +server: + http: + endpoint: https://127.0.0.1:4320/v1/opamp +instance_uid: 01BX5ZZKBKACTAV9WEVGEMMVRZ From 5ac9aadb65ae635ebdf1adbd62d0dfe7bf680424 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Mon, 26 Feb 2024 17:37:37 -0500 Subject: [PATCH 3/6] Fix lint --- extension/opampextension/config.go | 25 +++++++++++-------------- extension/opampextension/config_test.go | 25 ++++++++++++------------- extension/opampextension/opamp_agent.go | 4 ++-- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index 09bc84d78a30..368303e0215a 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -53,7 +53,7 @@ type commonFields struct { // OpAMPServer contains the OpAMP transport configuration. type OpAMPServer struct { WS *commonFields `mapstructure:"ws,omitempty"` - Http *commonFields `mapstructure:"http,omitempty"` + HTTP *commonFields `mapstructure:"http,omitempty"` } func (c *commonFields) Scheme() string { @@ -74,47 +74,44 @@ func (c *commonFields) Validate() error { func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { if s.WS != nil { return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws")))) - } else { - return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http")))) } + return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http")))) } func (s OpAMPServer) GetHeaders() map[string]configopaque.String { if s.WS != nil { return s.WS.Headers - } else { - return s.Http.Headers } + return s.HTTP.Headers } func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { if s.WS != nil { return s.WS.TLSSetting - } else { - return s.Http.TLSSetting } + return s.HTTP.TLSSetting } func (s OpAMPServer) GetEndpoint() string { if s.WS != nil { return s.WS.Endpoint - } else { - return s.Http.Endpoint } + return s.HTTP.Endpoint } // Validate checks if the extension configuration is valid func (cfg *Config) Validate() error { - if cfg.Server.WS == nil && cfg.Server.Http == nil { + switch { + case cfg.Server.WS == nil && cfg.Server.HTTP == nil: return errors.New("opamp server must have at least ws or http set") - } else if cfg.Server.WS != nil && cfg.Server.Http != nil { + case cfg.Server.WS != nil && cfg.Server.HTTP != nil: return errors.New("opamp server must have only ws or http set") - } else if cfg.Server.WS != nil { + case cfg.Server.WS != nil: if err := cfg.Server.WS.Validate(); err != nil { return err } - } else if cfg.Server.Http != nil { - if err := cfg.Server.Http.Validate(); err != nil { + case cfg.Server.HTTP != nil: + if err := cfg.Server.HTTP.Validate(); err != nil { return err } } diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 0d0db77de840..560a3eb03e20 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -4,7 +4,6 @@ package opampextension import ( - "fmt" "path/filepath" "testing" @@ -51,7 +50,7 @@ func TestUnmarshalHttpConfig(t *testing.T) { assert.Equal(t, &Config{ Server: &OpAMPServer{ - Http: &commonFields{ + HTTP: &commonFields{ Endpoint: "https://127.0.0.1:4320/v1/opamp", }, }, @@ -80,7 +79,7 @@ func TestConfig_Validate(t *testing.T) { WS: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) }, }, @@ -94,7 +93,7 @@ func TestConfig_Validate(t *testing.T) { }, InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) }, }, @@ -114,10 +113,10 @@ func TestConfig_Validate(t *testing.T) { name: "HTTP must have endpoint", fields: fields{ Server: &OpAMPServer{ - Http: &commonFields{}, + HTTP: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) }, }, @@ -125,13 +124,13 @@ func TestConfig_Validate(t *testing.T) { name: "HTTP valid endpoint and invalid instance id", fields: fields{ Server: &OpAMPServer{ - Http: &commonFields{ + HTTP: &commonFields{ Endpoint: "https://127.0.0.1:4320/v1/opamp", }, }, InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) }, }, @@ -139,7 +138,7 @@ func TestConfig_Validate(t *testing.T) { name: "HTTP valid endpoint and valid instance id", fields: fields{ Server: &OpAMPServer{ - Http: &commonFields{ + HTTP: &commonFields{ Endpoint: "https://127.0.0.1:4320/v1/opamp", }, }, @@ -152,7 +151,7 @@ func TestConfig_Validate(t *testing.T) { fields: fields{ Server: &OpAMPServer{}, }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp server must have at least ws or http set", err.Error()) }, }, @@ -161,10 +160,10 @@ func TestConfig_Validate(t *testing.T) { fields: fields{ Server: &OpAMPServer{ WS: &commonFields{}, - Http: &commonFields{}, + HTTP: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + wantErr: func(t assert.TestingT, err error, i ...any) bool { return assert.Equal(t, "opamp server must have only ws or http set", err.Error()) }, }, @@ -176,7 +175,7 @@ func TestConfig_Validate(t *testing.T) { InstanceUID: tt.fields.InstanceUID, Capabilities: tt.fields.Capabilities, } - tt.wantErr(t, cfg.Validate(), fmt.Sprintf("Validate()")) + tt.wantErr(t, cfg.Validate(), "Validate()") }) } } diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 5f80d46ea1d9..c8230d38490f 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -145,11 +145,11 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r } else { sid, ok := res.Attributes().Get(semconv.AttributeServiceInstanceID) if ok { - parsedUuid, err := uuid.Parse(sid.AsString()) + parsedUUID, err := uuid.Parse(sid.AsString()) if err != nil { return nil, err } - uid = ulid.ULID(parsedUuid) + uid = ulid.ULID(parsedUUID) } } From 4f4e8061c1f95fe4607d8b09864bc65123b5f206 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 27 Feb 2024 10:35:01 -0500 Subject: [PATCH 4/6] feedback --- extension/opampextension/config_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 560a3eb03e20..53d1e67774f9 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -79,7 +79,7 @@ func TestConfig_Validate(t *testing.T) { WS: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) }, }, @@ -93,7 +93,7 @@ func TestConfig_Validate(t *testing.T) { }, InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) }, }, @@ -116,7 +116,7 @@ func TestConfig_Validate(t *testing.T) { HTTP: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) }, }, @@ -130,7 +130,7 @@ func TestConfig_Validate(t *testing.T) { }, InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) }, }, @@ -151,7 +151,7 @@ func TestConfig_Validate(t *testing.T) { fields: fields{ Server: &OpAMPServer{}, }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp server must have at least ws or http set", err.Error()) }, }, @@ -163,7 +163,7 @@ func TestConfig_Validate(t *testing.T) { HTTP: &commonFields{}, }, }, - wantErr: func(t assert.TestingT, err error, i ...any) bool { + wantErr: func(t assert.TestingT, err error, _ ...any) bool { return assert.Equal(t, "opamp server must have only ws or http set", err.Error()) }, }, @@ -175,7 +175,7 @@ func TestConfig_Validate(t *testing.T) { InstanceUID: tt.fields.InstanceUID, Capabilities: tt.fields.Capabilities, } - tt.wantErr(t, cfg.Validate(), "Validate()") + tt.wantErr(t, cfg.Validate()) }) } } From 4ab1d3e90e74fc835318d02b63a41b56c25bfdfe Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 27 Feb 2024 11:09:01 -0500 Subject: [PATCH 5/6] fixes for tests --- extension/opampextension/config_test.go | 68 +++++++++++++++++++++++++ extension/opampextension/factory.go | 4 +- extension/opampextension/opamp_agent.go | 9 +++- 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 53d1e67774f9..0d75a5252922 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" ) @@ -61,6 +63,72 @@ func TestUnmarshalHttpConfig(t *testing.T) { }, cfg) } +func TestConfig_Getters(t *testing.T) { + type fields struct { + Server *OpAMPServer + } + type expected struct { + headers assert.ValueAssertionFunc + tls assert.ValueAssertionFunc + endpoint assert.ValueAssertionFunc + } + tests := []struct { + name string + fields fields + expected expected + }{ + { + name: "WS valid endpoint, headers, tls", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + Headers: map[string]configopaque.String{ + "test": configopaque.String("test"), + }, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + }, + }, + }, + expected: expected{ + headers: assert.NotEmpty, + tls: assert.NotEmpty, + endpoint: assert.NotEmpty, + }, + }, + { + name: "HTTP valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + HTTP: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + Headers: map[string]configopaque.String{ + "test": configopaque.String("test"), + }, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + }, + }, + }, + expected: expected{ + headers: assert.NotEmpty, + tls: assert.NotEmpty, + endpoint: assert.NotEmpty, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.expected.headers(t, tt.fields.Server.GetHeaders()) + tt.expected.tls(t, tt.fields.Server.GetTLSSetting()) + tt.expected.endpoint(t, tt.fields.Server.GetEndpoint()) + }) + } +} + func TestConfig_Validate(t *testing.T) { type fields struct { Server *OpAMPServer diff --git a/extension/opampextension/factory.go b/extension/opampextension/factory.go index 0974399752c4..486afa1d42c3 100644 --- a/extension/opampextension/factory.go +++ b/extension/opampextension/factory.go @@ -23,7 +23,9 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - Server: &OpAMPServer{}, + Server: &OpAMPServer{ + WS: &commonFields{}, + }, Capabilities: Capabilities{ ReportsEffectiveConfig: true, }, diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index c8230d38490f..99156d790d05 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "runtime" + "strings" "sync" "github.com/google/uuid" @@ -101,7 +102,13 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { return nil } o.logger.Debug("Stopping OpAMP client...") - return o.opampClient.Stop(ctx) + err := o.opampClient.Stop(ctx) + // Opamp-go considers this an error, but the collector does not. + // https://github.com/open-telemetry/opamp-go/issues/255 + if err != nil && strings.EqualFold(err.Error(), "cannot stop because not started") { + return nil + } + return err } func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { From f9485c3cc57e7711ea298205405230adf5418c88 Mon Sep 17 00:00:00 2001 From: Jacob Aronoff Date: Tue, 27 Feb 2024 11:18:52 -0500 Subject: [PATCH 6/6] defence --- extension/opampextension/config.go | 12 +++++++++--- extension/opampextension/config_test.go | 11 +++++++++++ extension/opampextension/factory.go | 4 +--- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index 368303e0215a..0ed2bb11e4a1 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -81,22 +81,28 @@ func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { func (s OpAMPServer) GetHeaders() map[string]configopaque.String { if s.WS != nil { return s.WS.Headers + } else if s.HTTP != nil { + return s.HTTP.Headers } - return s.HTTP.Headers + return map[string]configopaque.String{} } func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { if s.WS != nil { return s.WS.TLSSetting + } else if s.HTTP != nil { + return s.HTTP.TLSSetting } - return s.HTTP.TLSSetting + return configtls.TLSClientSetting{} } func (s OpAMPServer) GetEndpoint() string { if s.WS != nil { return s.WS.Endpoint + } else if s.HTTP != nil { + return s.HTTP.Endpoint } - return s.HTTP.Endpoint + return "" } // Validate checks if the extension configuration is valid diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 0d75a5252922..364f1236f1e5 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -77,6 +77,17 @@ func TestConfig_Getters(t *testing.T) { fields fields expected expected }{ + { + name: "nothing set", + fields: fields{ + Server: &OpAMPServer{}, + }, + expected: expected{ + headers: assert.Empty, + tls: assert.Empty, + endpoint: assert.Empty, + }, + }, { name: "WS valid endpoint, headers, tls", fields: fields{ diff --git a/extension/opampextension/factory.go b/extension/opampextension/factory.go index 486afa1d42c3..0974399752c4 100644 --- a/extension/opampextension/factory.go +++ b/extension/opampextension/factory.go @@ -23,9 +23,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - Server: &OpAMPServer{ - WS: &commonFields{}, - }, + Server: &OpAMPServer{}, Capabilities: Capabilities{ ReportsEffectiveConfig: true, },