diff --git a/.chloggen/opamp-http.yaml b/.chloggen/opamp-http.yaml new file mode 100755 index 000000000000..9d5857815b30 --- /dev/null +++ b/.chloggen/opamp-http.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: enables creating and using an http client + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31389] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/opampextension/config.go b/extension/opampextension/config.go index 825c52a9ab1b..0ed2bb11e4a1 100644 --- a/extension/opampextension/config.go +++ b/extension/opampextension/config.go @@ -5,11 +5,14 @@ package opampextension // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" + "net/url" "github.com/oklog/ulid/v2" + "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/protobufs" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" + "go.uber.org/zap" ) // Config contains the configuration for the opamp extension. Trying to mirror @@ -41,22 +44,82 @@ func (caps Capabilities) toAgentCapabilities() protobufs.AgentCapabilities { return agentCapabilities } +type commonFields struct { + Endpoint string `mapstructure:"endpoint"` + TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` + Headers map[string]configopaque.String `mapstructure:"headers,omitempty"` +} + // OpAMPServer contains the OpAMP transport configuration. type OpAMPServer struct { - WS *OpAMPWebsocket `mapstructure:"ws"` + WS *commonFields `mapstructure:"ws,omitempty"` + HTTP *commonFields `mapstructure:"http,omitempty"` } -// OpAMPWebsocket contains the OpAMP websocket transport configuration. -type OpAMPWebsocket struct { - Endpoint string `mapstructure:"endpoint"` - TLSSetting configtls.TLSClientSetting `mapstructure:"tls,omitempty"` - Headers map[string]configopaque.String `mapstructure:"headers,omitempty"` +func (c *commonFields) Scheme() string { + uri, err := url.ParseRequestURI(c.Endpoint) + if err != nil { + return "" + } + return uri.Scheme +} + +func (c *commonFields) Validate() error { + if c.Endpoint == "" { + return errors.New("opamp server endpoint must be provided") + } + return nil +} + +func (s OpAMPServer) GetClient(logger *zap.Logger) client.OpAMPClient { + if s.WS != nil { + return client.NewWebSocket(newLoggerFromZap(logger.With(zap.String("client", "ws")))) + } + return client.NewHTTP(newLoggerFromZap(logger.With(zap.String("client", "http")))) +} + +func (s OpAMPServer) GetHeaders() map[string]configopaque.String { + if s.WS != nil { + return s.WS.Headers + } else if s.HTTP != nil { + return s.HTTP.Headers + } + return map[string]configopaque.String{} +} + +func (s OpAMPServer) GetTLSSetting() configtls.TLSClientSetting { + if s.WS != nil { + return s.WS.TLSSetting + } else if s.HTTP != nil { + return s.HTTP.TLSSetting + } + return configtls.TLSClientSetting{} +} + +func (s OpAMPServer) GetEndpoint() string { + if s.WS != nil { + return s.WS.Endpoint + } else if s.HTTP != nil { + return s.HTTP.Endpoint + } + return "" } // Validate checks if the extension configuration is valid func (cfg *Config) Validate() error { - if cfg.Server.WS.Endpoint == "" { - return errors.New("opamp server websocket endpoint must be provided") + switch { + case cfg.Server.WS == nil && cfg.Server.HTTP == nil: + return errors.New("opamp server must have at least ws or http set") + case cfg.Server.WS != nil && cfg.Server.HTTP != nil: + return errors.New("opamp server must have only ws or http set") + case cfg.Server.WS != nil: + if err := cfg.Server.WS.Validate(); err != nil { + return err + } + case cfg.Server.HTTP != nil: + if err := cfg.Server.HTTP.Validate(); err != nil { + return err + } } if cfg.InstanceUID != "" { diff --git a/extension/opampextension/config_test.go b/extension/opampextension/config_test.go index 184c97b387d1..364f1236f1e5 100644 --- a/extension/opampextension/config_test.go +++ b/extension/opampextension/config_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" ) @@ -30,7 +32,7 @@ func TestUnmarshalConfig(t *testing.T) { assert.Equal(t, &Config{ Server: &OpAMPServer{ - WS: &OpAMPWebsocket{ + WS: &commonFields{ Endpoint: "wss://127.0.0.1:4320/v1/opamp", }, }, @@ -41,20 +43,218 @@ func TestUnmarshalConfig(t *testing.T) { }, cfg) } -func TestConfigValidate(t *testing.T) { - cfg := &Config{ - Server: &OpAMPServer{ - WS: &OpAMPWebsocket{}, +func TestUnmarshalHttpConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_http.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, component.UnmarshalConfig(cm, cfg)) + assert.Equal(t, + &Config{ + Server: &OpAMPServer{ + HTTP: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + Capabilities: Capabilities{ + ReportsEffectiveConfig: true, + }, + }, cfg) +} + +func TestConfig_Getters(t *testing.T) { + type fields struct { + Server *OpAMPServer + } + type expected struct { + headers assert.ValueAssertionFunc + tls assert.ValueAssertionFunc + endpoint assert.ValueAssertionFunc + } + tests := []struct { + name string + fields fields + expected expected + }{ + { + name: "nothing set", + fields: fields{ + Server: &OpAMPServer{}, + }, + expected: expected{ + headers: assert.Empty, + tls: assert.Empty, + endpoint: assert.Empty, + }, + }, + { + name: "WS valid endpoint, headers, tls", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + Headers: map[string]configopaque.String{ + "test": configopaque.String("test"), + }, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + }, + }, + }, + expected: expected{ + headers: assert.NotEmpty, + tls: assert.NotEmpty, + endpoint: assert.NotEmpty, + }, }, + { + name: "HTTP valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + HTTP: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + Headers: map[string]configopaque.String{ + "test": configopaque.String("test"), + }, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + }, + }, + }, + expected: expected{ + headers: assert.NotEmpty, + tls: assert.NotEmpty, + endpoint: assert.NotEmpty, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.expected.headers(t, tt.fields.Server.GetHeaders()) + tt.expected.tls(t, tt.fields.Server.GetTLSSetting()) + tt.expected.endpoint(t, tt.fields.Server.GetEndpoint()) + }) + } +} + +func TestConfig_Validate(t *testing.T) { + type fields struct { + Server *OpAMPServer + InstanceUID string + Capabilities Capabilities + } + tests := []struct { + name string + fields fields + wantErr assert.ErrorAssertionFunc + }{ + { + name: "WS must have endpoint", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) + }, + }, + { + name: "WS valid endpoint and invalid instance id", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) + }, + }, + { + name: "WS valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{ + Endpoint: "wss://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + }, + wantErr: assert.NoError, + }, + { + name: "HTTP must have endpoint", + fields: fields{ + Server: &OpAMPServer{ + HTTP: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp server endpoint must be provided", err.Error()) + }, + }, + { + name: "HTTP valid endpoint and invalid instance id", + fields: fields{ + Server: &OpAMPServer{ + HTTP: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL", + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp instance_uid is invalid", err.Error()) + }, + }, + { + name: "HTTP valid endpoint and valid instance id", + fields: fields{ + Server: &OpAMPServer{ + HTTP: &commonFields{ + Endpoint: "https://127.0.0.1:4320/v1/opamp", + }, + }, + InstanceUID: "01BX5ZZKBKACTAV9WEVGEMMVRZ", + }, + wantErr: assert.NoError, + }, + { + name: "neither config set", + fields: fields{ + Server: &OpAMPServer{}, + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp server must have at least ws or http set", err.Error()) + }, + }, + { + name: "both config set", + fields: fields{ + Server: &OpAMPServer{ + WS: &commonFields{}, + HTTP: &commonFields{}, + }, + }, + wantErr: func(t assert.TestingT, err error, _ ...any) bool { + return assert.Equal(t, "opamp server must have only ws or http set", err.Error()) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{ + Server: tt.fields.Server, + InstanceUID: tt.fields.InstanceUID, + Capabilities: tt.fields.Capabilities, + } + tt.wantErr(t, cfg.Validate()) + }) } - err := cfg.Validate() - assert.Equal(t, "opamp server websocket endpoint must be provided", err.Error()) - cfg.Server.WS.Endpoint = "wss://127.0.0.1:4320/v1/opamp" - assert.NoError(t, cfg.Validate()) - cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZFAIL" - err = cfg.Validate() - require.Error(t, err) - assert.Equal(t, "opamp instance_uid is invalid", err.Error()) - cfg.InstanceUID = "01BX5ZZKBKACTAV9WEVGEMMVRZ" - require.NoError(t, cfg.Validate()) } diff --git a/extension/opampextension/factory.go b/extension/opampextension/factory.go index 91c39e9a011b..0974399752c4 100644 --- a/extension/opampextension/factory.go +++ b/extension/opampextension/factory.go @@ -23,9 +23,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - Server: &OpAMPServer{ - WS: &OpAMPWebsocket{}, - }, + Server: &OpAMPServer{}, Capabilities: Capabilities{ ReportsEffectiveConfig: true, }, diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 9b4d666ab67c..99156d790d05 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "runtime" + "strings" "sync" "github.com/google/uuid" @@ -43,15 +44,12 @@ type opampAgent struct { } func (o *opampAgent) Start(_ context.Context, _ component.Host) error { - // TODO: Add OpAMP HTTP transport support. - o.opampClient = client.NewWebSocket(newLoggerFromZap(o.logger)) - header := http.Header{} - for k, v := range o.cfg.Server.WS.Headers { + for k, v := range o.cfg.Server.GetHeaders() { header.Set(k, string(v)) } - tls, err := o.cfg.Server.WS.TLSSetting.LoadTLSConfig() + tls, err := o.cfg.Server.GetTLSSetting().LoadTLSConfig() if err != nil { return err } @@ -59,7 +57,7 @@ func (o *opampAgent) Start(_ context.Context, _ component.Host) error { settings := types.StartSettings{ Header: header, TLSConfig: tls, - OpAMPServerURL: o.cfg.Server.WS.Endpoint, + OpAMPServerURL: o.cfg.Server.GetEndpoint(), InstanceUid: o.instanceID.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func(_ context.Context) { @@ -104,7 +102,13 @@ func (o *opampAgent) Shutdown(ctx context.Context) error { return nil } o.logger.Debug("Stopping OpAMP client...") - return o.opampClient.Stop(ctx) + err := o.opampClient.Stop(ctx) + // Opamp-go considers this an error, but the collector does not. + // https://github.com/open-telemetry/opamp-go/issues/255 + if err != nil && strings.EqualFold(err.Error(), "cannot stop because not started") { + return nil + } + return err } func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error { @@ -148,11 +152,11 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r } else { sid, ok := res.Attributes().Get(semconv.AttributeServiceInstanceID) if ok { - uuid, err := uuid.Parse(sid.AsString()) + parsedUUID, err := uuid.Parse(sid.AsString()) if err != nil { return nil, err } - uid = ulid.ULID(uuid) + uid = ulid.ULID(parsedUUID) } } @@ -163,6 +167,7 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r agentVersion: agentVersion, instanceID: uid, capabilities: cfg.Capabilities, + opampClient: cfg.Server.GetClient(logger), } return agent, nil diff --git a/extension/opampextension/testdata/config_http.yaml b/extension/opampextension/testdata/config_http.yaml new file mode 100644 index 000000000000..c97da589a392 --- /dev/null +++ b/extension/opampextension/testdata/config_http.yaml @@ -0,0 +1,4 @@ +server: + http: + endpoint: https://127.0.0.1:4320/v1/opamp +instance_uid: 01BX5ZZKBKACTAV9WEVGEMMVRZ