diff --git a/CHANGELOG.md b/CHANGELOG.md index b59aa3f8738b..42f9b0edd165 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,15 +48,13 @@ In case of type mismatch, they don't panic right away but return an invalid zero-initialized instance for consistency with other OneOf field accessors (#5034) - Update OTLP to v0.15.0 (#5064) +- Create additional pdata packages separated by type for further split of pdata (#4918) +- Create additional consumer packages split by telemetry type (#5086) ### 🧰 Bug fixes 🧰 - The `featuregates` were not configured from the "--feature-gates" flag on windows service (#5060) -### 💡 Enhancements 💡 - -- Create additional pdata packages separated by type for further split of pdata (#4918) - ## v0.47.0 Beta ### 🛑 Breaking changes 🛑 diff --git a/consumer/consumer.go b/consumer/consumer.go index ddfce62cdda3..efa5e67fb508 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -15,53 +15,23 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( - "errors" + "go.opentelemetry.io/collector/consumer/internal" ) // Capabilities describes the capabilities of a Processor. -type Capabilities struct { - // MutatesData is set to true if Consume* function of the - // processor modifies the input TraceData or MetricsData argument. - // Processors which modify the input data MUST set this flag to true. If the processor - // does not modify the data it MUST set this flag to false. If the processor creates - // a copy of the data before modifying then this flag can be safely set to false. - MutatesData bool -} - -type baseConsumer interface { - Capabilities() Capabilities -} - -var errNilFunc = errors.New("nil consumer func") - -type baseImpl struct { - capabilities Capabilities -} +type Capabilities = internal.Capabilities // Option to construct new consumers. -type Option func(*baseImpl) +type Option = internal.Option // WithCapabilities overrides the default GetCapabilities function for a processor. // The default GetCapabilities function returns mutable capabilities. -func WithCapabilities(capabilities Capabilities) Option { - return func(o *baseImpl) { - o.capabilities = capabilities - } -} +var WithCapabilities = internal.WithCapabilities -// Capabilities implementation of the base -func (bs baseImpl) Capabilities() Capabilities { - return bs.capabilities -} +type baseConsumer = internal.BaseConsumer -func newBaseImpl(options ...Option) *baseImpl { - bs := &baseImpl{ - capabilities: Capabilities{MutatesData: false}, - } +var errNilFunc = internal.ErrNilFunc - for _, op := range options { - op(bs) - } +type baseImpl = internal.BaseImpl - return bs -} +var newBaseImpl = internal.NewBaseImpl diff --git a/consumer/internal/consumer.go b/consumer/internal/consumer.go new file mode 100644 index 000000000000..23f86d6d8acb --- /dev/null +++ b/consumer/internal/consumer.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "go.opentelemetry.io/collector/consumer/internal" + +import ( + "errors" +) + +// Capabilities describes the capabilities of a Processor. +type Capabilities struct { + // MutatesData is set to true if Consume* function of the + // processor modifies the input TraceData or MetricsData argument. + // Processors which modify the input data MUST set this flag to true. If the processor + // does not modify the data it MUST set this flag to false. If the processor creates + // a copy of the data before modifying then this flag can be safely set to false. + MutatesData bool +} + +type BaseConsumer interface { + Capabilities() Capabilities +} + +var ErrNilFunc = errors.New("nil consumer func") + +type BaseImpl struct { + Cap Capabilities +} + +// Option to construct new consumers. +type Option func(*BaseImpl) + +// WithCapabilities overrides the default GetCapabilities function for a processor. +// The default GetCapabilities function returns mutable capabilities. +func WithCapabilities(capabilities Capabilities) Option { + return func(o *BaseImpl) { + o.Cap = capabilities + } +} + +// Capabilities implementation of the base +func (bs BaseImpl) Capabilities() Capabilities { + return bs.Cap +} + +func NewBaseImpl(options ...Option) *BaseImpl { + bs := &BaseImpl{ + Cap: Capabilities{MutatesData: false}, + } + + for _, op := range options { + op(bs) + } + + return bs +} diff --git a/consumer/logs/consumer.go b/consumer/logs/consumer.go new file mode 100644 index 000000000000..610971c80d90 --- /dev/null +++ b/consumer/logs/consumer.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logs // import "go.opentelemetry.io/collector/consumer/logs" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer/internal" + "go.opentelemetry.io/collector/model/pdata/logs" +) + +// Consumer is an interface that receives logs.Logs, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Consumer interface { + internal.BaseConsumer + // Consume receives logs.Logs for consumption. + Consume(ctx context.Context, ld logs.Logs) error +} + +// ConsumeFunc is a helper function that is similar to Consume. +type ConsumeFunc func(ctx context.Context, ld logs.Logs) error + +// Consume calls f(ctx, ld). +func (f ConsumeFunc) Consume(ctx context.Context, ld logs.Logs) error { + return f(ctx, ld) +} + +type base struct { + *internal.BaseImpl + ConsumeFunc +} + +// NewConsumer returns a Logs configured with the provided options. +func NewConsumer(consume ConsumeFunc, options ...internal.Option) (Consumer, error) { + if consume == nil { + return nil, internal.ErrNilFunc + } + return &base{ + BaseImpl: internal.NewBaseImpl(options...), + ConsumeFunc: consume, + }, nil +} diff --git a/consumer/metrics/consumer.go b/consumer/metrics/consumer.go new file mode 100644 index 000000000000..43f9867f2169 --- /dev/null +++ b/consumer/metrics/consumer.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics // import "go.opentelemetry.io/collector/consumer/metrics" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer/internal" + "go.opentelemetry.io/collector/model/pdata/metrics" +) + +// Consumer is an interface that receives metrics.Metrics, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Consumer interface { + internal.BaseConsumer + // Consume receives metrics.Metrics for consumption. + Consume(ctx context.Context, ld metrics.Metrics) error +} + +// ConsumeFunc is a helper function that is similar to Consume. +type ConsumeFunc func(ctx context.Context, ld metrics.Metrics) error + +// Consume calls f(ctx, ld). +func (f ConsumeFunc) Consume(ctx context.Context, ld metrics.Metrics) error { + return f(ctx, ld) +} + +type base struct { + *internal.BaseImpl + ConsumeFunc +} + +// NewConsumer returns a Metrics configured with the provided options. +func NewConsumer(consume ConsumeFunc, options ...internal.Option) (Consumer, error) { + if consume == nil { + return nil, internal.ErrNilFunc + } + return &base{ + BaseImpl: internal.NewBaseImpl(options...), + ConsumeFunc: consume, + }, nil +} diff --git a/consumer/traces/consumer.go b/consumer/traces/consumer.go new file mode 100644 index 000000000000..bce722effff4 --- /dev/null +++ b/consumer/traces/consumer.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "go.opentelemetry.io/collector/consumer/traces" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer/internal" + "go.opentelemetry.io/collector/model/pdata/traces" +) + +// Consumer is an interface that receives traces.Traces, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Consumer interface { + internal.BaseConsumer + // Consume receives traces.Traces for consumption. + Consume(ctx context.Context, ld traces.Traces) error +} + +// ConsumeFunc is a helper function that is similar to Consume. +type ConsumeFunc func(ctx context.Context, ld traces.Traces) error + +// Consume calls f(ctx, ld). +func (f ConsumeFunc) Consume(ctx context.Context, ld traces.Traces) error { + return f(ctx, ld) +} + +type base struct { + *internal.BaseImpl + ConsumeFunc +} + +// NewConsumer returns a Traces configured with the provided options. +func NewConsumer(consume ConsumeFunc, options ...internal.Option) (Consumer, error) { + if consume == nil { + return nil, internal.ErrNilFunc + } + return &base{ + BaseImpl: internal.NewBaseImpl(options...), + ConsumeFunc: consume, + }, nil +}