From 134c956cd02bd04ef83018cccfc5dcd9d26242d8 Mon Sep 17 00:00:00 2001 From: Koyomi Araragi Date: Thu, 19 Sep 2024 19:36:50 -0300 Subject: [PATCH 1/6] docs: update wildcard usage on cors (#9845) **Description:** Improved CORS docs. **Link to tracking Issue:** #9844 --- config/confighttp/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 24d2905d7af..07ac4a1d8a9 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -68,8 +68,8 @@ is hosted at a different [origin][origin]. If left blank or set to `null`, CORS will not be enabled. - `allowed_origins`: A list of [origins][origin] allowed to send requests to the receiver. An origin may contain a wildcard (`*`) to replace 0 or more - characters (e.g., `https://*.example.com`). To allow any origin, set to - `["*"]`. If no origins are listed, CORS will not be enabled. + characters (e.g., `https://*.example.com`). **Do not use** a plain wildcard + `["*"]`, as our CORS response includes `Access-Control-Allow-Credentials: true`, which makes browsers to **disallow a plain wildcard** (this is a security standard). To allow any origin, you can specify at least the protocol, for example `["https://*", "http://*"]`. If no origins are listed, CORS will not be enabled. - `allowed_headers`: Allow CORS requests to include headers outside the [default safelist][cors-headers]. By default, safelist headers and `X-Requested-With` will be allowed. To allow any request header, set to From 459b4295afa83793e4ab4c5186f487323677542b Mon Sep 17 00:00:00 2001 From: Jade Guiton Date: Fri, 20 Sep 2024 01:00:41 +0200 Subject: [PATCH 2/6] [configgrpc] wrap gRPC client/server options in extensible interface (#11069) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Description To allow extending the possible option types provided to `configgrpc.ClientConfig.ToClientConn` and `configgrpc.ServerConfig.ToServer` in the future, we want to wrap the `grpc.DialOption` and `grpc.ServerOption` parameters in more generic `ToClientConnOption` and `ToServerOption` interfaces. For compatibility, we start by adding new `ToClientConnWithOptions` and `ToServerWithOptions` methods, to which the now deprecated `ToClientConn` and `ToServer` defer. A second PR will be needed to fully replace the original methods. #### Link to tracking issue Fixes #9480 #### Testing No tests have been added. Feel free to tell me if I should add some. #### Documentation No documentation has been added. --------- Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .chloggen/9480-configgrpc-option-wrapper.yaml | 28 +++++ config/configgrpc/configgrpc.go | 117 ++++++++++++++++-- config/configgrpc/configgrpc_test.go | 51 +++++++- exporter/otlpexporter/otlp.go | 4 +- receiver/otlpreceiver/otlp.go | 2 +- 5 files changed, 183 insertions(+), 19 deletions(-) create mode 100644 .chloggen/9480-configgrpc-option-wrapper.yaml diff --git a/.chloggen/9480-configgrpc-option-wrapper.yaml b/.chloggen/9480-configgrpc-option-wrapper.yaml new file mode 100644 index 00000000000..22e14f870e3 --- /dev/null +++ b/.chloggen/9480-configgrpc-option-wrapper.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'deprecation' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: configgrpc + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Deprecate `ClientConfig.ToClientConn`/`ServerConfig.ToServer` in favor of `ToClientConnWithOptions`/`ToServerWithOptions`" + +# One or more tracking issues or pull requests related to the change +issues: [9480] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Users providing a grpc.DialOption/grpc.ServerOption should now wrap them into + a generic option with `WithGrpcDialOption`/`WithGrpcServerOption`. + + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 9caa916b81c..2bf15063c57 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -226,16 +226,58 @@ func (gcs *ClientConfig) isSchemeHTTPS() bool { // a non-blocking dial (the function won't wait for connections to be // established, and connecting happens in the background). To make it a blocking // dial, use grpc.WithBlock() dial option. -func (gcs *ClientConfig) ToClientConn(ctx context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) { - opts, err := gcs.toDialOptions(ctx, host, settings) +// +// Deprecated: [v0.110.0] If providing a [grpc.DialOption], use [ClientConfig.ToClientConnWithOptions] +// with [WithGrpcDialOption] instead. +func (gcs *ClientConfig) ToClientConn( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + grpcOpts ...grpc.DialOption, +) (*grpc.ClientConn, error) { + var extraOpts []ToClientConnOption + for _, grpcOpt := range grpcOpts { + extraOpts = append(extraOpts, WithGrpcDialOption(grpcOpt)) + } + return gcs.ToClientConnWithOptions(ctx, host, settings, extraOpts...) +} + +// ToClientConnOption is a sealed interface wrapping options for [ClientConfig.ToClientConnWithOptions]. +type ToClientConnOption interface { + isToClientConnOption() +} + +type grpcDialOptionWrapper struct { + opt grpc.DialOption +} + +// WithGrpcDialOption wraps a [grpc.DialOption] into a [ToClientConnOption]. +func WithGrpcDialOption(opt grpc.DialOption) ToClientConnOption { + return grpcDialOptionWrapper{opt: opt} +} +func (grpcDialOptionWrapper) isToClientConnOption() {} + +// ToClientConnWithOptions is the same as [ClientConfig.ToClientConn], but uses the [ToClientConnOption] interface for options. +// This method will eventually replace [ClientConfig.ToClientConn]. +func (gcs *ClientConfig) ToClientConnWithOptions( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts ...ToClientConnOption, +) (*grpc.ClientConn, error) { + grpcOpts, err := gcs.getGrpcDialOptions(ctx, host, settings, extraOpts) if err != nil { return nil, err } - opts = append(opts, extraOpts...) - return grpc.NewClient(gcs.sanitizedEndpoint(), opts...) + return grpc.NewClient(gcs.sanitizedEndpoint(), grpcOpts...) } -func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) { +func (gcs *ClientConfig) getGrpcDialOptions( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts []ToClientConnOption, +) ([]grpc.DialOption, error) { var opts []grpc.DialOption if gcs.Compression.IsCompressed() { cp, err := getGRPCCompressionName(gcs.Compression) @@ -312,6 +354,12 @@ func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, // Enable OpenTelemetry observability plugin. opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...))) + for _, opt := range extraOpts { + if wrapper, ok := opt.(grpcDialOptionWrapper); ok { + opts = append(opts, wrapper.opt) + } + } + return opts, nil } @@ -335,17 +383,58 @@ func (gss *ServerConfig) Validate() error { return nil } -// ToServer returns a grpc.Server for the configuration -func (gss *ServerConfig) ToServer(_ context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) { - opts, err := gss.toServerOption(host, settings) +// ToServer returns a [grpc.Server] for the configuration +// +// Deprecated: [v0.110.0] If providing a [grpc.ServerOption], use [ServerConfig.ToServerWithOptions] +// with [WithGrpcServerOption] instead. +func (gss *ServerConfig) ToServer( + ctx context.Context, + host component.Host, + settings component.TelemetrySettings, + grpcOpts ...grpc.ServerOption, +) (*grpc.Server, error) { + var extraOpts []ToServerOption + for _, grpcOpt := range grpcOpts { + extraOpts = append(extraOpts, WithGrpcServerOption(grpcOpt)) + } + return gss.ToServerWithOptions(ctx, host, settings, extraOpts...) +} + +// ToServerOption is a sealed interface wrapping options for [ServerConfig.ToServerWithOptions]. +type ToServerOption interface { + isToServerOption() +} + +type grpcServerOptionWrapper struct { + opt grpc.ServerOption +} + +// WithGrpcServerOption wraps a [grpc.ServerOption] into a [ToServerOption]. +func WithGrpcServerOption(opt grpc.ServerOption) ToServerOption { + return grpcServerOptionWrapper{opt: opt} +} +func (grpcServerOptionWrapper) isToServerOption() {} + +// ToServerWithOptions is the same as [ServerConfig.ToServer], but uses the [ToServerOption] interface for options. +// This method will eventually replace [ServerConfig.ToServer]. +func (gss *ServerConfig) ToServerWithOptions( + _ context.Context, + host component.Host, + settings component.TelemetrySettings, + extraOpts ...ToServerOption, +) (*grpc.Server, error) { + grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts) if err != nil { return nil, err } - opts = append(opts, extraOpts...) - return grpc.NewServer(opts...), nil + return grpc.NewServer(grpcOpts...), nil } -func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) { +func (gss *ServerConfig) getGrpcServerOptions( + host component.Host, + settings component.TelemetrySettings, + extraOpts []ToServerOption, +) ([]grpc.ServerOption, error) { switch gss.NetAddr.Transport { case confignet.TransportTypeTCP, confignet.TransportTypeTCP4, confignet.TransportTypeTCP6, confignet.TransportTypeUDP, confignet.TransportTypeUDP4, confignet.TransportTypeUDP6: internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint) @@ -435,6 +524,12 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component. opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...)) + for _, opt := range extraOpts { + if wrapper, ok := opt.(grpcServerOptionWrapper); ok { + opts = append(opts, wrapper.opt) + } + } + return opts, nil } diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index d2a6b1c15e0..585b0c6d7c1 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -126,11 +126,33 @@ func TestDefaultGrpcClientSettings(t *testing.T) { Insecure: true, }, } - opts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings()) + opts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, opts, 2) } +func TestGrpcClientExtraOption(t *testing.T) { + tt, err := componenttest.SetupTelemetry(componentID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + gcs := &ClientConfig{ + TLSSetting: configtls.ClientConfig{ + Insecure: true, + }, + } + extraOpt := grpc.WithUserAgent("test-agent") + opts, err := gcs.getGrpcDialOptions( + context.Background(), + componenttest.NewNopHost(), + tt.TelemetrySettings(), + []ToClientConnOption{WithGrpcDialOption(extraOpt)}, + ) + require.NoError(t, err) + assert.Len(t, opts, 3) + assert.Equal(t, opts[2], extraOpt) +} + func TestAllGrpcClientSettings(t *testing.T) { tt, err := componenttest.SetupTelemetry(componentID) require.NoError(t, err) @@ -231,7 +253,7 @@ func TestAllGrpcClientSettings(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - opts, err := test.settings.toDialOptions(context.Background(), test.host, tt.TelemetrySettings()) + opts, err := test.settings.getGrpcDialOptions(context.Background(), test.host, tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, opts, 9) }) @@ -244,11 +266,28 @@ func TestDefaultGrpcServerSettings(t *testing.T) { Endpoint: "0.0.0.0:1234", }, } - opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) + opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{}) require.NoError(t, err) assert.Len(t, opts, 3) } +func TestGrpcServerExtraOption(t *testing.T) { + gss := &ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: "0.0.0.0:1234", + }, + } + extraOpt := grpc.ConnectionTimeout(1_000_000_000) + opts, err := gss.getGrpcServerOptions( + componenttest.NewNopHost(), + componenttest.NewNopTelemetrySettings(), + []ToServerOption{WithGrpcServerOption(extraOpt)}, + ) + require.NoError(t, err) + assert.Len(t, opts, 4) + assert.Equal(t, opts[3], extraOpt) +} + func TestGrpcServerValidate(t *testing.T) { tests := []struct { gss *ServerConfig @@ -329,7 +368,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) { }, }, } - opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) + opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{}) require.NoError(t, err) assert.Len(t, opts, 10) } @@ -488,7 +527,7 @@ func TestUseSecure(t *testing.T) { TLSSetting: configtls.ClientConfig{}, Keepalive: nil, } - dialOpts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings()) + dialOpts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{}) require.NoError(t, err) assert.Len(t, dialOpts, 2) } @@ -540,7 +579,7 @@ func TestGRPCServerWarning(t *testing.T) { logger, observed := observer.New(zap.DebugLevel) set.Logger = zap.New(logger) - opts, err := test.settings.toServerOption(componenttest.NewNopHost(), set) + opts, err := test.settings.getGrpcServerOptions(componenttest.NewNopHost(), set, []ToServerOption{}) require.NoError(t, err) require.NotNil(t, opts) _ = grpc.NewServer(opts...) diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index b8d6dee2a75..ecda0e93add 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -58,7 +59,8 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter { // start actually creates the gRPC connection. The client construction is deferred till this point as this // is the only place we get hold of Extensions which are required to construct auth round tripper. func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) { - if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host, e.settings, grpc.WithUserAgent(e.userAgent)); err != nil { + agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent)) + if e.clientConn, err = e.config.ClientConfig.ToClientConnWithOptions(ctx, host, e.settings, agentOpt); err != nil { return err } e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn) diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index a4c5f08c7a9..82218ef88db 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -89,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error { } var err error - if r.serverGRPC, err = r.cfg.GRPC.ToServer(context.Background(), host, r.settings.TelemetrySettings); err != nil { + if r.serverGRPC, err = r.cfg.GRPC.ToServerWithOptions(context.Background(), host, r.settings.TelemetrySettings); err != nil { return err } From 1391fab8dc5c225c6cb94f829d1e6c7ff3636e1c Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 20 Sep 2024 01:02:41 +0200 Subject: [PATCH 3/6] [chore][exporter/otlphttpexporter] Generalize `composeSignalURL` (#11206) #### Description Split off from #11131. Prepares OTLP HTTP exporter for supporting profiles (which uses `/v1development`) Co-authored-by: Damien Mathieu <42@dmathieu.com> --- exporter/otlphttpexporter/factory.go | 17 +++++++++++------ exporter/otlphttpexporter/factory_test.go | 10 ++++++++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index c29940eaee7..8b7ec9dc807 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -49,7 +49,12 @@ func createDefaultConfig() component.Config { } } -func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string) (string, error) { +// composeSignalURL composes the final URL for the signal (traces, metrics, logs) based on the configuration. +// oCfg is the configuration of the exporter. +// signalOverrideURL is the URL specified in the signal specific configuration (empty if not specified). +// signalName is the name of the signal, e.g. "traces", "metrics", "logs". +// signalVersion is the version of the signal, e.g. "v1" or "v1development". +func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string, signalVersion string) (string, error) { switch { case signalOverrideURL != "": _, err := url.Parse(signalOverrideURL) @@ -61,9 +66,9 @@ func composeSignalURL(oCfg *Config, signalOverrideURL string, signalName string) return "", fmt.Errorf("either endpoint or %s_endpoint must be specified", signalName) default: if strings.HasSuffix(oCfg.Endpoint, "/") { - return oCfg.Endpoint + "v1/" + signalName, nil + return oCfg.Endpoint + signalVersion + "/" + signalName, nil } - return oCfg.Endpoint + "/v1/" + signalName, nil + return oCfg.Endpoint + "/" + signalVersion + "/" + signalName, nil } } @@ -78,7 +83,7 @@ func createTracesExporter( } oCfg := cfg.(*Config) - oce.tracesURL, err = composeSignalURL(oCfg, oCfg.TracesEndpoint, "traces") + oce.tracesURL, err = composeSignalURL(oCfg, oCfg.TracesEndpoint, "traces", "v1") if err != nil { return nil, err } @@ -104,7 +109,7 @@ func createMetricsExporter( } oCfg := cfg.(*Config) - oce.metricsURL, err = composeSignalURL(oCfg, oCfg.MetricsEndpoint, "metrics") + oce.metricsURL, err = composeSignalURL(oCfg, oCfg.MetricsEndpoint, "metrics", "v1") if err != nil { return nil, err } @@ -130,7 +135,7 @@ func createLogsExporter( } oCfg := cfg.(*Config) - oce.logsURL, err = composeSignalURL(oCfg, oCfg.LogsEndpoint, "logs") + oce.logsURL, err = composeSignalURL(oCfg, oCfg.LogsEndpoint, "logs", "v1") if err != nil { return nil, err } diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index 0aef54e96f6..5170e8b4500 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -215,13 +215,19 @@ func TestComposeSignalURL(t *testing.T) { // Has slash at end cfg.ClientConfig.Endpoint = "http://localhost:4318/" - url, err := composeSignalURL(cfg, "", "traces") + url, err := composeSignalURL(cfg, "", "traces", "v1") require.NoError(t, err) assert.Equal(t, "http://localhost:4318/v1/traces", url) // No slash at end cfg.ClientConfig.Endpoint = "http://localhost:4318" - url, err = composeSignalURL(cfg, "", "traces") + url, err = composeSignalURL(cfg, "", "traces", "v1") require.NoError(t, err) assert.Equal(t, "http://localhost:4318/v1/traces", url) + + // Different version + cfg.ClientConfig.Endpoint = "http://localhost:4318" + url, err = composeSignalURL(cfg, "", "traces", "v2") + require.NoError(t, err) + assert.Equal(t, "http://localhost:4318/v2/traces", url) } From 0c7d3475dd68268fd5c4e3ac7f878fb5cd026174 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Fri, 20 Sep 2024 03:21:54 -0600 Subject: [PATCH 4/6] [chore] Add pipeline module (#11209) #### Description To facilitate the work in https://github.com/open-telemetry/opentelemetry-collector/pull/11204 as some breaking changes and some deprecations, this PR adds the new pipeline module separately so that future PRs can handle the breaking changes and deprecations. In order to make `Signal` uninstantiable outside of this repo, while still being extendable in places like `componentprofiles`, a new internal module is added to handle the `Signal` logic. To reduce the dependency sprawl that would happen if `signal` was an internal package in `go.opentelemetry.io/collector`, I made it a module, similar to `globalgates`. #### Link to tracking issue Related to https://github.com/open-telemetry/opentelemetry-collector/pull/10947 #### Testing Added unit tests #### Documentation Added godoc comments --- .chloggen/add-pipeline-module.yaml | 25 +++++ Makefile | 4 + internal/globalsignal/Makefile | 1 + internal/globalsignal/go.mod | 11 +++ internal/globalsignal/go.sum | 10 ++ internal/globalsignal/signal.go | 50 ++++++++++ internal/globalsignal/signal_test.go | 41 +++++++++ pipeline/Makefile | 1 + pipeline/go.mod | 16 ++++ pipeline/go.sum | 10 ++ pipeline/pipeline.go | 131 ++++++++++++++++++++++++++ pipeline/pipeline_test.go | 133 +++++++++++++++++++++++++++ pipeline/signal.go | 18 ++++ pipeline/signal_test.go | 31 +++++++ versions.yaml | 2 + 15 files changed, 484 insertions(+) create mode 100644 .chloggen/add-pipeline-module.yaml create mode 100644 internal/globalsignal/Makefile create mode 100644 internal/globalsignal/go.mod create mode 100644 internal/globalsignal/go.sum create mode 100644 internal/globalsignal/signal.go create mode 100644 internal/globalsignal/signal_test.go create mode 100644 pipeline/Makefile create mode 100644 pipeline/go.mod create mode 100644 pipeline/go.sum create mode 100644 pipeline/pipeline.go create mode 100644 pipeline/pipeline_test.go create mode 100644 pipeline/signal.go create mode 100644 pipeline/signal_test.go diff --git a/.chloggen/add-pipeline-module.yaml b/.chloggen/add-pipeline-module.yaml new file mode 100644 index 00000000000..60819a20673 --- /dev/null +++ b/.chloggen/add-pipeline-module.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: pipeline + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds new `pipeline` module to house the concept of pipeline ID and Signal. + +# One or more tracking issues or pull requests related to the change +issues: [11209] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/Makefile b/Makefile index d27740260b1..58b0335e895 100644 --- a/Makefile +++ b/Makefile @@ -303,11 +303,13 @@ check-contrib: -replace go.opentelemetry.io/collector/extension/zpagesextension=$(CURDIR)/extension/zpagesextension \ -replace go.opentelemetry.io/collector/featuregate=$(CURDIR)/featuregate \ -replace go.opentelemetry.io/collector/internal/globalgates=$(CURDIR)/internal/globalgates \ + -replace go.opentelemetry.io/collector/internal/globalsignal=$(CURDIR)/internal/globalsignal \ -replace go.opentelemetry.io/collector/otelcol=$(CURDIR)/otelcol \ -replace go.opentelemetry.io/collector/otelcol/otelcoltest=$(CURDIR)/otelcol/otelcoltest \ -replace go.opentelemetry.io/collector/pdata=$(CURDIR)/pdata \ -replace go.opentelemetry.io/collector/pdata/testdata=$(CURDIR)/pdata/testdata \ -replace go.opentelemetry.io/collector/pdata/pprofile=$(CURDIR)/pdata/pprofile \ + -replace go.opentelemetry.io/collector/pipeline=$(CURDIR)/pipeline \ -replace go.opentelemetry.io/collector/processor=$(CURDIR)/processor \ -replace go.opentelemetry.io/collector/processor/batchprocessor=$(CURDIR)/processor/batchprocessor \ -replace go.opentelemetry.io/collector/processor/memorylimiterprocessor=$(CURDIR)/processor/memorylimiterprocessor \ @@ -369,11 +371,13 @@ restore-contrib: -dropreplace go.opentelemetry.io/collector/extension/zpagesextension \ -dropreplace go.opentelemetry.io/collector/featuregate \ -dropreplace go.opentelemetry.io/collector/internal/globalgates \ + -dropreplace go.opentelemetry.io/collector/internal/globalsignal \ -dropreplace go.opentelemetry.io/collector/otelcol \ -dropreplace go.opentelemetry.io/collector/otelcol/otelcoltest \ -dropreplace go.opentelemetry.io/collector/pdata \ -dropreplace go.opentelemetry.io/collector/pdata/testdata \ -dropreplace go.opentelemetry.io/collector/pdata/pprofile \ + -dropreplace go.opentelemetry.io/collector/pipeline \ -dropreplace go.opentelemetry.io/collector/processor \ -dropreplace go.opentelemetry.io/collector/processor/batchprocessor \ -dropreplace go.opentelemetry.io/collector/processor/memorylimiterprocessor \ diff --git a/internal/globalsignal/Makefile b/internal/globalsignal/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/internal/globalsignal/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/internal/globalsignal/go.mod b/internal/globalsignal/go.mod new file mode 100644 index 00000000000..8ee5f5ddf1e --- /dev/null +++ b/internal/globalsignal/go.mod @@ -0,0 +1,11 @@ +module go.opentelemetry.io/collector/internal/globalsignal + +go 1.22.0 + +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/internal/globalsignal/go.sum b/internal/globalsignal/go.sum new file mode 100644 index 00000000000..60ce688a041 --- /dev/null +++ b/internal/globalsignal/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/globalsignal/signal.go b/internal/globalsignal/signal.go new file mode 100644 index 00000000000..a10431743b0 --- /dev/null +++ b/internal/globalsignal/signal.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package globalsignal // import "go.opentelemetry.io/collector/internal/globalsignal" + +import ( + "fmt" + "regexp" +) + +// Signal represents the signals supported by the collector. +type Signal struct { + name string +} + +// String returns the string representation of the signal. +func (s Signal) String() string { + return s.name +} + +// MarshalText marshals the Signal. +func (s Signal) MarshalText() (text []byte, err error) { + return []byte(s.name), nil +} + +// signalRegex is used to validate the signal. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +var signalRegex = regexp.MustCompile(`^[a-z]{1,62}$`) + +// NewSignal creates a Signal. It returns an error if the Signal is invalid. +// A Signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func NewSignal(signal string) (Signal, error) { + if len(signal) == 0 { + return Signal{}, fmt.Errorf("signal must not be empty") + } + if !signalRegex.MatchString(signal) { + return Signal{}, fmt.Errorf("invalid character(s) in type %q", signal) + } + return Signal{name: signal}, nil +} + +// MustNewSignal creates a Signal. It panics if the Signal is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func MustNewSignal(signal string) Signal { + s, err := NewSignal(signal) + if err != nil { + panic(err) + } + return s +} diff --git a/internal/globalsignal/signal_test.go b/internal/globalsignal/signal_test.go new file mode 100644 index 00000000000..0729c62c0b5 --- /dev/null +++ b/internal/globalsignal/signal_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package globalsignal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_NewSignal(t *testing.T) { + s, err := NewSignal("traces") + require.NoError(t, err) + assert.Equal(t, Signal{name: "traces"}, s) +} + +func Test_NewSignal_Invalid(t *testing.T) { + _, err := NewSignal("") + require.Error(t, err) + _, err = NewSignal("TRACES") + require.Error(t, err) +} + +func Test_MustNewSignal(t *testing.T) { + s := MustNewSignal("traces") + assert.Equal(t, Signal{name: "traces"}, s) +} + +func Test_Signal_String(t *testing.T) { + s := MustNewSignal("traces") + assert.Equal(t, "traces", s.String()) +} + +func Test_Signal_MarshalText(t *testing.T) { + s := MustNewSignal("traces") + b, err := s.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("traces"), b) +} diff --git a/pipeline/Makefile b/pipeline/Makefile new file mode 100644 index 00000000000..39734bfaebb --- /dev/null +++ b/pipeline/Makefile @@ -0,0 +1 @@ +include ../Makefile.Common diff --git a/pipeline/go.mod b/pipeline/go.mod new file mode 100644 index 00000000000..d033db037c3 --- /dev/null +++ b/pipeline/go.mod @@ -0,0 +1,16 @@ +module go.opentelemetry.io/collector/pipeline + +go 1.22.0 + +require ( + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/internal/globalsignal v0.109.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal diff --git a/pipeline/go.sum b/pipeline/go.sum new file mode 100644 index 00000000000..60ce688a041 --- /dev/null +++ b/pipeline/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 00000000000..ae8ac833cc5 --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline // import "go.opentelemetry.io/collector/pipeline" +import ( + "errors" + "fmt" + "regexp" + "strings" + + "go.opentelemetry.io/collector/internal/globalsignal" +) + +// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys. +const typeAndNameSeparator = "/" + +// ID represents the identity for a pipeline. It combines two values: +// * signal - the Signal of the pipeline. +// * name - the name of that pipeline. +type ID struct { + signal Signal `mapstructure:"-"` + name string `mapstructure:"-"` +} + +// NewID returns a new ID with the given Signal and empty name. +func NewID(signal Signal) ID { + return ID{signal: signal} +} + +// MustNewID builds a Signal and returns a new ID with the given Signal and empty name. +// It panics if the Signal is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +func MustNewID(signal string) ID { + return ID{signal: globalsignal.MustNewSignal(signal)} +} + +// NewIDWithName returns a new ID with the given Signal and name. +func NewIDWithName(signal Signal, name string) ID { + return ID{signal: signal, name: name} +} + +// MustNewIDWithName builds a Signal and returns a new ID with the given Signal and name. +// It panics if the Signal is invalid or name is invalid. +// A signal must consist of 1 to 62 lowercase ASCII alphabetic characters. +// A name must consist of 1 to 1024 unicode characters excluding whitespace, control characters, and symbols. +func MustNewIDWithName(signal string, name string) ID { + id := ID{signal: globalsignal.MustNewSignal(signal)} + err := validateName(name) + if err != nil { + panic(err) + } + id.name = name + return id +} + +// Signal returns the Signal of the ID. +func (i ID) Signal() Signal { + return i.signal +} + +// Name returns the name of the ID. +func (i ID) Name() string { + return i.name +} + +// MarshalText implements the encoding.TextMarshaler interface. +// This marshals the Signal and name as one string in the config. +func (i ID) MarshalText() (text []byte, err error) { + return []byte(i.String()), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (i *ID) UnmarshalText(text []byte) error { + idStr := string(text) + items := strings.SplitN(idStr, typeAndNameSeparator, 2) + var signalStr, nameStr string + if len(items) >= 1 { + signalStr = strings.TrimSpace(items[0]) + } + + if len(items) == 1 && signalStr == "" { + return errors.New("id must not be empty") + } + + if signalStr == "" { + return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator) + } + + if len(items) > 1 { + // "name" part is present. + nameStr = strings.TrimSpace(items[1]) + if nameStr == "" { + return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator) + } + if err := validateName(nameStr); err != nil { + return fmt.Errorf("in %q id: %w", nameStr, err) + } + } + + var err error + if i.signal, err = globalsignal.NewSignal(signalStr); err != nil { + return fmt.Errorf("in %q id: %w", idStr, err) + } + i.name = nameStr + + return nil +} + +// String returns the ID string representation as "signal[/name]" format. +func (i ID) String() string { + if i.name == "" { + return i.signal.String() + } + + return i.signal.String() + typeAndNameSeparator + i.name +} + +// nameRegexp is used to validate the name of an ID. A name can consist of +// 1 to 1024 unicode characters excluding whitespace, control characters, and +// symbols. +var nameRegexp = regexp.MustCompile(`^[^\pZ\pC\pS]+$`) + +func validateName(nameStr string) error { + if len(nameStr) > 1024 { + return fmt.Errorf("name %q is longer than 1024 characters (%d characters)", nameStr, len(nameStr)) + } + if !nameRegexp.MatchString(nameStr) { + return fmt.Errorf("invalid character(s) in name %q", nameStr) + } + return nil +} diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go new file mode 100644 index 00000000000..91727381092 --- /dev/null +++ b/pipeline/pipeline_test.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/internal/globalsignal" +) + +func Test_NewID(t *testing.T) { + id := NewID(SignalTraces) + assert.Equal(t, ID{signal: SignalTraces}, id) +} + +func Test_MustNewID(t *testing.T) { + id := MustNewID("traces") + assert.Equal(t, ID{signal: SignalTraces}, id) +} + +func Test_NewIDWithName(t *testing.T) { + id := NewIDWithName(SignalTraces, "test") + assert.Equal(t, ID{signal: SignalTraces, name: "test"}, id) +} + +func Test_MustNewIDWithName(t *testing.T) { + id := MustNewIDWithName("traces", "test") + assert.Equal(t, ID{signal: SignalTraces, name: "test"}, id) +} + +func TestMarshalText(t *testing.T) { + id := NewIDWithName(SignalTraces, "name") + got, err := id.MarshalText() + require.NoError(t, err) + assert.Equal(t, id.String(), string(got)) +} + +func TestUnmarshalText(t *testing.T) { + validSignal := globalsignal.MustNewSignal("valid") + var testCases = []struct { + idStr string + expectedErr bool + expectedID ID + }{ + { + idStr: "valid", + expectedID: ID{signal: validSignal, name: ""}, + }, + { + idStr: "valid/valid_name", + expectedID: ID{signal: validSignal, name: "valid_name"}, + }, + { + idStr: " valid / valid_name ", + expectedID: ID{signal: validSignal, name: "valid_name"}, + }, + { + idStr: "valid/中文好", + expectedID: ID{signal: validSignal, name: "中文好"}, + }, + { + idStr: "valid/name-with-dashes", + expectedID: ID{signal: validSignal, name: "name-with-dashes"}, + }, + // issue 10816 + { + idStr: "valid/Linux-Messages-File_01J49HCH3SWFXRVASWFZFRT3J2__processor0__logs", + expectedID: ID{signal: validSignal, name: "Linux-Messages-File_01J49HCH3SWFXRVASWFZFRT3J2__processor0__logs"}, + }, + { + idStr: "valid/1", + expectedID: ID{signal: validSignal, name: "1"}, + }, + { + idStr: "/valid_name", + expectedErr: true, + }, + { + idStr: " /valid_name", + expectedErr: true, + }, + { + idStr: "valid/", + expectedErr: true, + }, + { + idStr: "valid/ ", + expectedErr: true, + }, + { + idStr: " ", + expectedErr: true, + }, + { + idStr: "valid/invalid name", + expectedErr: true, + }, + { + idStr: "valid/" + strings.Repeat("a", 1025), + expectedErr: true, + }, + { + idStr: "INVALID", + expectedErr: true, + }, + { + idStr: "INVALID/name", + expectedErr: true, + }, + } + + for _, test := range testCases { + t.Run(test.idStr, func(t *testing.T) { + id := ID{} + err := id.UnmarshalText([]byte(test.idStr)) + if test.expectedErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, test.expectedID, id) + assert.Equal(t, test.expectedID.Signal(), id.Signal()) + assert.Equal(t, test.expectedID.Name(), id.Name()) + assert.Equal(t, test.expectedID.String(), id.String()) + }) + } +} diff --git a/pipeline/signal.go b/pipeline/signal.go new file mode 100644 index 00000000000..eaa2c75b331 --- /dev/null +++ b/pipeline/signal.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline // import "go.opentelemetry.io/collector/pipeline" + +import ( + "go.opentelemetry.io/collector/internal/globalsignal" +) + +// Signal represents the signals supported by the collector. We currently support +// collecting metrics, traces and logs, this can expand in the future. +type Signal = globalsignal.Signal + +var ( + SignalTraces = globalsignal.MustNewSignal("traces") + SignalMetrics = globalsignal.MustNewSignal("metrics") + SignalLogs = globalsignal.MustNewSignal("logs") +) diff --git a/pipeline/signal_test.go b/pipeline/signal_test.go new file mode 100644 index 00000000000..4e6d17bbcc2 --- /dev/null +++ b/pipeline/signal_test.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package pipeline + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_Signal_String(t *testing.T) { + assert.Equal(t, "traces", SignalTraces.String()) + assert.Equal(t, "metrics", SignalMetrics.String()) + assert.Equal(t, "logs", SignalLogs.String()) +} + +func Test_Signal_MarshalText(t *testing.T) { + val, err := SignalTraces.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("traces"), val) + + val, err = SignalMetrics.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("metrics"), val) + + val, err = SignalLogs.MarshalText() + require.NoError(t, err) + assert.Equal(t, []byte("logs"), val) +} diff --git a/versions.yaml b/versions.yaml index 2d1969ae6dd..4f0b4a31e69 100644 --- a/versions.yaml +++ b/versions.yaml @@ -21,6 +21,7 @@ module-sets: modules: - go.opentelemetry.io/collector - go.opentelemetry.io/collector/internal/globalgates + - go.opentelemetry.io/collector/internal/globalsignal - go.opentelemetry.io/collector/cmd/builder - go.opentelemetry.io/collector/cmd/mdatagen - go.opentelemetry.io/collector/component @@ -58,6 +59,7 @@ module-sets: - go.opentelemetry.io/collector/otelcol/otelcoltest - go.opentelemetry.io/collector/pdata/pprofile - go.opentelemetry.io/collector/pdata/testdata + - go.opentelemetry.io/collector/pipeline - go.opentelemetry.io/collector/processor - go.opentelemetry.io/collector/processor/batchprocessor - go.opentelemetry.io/collector/processor/memorylimiterprocessor From 18e4b4932b1999d4a850d31dd0c98aeb761c29e2 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Fri, 20 Sep 2024 11:44:37 +0200 Subject: [PATCH 5/6] Introduce SampleCount to ProfilesSink (#11225) #### Description This is an extraction from https://github.com/open-telemetry/opentelemetry-collector/pull/11131. cc @mx-psi --- .../consumertest-profiles-samplecount.yaml | 20 +++++++++++++++++++ consumer/consumertest/sink.go | 14 +++++++++++-- consumer/consumertest/sink_test.go | 2 ++ 3 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 .chloggen/consumertest-profiles-samplecount.yaml diff --git a/.chloggen/consumertest-profiles-samplecount.yaml b/.chloggen/consumertest-profiles-samplecount.yaml new file mode 100644 index 00000000000..6101b539b06 --- /dev/null +++ b/.chloggen/consumertest-profiles-samplecount.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumertest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce SampleCount method in ProfilesSink struct. + +# One or more tracking issues or pull requests related to the change +issues: [11225] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/consumer/consumertest/sink.go b/consumer/consumertest/sink.go index ec35e717ae0..a6d2a424ee1 100644 --- a/consumer/consumertest/sink.go +++ b/consumer/consumertest/sink.go @@ -163,8 +163,9 @@ func (sle *LogsSink) Reset() { // stores all profiles and allows querying them for testing. type ProfilesSink struct { nonMutatingConsumer - mu sync.Mutex - profiles []pprofile.Profiles + mu sync.Mutex + profiles []pprofile.Profiles + sampleCount int } var _ consumerprofiles.Profiles = (*ProfilesSink)(nil) @@ -175,6 +176,7 @@ func (ste *ProfilesSink) ConsumeProfiles(_ context.Context, td pprofile.Profiles defer ste.mu.Unlock() ste.profiles = append(ste.profiles, td) + ste.sampleCount += td.SampleCount() return nil } @@ -189,10 +191,18 @@ func (ste *ProfilesSink) AllProfiles() []pprofile.Profiles { return copyProfiles } +// ProfileRecordCount returns the number of profiles stored by this sink since last Reset. +func (ste *ProfilesSink) SampleCount() int { + ste.mu.Lock() + defer ste.mu.Unlock() + return ste.sampleCount +} + // Reset deletes any stored data. func (ste *ProfilesSink) Reset() { ste.mu.Lock() defer ste.mu.Unlock() ste.profiles = nil + ste.sampleCount = 0 } diff --git a/consumer/consumertest/sink_test.go b/consumer/consumertest/sink_test.go index 3a377345fc4..5d7f7f3bf8a 100644 --- a/consumer/consumertest/sink_test.go +++ b/consumer/consumertest/sink_test.go @@ -71,6 +71,8 @@ func TestProfilesSink(t *testing.T) { want = append(want, td) } assert.Equal(t, want, sink.AllProfiles()) + assert.Equal(t, len(want), sink.SampleCount()) sink.Reset() assert.Empty(t, sink.AllProfiles()) + assert.Empty(t, sink.SampleCount()) } From 07c3e17b2045bd6ffc791578f4c2951335d6c585 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 20 Sep 2024 11:55:42 +0200 Subject: [PATCH 6/6] [chore][VERSIONING.md] Add long term support policy (#11001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Description Adds support policy for Collector SIG software artifacts with the following principles: - Support means that we will, at a minimum, fix critical bugs and security issues. We MAY fix other bugs and/or add features if needed (e.g. because a feature eases transition to a v2), but this is not required - We support artifacts for end-users for longer - Artifacts may contain sub-components that are not stable. The important bit is that users can easily tell when something is unstable. #### Link to tracking issue Fixes #10004 --------- Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Co-authored-by: Juraci Paixão Kröhling Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> Co-authored-by: Tiffany Hrabusa <30397949+tiffany76@users.noreply.github.com> --- VERSIONING.md | 50 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/VERSIONING.md b/VERSIONING.md index 8da5ba3ba23..68d69ee23fe 100644 --- a/VERSIONING.md +++ b/VERSIONING.md @@ -1,13 +1,42 @@ -# Versioning +# Versioning and stability -This document describes the versioning policy for this repository. This policy -is designed so that the following goal can be achieved: +The OpenTelemetry Collector SIG produces several artifacts for [a variety of audiences](CONTRIBUTING.md#target-audiences). This document describes the versioning and support policy for these artifacts. These policies are designed so that the following goal can be achieved: -**Users are provided a codebase of value that is stable and secure.** +**Users are provided software artifacts of value that are stable and secure.** -## Public API expectations +The policies are divided depending on the artifact's target audience. While an artifact is supported, [critical bugs](docs/release.md#bugfix-release-criteria) and security vulnerabilities MUST be addressed. The main criteria for the length of support for an artifact is how easy it is for an artifact's target audience to adapt to disruptive changes. -The following public API expectations apply to all modules in opentelemetry-collector and opentelemetry-collector-contrib. +These policies reflect the current consensus of the OpenTelemetry Collector SIG. They are subject to change as the project evolves. + +## Software artifacts for end users + +Software artifacts intended for [end users](CONTRIBUTING.md#end-users) of the OpenTelemetry Collector include +- Binary distributions of the OpenTelemetry Collector. +- Go modules that expose Collector components, such as receivers, processors, connectors, extensions and exporters. + +These artifacts are versioned according to the [semantic versioning v2.0.0](https://semver.org/) specification. + +### General considerations + +Binary distributions produced by the Collector SIG contain components and features with varying [levels of stability](README.md#stability-levels). We abide by the following principles to relate the Collector's version to the stability of its components and features: + +* The Collector's core framework behavior MUST be stable in order for a Collector distribution to be v1.0.0 or higher. +* Users can easily understand when they are opting in to use a component or feature that is not stable. + * The Collector MUST be configurable so that unstable components or features can be excluded ensuring that a fully stable configuration is possible. + * The Collector's telemetry (e.g. Collector logs) MUST provide the ability to identify usage of unstable components or features. + +### Long-term support after v1 + +The OpenTelemetry Collector SIG provides long-term support for stable binary distributions of the OpenTelemetry Collector and its components. The following policies apply to long-term support for any major version starting on v1: + +* A binary distribution of the OpenTelemetry Collector MUST be supported for a minimum of **one year** after the release of the next major version of said distribution. +* Components MUST be supported for a minimum of **6 months** after the release of the next major version of said component or after the component has been marked as deprecated. If a component has been deprecated for 6 months it MAY be removed from a binary distribution of the OpenTelemetry Collector. This does not imply a major version change in the Collector distribution. + +## Go modules + +Go modules are intended to be used by [component developers](CONTRIBUTING.md#component-developers) and [Collector library users](CONTRIBUTING.md#collector-library-users) of the OpenTelemetry Collector + +Unless otherwise specified, the following public API expectations apply to all modules in opentelemetry-collector and opentelemetry-collector-contrib. As a general rule, stability guarantees of modules versioned as `v1` or higher are aligned with [Go 1 compatibility promise](https://go.dev/doc/go1compat). ### General Go API considerations @@ -54,7 +83,7 @@ structure. must continue to be valid after a change to the validation rules, except when the configuration struct would cause an error on its intended usage (e.g. when calling a method or when passed to any method or function in any module under opentelemetry-collector). -## Versioning and module schema +### Module versioning and schema * Versioning of this project will be idiomatic of a Go project using [Go modules](https://golang.org/ref/mod#versions). @@ -135,3 +164,10 @@ on its intended usage (e.g. when calling a method or when passed to any method o * Contrib modules will be kept up to date with this project's releases. * GitHub releases will be made for all releases. * Go modules will be made available at Go package mirrors. + +### Long-term support after v1 + +The OpenTelemetry Collector SIG provides long-term support for stable Go modules. Support for modules depend on the module's [target audiences](CONTRIBUTING.md#target-audiences). The following policies apply to long-term support for any major version starting on v1: + +- Modules intended for **component developers** MUST be supported for a minimum of **1 year** after the release of the next major version of said module or after the module has been marked as deprecated. +- Modules intended for **Collector library users** MUST be supported for a minimum of **6 months** after the release of the next major version of said module or after the module has been marked as deprecated.