diff --git a/cmd/builder/internal/config/default.yaml b/cmd/builder/internal/config/default.yaml index b1a13a37378..b2582c5755b 100644 --- a/cmd/builder/internal/config/default.yaml +++ b/cmd/builder/internal/config/default.yaml @@ -28,3 +28,5 @@ processors: connectors: - import: go.opentelemetry.io/collector/connector/nopconnector gomod: go.opentelemetry.io/collector v0.60.0 + - import: go.opentelemetry.io/collector/connector/countconnector + gomod: go.opentelemetry.io/collector v0.60.0 diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 6f5540b3178..bcd27fab7b0 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -28,6 +28,8 @@ processors: connectors: - import: go.opentelemetry.io/collector/connector/nopconnector gomod: go.opentelemetry.io/collector v0.60.0 + - import: go.opentelemetry.io/collector/connector/countconnector + gomod: go.opentelemetry.io/collector v0.60.0 replaces: - go.opentelemetry.io/collector => ../../ diff --git a/cmd/otelcorecol/components.go b/cmd/otelcorecol/components.go index f3050b37712..ca897250c35 100644 --- a/cmd/otelcorecol/components.go +++ b/cmd/otelcorecol/components.go @@ -13,6 +13,7 @@ import ( memorylimiterprocessor "go.opentelemetry.io/collector/processor/memorylimiterprocessor" otlpreceiver "go.opentelemetry.io/collector/receiver/otlpreceiver" nopconnector "go.opentelemetry.io/collector/connector/nopconnector" + countconnector "go.opentelemetry.io/collector/connector/countconnector" ) func components() (component.Factories, error) { @@ -29,6 +30,7 @@ func components() (component.Factories, error) { factories.Connectors, err = component.MakeConnectorFactoryMap( nopconnector.NewFactory(), + countconnector.NewFactory(), ) if err != nil { return component.Factories{}, err @@ -37,6 +39,7 @@ func components() (component.Factories, error) { factories.Receivers, err = component.MakeReceiverFactoryMap( otlpreceiver.NewFactory(), nopconnector.NewFactory().NewReceiverFactory(), + countconnector.NewFactory().NewReceiverFactory(), ) if err != nil { return component.Factories{}, err @@ -47,6 +50,7 @@ func components() (component.Factories, error) { otlpexporter.NewFactory(), otlphttpexporter.NewFactory(), nopconnector.NewFactory().NewExporterFactory(), + countconnector.NewFactory().NewExporterFactory(), ) if err != nil { return component.Factories{}, err diff --git a/component/connector.go b/component/connector.go index 292a80d0d01..83faf5e3ef0 100644 --- a/component/connector.go +++ b/component/connector.go @@ -18,21 +18,6 @@ import ( "go.opentelemetry.io/collector/config" ) -// Connector exports telemetry data from one pipeline to another. -// type Connector interface { -// Component -// Exporter -// Receiver -// } - -// // ConnectorCreateSettings configures Connector creators. -// type ConnectorCreateSettings struct { -// TelemetrySettings - -// // BuildInfo can be used by components for informational purposes -// BuildInfo BuildInfo -// } - // ConnectorFactory is factory interface for connectors. // // This interface cannot be directly implemented. Implementations must @@ -60,6 +45,12 @@ type ConnectorCreateDefaultConfigFunc func() config.Connector func (f ConnectorCreateDefaultConfigFunc) CreateDefaultConfig() config.Connector { return f() } +func (f ConnectorCreateDefaultConfigFunc) createDefaultReceiverConfig() config.Receiver { + return f() +} +func (f ConnectorCreateDefaultConfigFunc) CreateDefaultExporterConfig() config.Exporter { + return f() +} type connectorFactory struct { baseFactory @@ -84,9 +75,21 @@ func NewConnectorFactory( } func (f *connectorFactory) NewExporterFactory() ExporterFactory { - return NewExporterFactory(f.cfgType, nil, f.exporterFactoryOptions...) + return NewExporterFactory(f.cfgType, f.CreateDefaultExporterConfig, f.exporterFactoryOptions...) } func (f *connectorFactory) NewReceiverFactory() ReceiverFactory { - return NewReceiverFactory(f.cfgType, nil, f.receiverFactoryOptions...) + return NewReceiverFactory(f.cfgType, f.createDefaultReceiverConfig, f.receiverFactoryOptions...) } + +// TODO Implement and enforce ConnectorFactoryOptions that enumerate valid signal combos. +// +// Example: nopconnector +// func AsLogsToLogsConnector() ConnectorFactoryOption +// func AsMetricsToMetricsConnector() ConnectorFactoryOption +// func AsTracesToTracesConnector() ConnectorFactoryOption +// +// Example: countconnector +// func AsLogsToMetricsConnector() ConnectorFactoryOption +// func AsMetricsToMetricsConnector() ConnectorFactoryOption +// func AsTracesToMetricsConnector() ConnectorFactoryOption diff --git a/config/connector.go b/config/connector.go index c54a5d953ec..e9c0001919b 100644 --- a/config/connector.go +++ b/config/connector.go @@ -23,7 +23,9 @@ type Connector interface { identifiable validatable - privateConfigConnector() + // Implement both to ensure: + // 1. Only connectors are defined in 'connectors' section + // 2. Connectors may be placed in receiver and exporter positions in pipelines privateConfigExporter() privateConfigReceiver() } @@ -67,8 +69,6 @@ func (es *ConnectorSettings) Validate() error { return nil } -func (es *ConnectorSettings) privateConfigConnector() {} - func (es *ConnectorSettings) privateConfigExporter() {} func (es *ConnectorSettings) privateConfigReceiver() {} diff --git a/connector/countconnector/README.md b/connector/countconnector/README.md new file mode 100644 index 00000000000..1ab48a7eafc --- /dev/null +++ b/connector/countconnector/README.md @@ -0,0 +1,14 @@ +# Count Connector + +| Status | | +| ------------------------ | ---------------------------------- | +| Stability | traces [in development] | +| | metrics [in development] | +| | logs [in development] | +| Supported pipeline types | as exporter: traces, metrics, logs | +| Supported pipeline types | as receiver: metrics | +| Distributions | | + +Counts any type of signal and emits metrics. + +[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development diff --git a/connector/countconnector/count.go b/connector/countconnector/count.go new file mode 100644 index 00000000000..6fca12f40fb --- /dev/null +++ b/connector/countconnector/count.go @@ -0,0 +1,190 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package countconnector // import "go.opentelemetry.io/collector/connector/countconnector" + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/sharedcomponent" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +const ( + typeStr = "count" +) + +type Config struct { + config.ConnectorSettings `mapstructure:",squash"` +} + +var _ config.Connector = (*Config)(nil) + +// NewFactory returns a ConnectorFactory. +func NewFactory() component.ConnectorFactory { + return component.NewConnectorFactory( + typeStr, + createDefaultConfig, + []component.ExporterFactoryOption{ + component.WithTracesExporter(createTracesExporter, component.StabilityLevelInDevelopment), + component.WithMetricsExporter(createMetricsExporter, component.StabilityLevelInDevelopment), + component.WithLogsExporter(createLogExporter, component.StabilityLevelInDevelopment), + }, + []component.ReceiverFactoryOption{ + component.WithMetricsReceiver(createMetricsReceiver, component.StabilityLevelInDevelopment), + }, + ) +} + +// createDefaultConfig creates the default configuration. +func createDefaultConfig() config.Connector { + return &Config{} +} + +// createTracesExporter creates a trace receiver based on provided config. +func createTracesExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.TracesExporter, error) { + comp := connectors.GetOrAdd(cfg.ID(), func() component.Component { + return newCountConnector(cfg.(*Config), set) + }) + + conn := comp.Unwrap().(*countConnector) + return conn, nil +} + +// createMetricsExporter creates a metrics receiver based on provided config. +func createMetricsExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.MetricsExporter, error) { + comp := connectors.GetOrAdd(cfg.ID(), func() component.Component { + return newCountConnector(cfg.(*Config), set) + }) + + conn := comp.Unwrap().(*countConnector) + return conn, nil +} + +// createLogExporter creates a log receiver based on provided config. +func createLogExporter( + _ context.Context, + set component.ExporterCreateSettings, + cfg config.Exporter, +) (component.LogsExporter, error) { + comp := connectors.GetOrAdd(cfg.ID(), func() component.Component { + return newCountConnector(cfg.(*Config), set) + }) + + conn := comp.Unwrap().(*countConnector) + return conn, nil +} + +// createMetricsReceiver creates a metrics receiver based on provided config. +func createMetricsReceiver( + _ context.Context, + set component.ReceiverCreateSettings, + cfg config.Receiver, + nextConsumer consumer.Metrics, +) (component.MetricsReceiver, error) { + comp := connectors.GetOrAdd(cfg.ID(), func() component.Component { + // Expect to have already created this component as an exporter + return nil + }) + + if comp == nil { + return nil, fmt.Errorf("connector must be initialized as exporter and receiver") + } + + conn := comp.Unwrap().(*countConnector) + conn.metricsConsumer = nextConsumer + return conn, nil +} + +// This is the map of already created count connectors for particular configurations. +// We maintain this map because the Factory is asked trace, metric, and log receivers +// separately but they must not create separate objects. When the connector is shutdown +// it should be removed from this map so the same configuration can be recreated successfully. +var connectors = sharedcomponent.NewSharedComponents() + +// otlpReceiver is the type that exposes Trace and Metrics reception. +type countConnector struct { + cfg *Config + + metricsConsumer consumer.Metrics + + // Use ExporterCreateSettings because exporters are created first. + // Receiver settings should be the same anyways. + settings component.ExporterCreateSettings +} + +func newCountConnector(cfg *Config, set component.ExporterCreateSettings) *countConnector { + return &countConnector{ + cfg: cfg, + settings: set, + } +} + +func (c *countConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (c *countConnector) Start(ctx context.Context, host component.Host) error { + return nil +} + +func (c *countConnector) Shutdown(ctx context.Context) error { + return nil +} + +func (c *countConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("spans", td.SpanCount())) +} + +func (c *countConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("metrics", md.MetricCount())) +} + +func (c *countConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("logs", ld.LogRecordCount())) +} + +func newCountMetric(signalType string, count int) pmetric.Metrics { + ms := pmetric.NewMetrics() + rms := ms.ResourceMetrics().AppendEmpty() + sms := rms.ScopeMetrics().AppendEmpty() + cm := sms.Metrics().AppendEmpty() + cm.SetName(fmt.Sprintf("count.%s", signalType)) + cm.SetDescription(fmt.Sprintf("The number of %ss observed.", signalType)) + sum := cm.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + dp := sum.DataPoints().AppendEmpty() + dp.Attributes().PutString("signal.type", signalType) + dp.SetIntVal(int64(count)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + return ms +} diff --git a/connector/countconnector/doc.go b/connector/countconnector/doc.go new file mode 100644 index 00000000000..bfbe4b53df9 --- /dev/null +++ b/connector/countconnector/doc.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package countconnector counts signals other pipelines. +package countconnector // import "go.opentelemetry.io/collector/connector/countconnector" diff --git a/connector/nopconnector/doc.go b/connector/nopconnector/doc.go index 81a0ae778db..4b93f3d6633 100644 --- a/connector/nopconnector/doc.go +++ b/connector/nopconnector/doc.go @@ -12,5 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package nopconnector passes singals from one pipeline to another. -package nopconnector // import "go.opentelemetry.io/collector/receiver/nopconnector" +// Package nopconnector passes signals from one pipeline to another. +package nopconnector // import "go.opentelemetry.io/collector/connector/nopconnector" diff --git a/connector/nopconnector/nop.go b/connector/nopconnector/nop.go index 5741453d2c3..70fd706545c 100644 --- a/connector/nopconnector/nop.go +++ b/connector/nopconnector/nop.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package nopconnector // import "go.opentelemetry.io/collector/receiver/nopconnector" +package nopconnector // import "go.opentelemetry.io/collector/connector/nopconnector" import ( "context"