Skip to content

Commit

Permalink
Add 'count' connector, which demonstrates basic cross signal function…
Browse files Browse the repository at this point in the history
…ality
  • Loading branch information
djaglowski committed Sep 23, 2022
1 parent c52086d commit d56dffe
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cmd/builder/internal/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@ processors:
connectors:
- import: go.opentelemetry.io/collector/connector/nopconnector
gomod: go.opentelemetry.io/collector v0.60.0
- import: go.opentelemetry.io/collector/connector/countconnector
gomod: go.opentelemetry.io/collector v0.60.0
2 changes: 2 additions & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ processors:
connectors:
- import: go.opentelemetry.io/collector/connector/nopconnector
gomod: go.opentelemetry.io/collector v0.60.0
- import: go.opentelemetry.io/collector/connector/countconnector
gomod: go.opentelemetry.io/collector v0.60.0

replaces:
- go.opentelemetry.io/collector => ../../
Expand Down
4 changes: 4 additions & 0 deletions cmd/otelcorecol/components.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 20 additions & 17 deletions component/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@ import (
"go.opentelemetry.io/collector/config"
)

// Connector exports telemetry data from one pipeline to another.
// type Connector interface {
// Component
// Exporter
// Receiver
// }

// // ConnectorCreateSettings configures Connector creators.
// type ConnectorCreateSettings struct {
// TelemetrySettings

// // BuildInfo can be used by components for informational purposes
// BuildInfo BuildInfo
// }

// ConnectorFactory is factory interface for connectors.
//
// This interface cannot be directly implemented. Implementations must
Expand Down Expand Up @@ -60,6 +45,12 @@ type ConnectorCreateDefaultConfigFunc func() config.Connector
func (f ConnectorCreateDefaultConfigFunc) CreateDefaultConfig() config.Connector {
return f()
}
func (f ConnectorCreateDefaultConfigFunc) createDefaultReceiverConfig() config.Receiver {
return f()
}
func (f ConnectorCreateDefaultConfigFunc) CreateDefaultExporterConfig() config.Exporter {
return f()
}

type connectorFactory struct {
baseFactory
Expand All @@ -84,9 +75,21 @@ func NewConnectorFactory(
}

func (f *connectorFactory) NewExporterFactory() ExporterFactory {
return NewExporterFactory(f.cfgType, nil, f.exporterFactoryOptions...)
return NewExporterFactory(f.cfgType, f.CreateDefaultExporterConfig, f.exporterFactoryOptions...)
}

func (f *connectorFactory) NewReceiverFactory() ReceiverFactory {
return NewReceiverFactory(f.cfgType, nil, f.receiverFactoryOptions...)
return NewReceiverFactory(f.cfgType, f.createDefaultReceiverConfig, f.receiverFactoryOptions...)
}

// TODO Implement and enforce ConnectorFactoryOptions that enumerate valid signal combos.
//
// Example: nopconnector
// func AsLogsToLogsConnector() ConnectorFactoryOption
// func AsMetricsToMetricsConnector() ConnectorFactoryOption
// func AsTracesToTracesConnector() ConnectorFactoryOption
//
// Example: countconnector
// func AsLogsToMetricsConnector() ConnectorFactoryOption
// func AsMetricsToMetricsConnector() ConnectorFactoryOption
// func AsTracesToMetricsConnector() ConnectorFactoryOption
6 changes: 3 additions & 3 deletions config/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type Connector interface {
identifiable
validatable

privateConfigConnector()
// Implement both to ensure:
// 1. Only connectors are defined in 'connectors' section
// 2. Connectors may be placed in receiver and exporter positions in pipelines
privateConfigExporter()
privateConfigReceiver()
}
Expand Down Expand Up @@ -67,8 +69,6 @@ func (es *ConnectorSettings) Validate() error {
return nil
}

func (es *ConnectorSettings) privateConfigConnector() {}

func (es *ConnectorSettings) privateConfigExporter() {}

func (es *ConnectorSettings) privateConfigReceiver() {}
14 changes: 14 additions & 0 deletions connector/countconnector/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Count Connector

| Status | |
| ------------------------ | ---------------------------------- |
| Stability | traces [in development] |
| | metrics [in development] |
| | logs [in development] |
| Supported pipeline types | as exporter: traces, metrics, logs |
| Supported pipeline types | as receiver: metrics |
| Distributions | |

Counts any type of signal and emits metrics.

[in development]:https://github.com/open-telemetry/opentelemetry-collector#in-development
190 changes: 190 additions & 0 deletions connector/countconnector/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package countconnector // import "go.opentelemetry.io/collector/connector/countconnector"

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/sharedcomponent"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
typeStr = "count"
)

type Config struct {
config.ConnectorSettings `mapstructure:",squash"`
}

var _ config.Connector = (*Config)(nil)

// NewFactory returns a ConnectorFactory.
func NewFactory() component.ConnectorFactory {
return component.NewConnectorFactory(
typeStr,
createDefaultConfig,
[]component.ExporterFactoryOption{
component.WithTracesExporter(createTracesExporter, component.StabilityLevelInDevelopment),
component.WithMetricsExporter(createMetricsExporter, component.StabilityLevelInDevelopment),
component.WithLogsExporter(createLogExporter, component.StabilityLevelInDevelopment),
},
[]component.ReceiverFactoryOption{
component.WithMetricsReceiver(createMetricsReceiver, component.StabilityLevelInDevelopment),
},
)
}

// createDefaultConfig creates the default configuration.
func createDefaultConfig() config.Connector {
return &Config{}
}

// createTracesExporter creates a trace receiver based on provided config.
func createTracesExporter(
_ context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.TracesExporter, error) {
comp := connectors.GetOrAdd(cfg.ID(), func() component.Component {
return newCountConnector(cfg.(*Config), set)
})

conn := comp.Unwrap().(*countConnector)
return conn, nil
}

// createMetricsExporter creates a metrics receiver based on provided config.
func createMetricsExporter(
_ context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.MetricsExporter, error) {
comp := connectors.GetOrAdd(cfg.ID(), func() component.Component {
return newCountConnector(cfg.(*Config), set)
})

conn := comp.Unwrap().(*countConnector)
return conn, nil
}

// createLogExporter creates a log receiver based on provided config.
func createLogExporter(
_ context.Context,
set component.ExporterCreateSettings,
cfg config.Exporter,
) (component.LogsExporter, error) {
comp := connectors.GetOrAdd(cfg.ID(), func() component.Component {
return newCountConnector(cfg.(*Config), set)
})

conn := comp.Unwrap().(*countConnector)
return conn, nil
}

// createMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
_ context.Context,
set component.ReceiverCreateSettings,
cfg config.Receiver,
nextConsumer consumer.Metrics,
) (component.MetricsReceiver, error) {
comp := connectors.GetOrAdd(cfg.ID(), func() component.Component {
// Expect to have already created this component as an exporter
return nil
})

if comp == nil {
return nil, fmt.Errorf("connector must be initialized as exporter and receiver")
}

conn := comp.Unwrap().(*countConnector)
conn.metricsConsumer = nextConsumer
return conn, nil
}

// This is the map of already created count connectors for particular configurations.
// We maintain this map because the Factory is asked trace, metric, and log receivers
// separately but they must not create separate objects. When the connector is shutdown
// it should be removed from this map so the same configuration can be recreated successfully.
var connectors = sharedcomponent.NewSharedComponents()

// otlpReceiver is the type that exposes Trace and Metrics reception.
type countConnector struct {
cfg *Config

metricsConsumer consumer.Metrics

// Use ExporterCreateSettings because exporters are created first.
// Receiver settings should be the same anyways.
settings component.ExporterCreateSettings
}

func newCountConnector(cfg *Config, set component.ExporterCreateSettings) *countConnector {
return &countConnector{
cfg: cfg,
settings: set,
}
}

func (c *countConnector) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (c *countConnector) Start(ctx context.Context, host component.Host) error {
return nil
}

func (c *countConnector) Shutdown(ctx context.Context) error {
return nil
}

func (c *countConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("spans", td.SpanCount()))
}

func (c *countConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("metrics", md.MetricCount()))
}

func (c *countConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.metricsConsumer.ConsumeMetrics(ctx, newCountMetric("logs", ld.LogRecordCount()))
}

func newCountMetric(signalType string, count int) pmetric.Metrics {
ms := pmetric.NewMetrics()
rms := ms.ResourceMetrics().AppendEmpty()
sms := rms.ScopeMetrics().AppendEmpty()
cm := sms.Metrics().AppendEmpty()
cm.SetName(fmt.Sprintf("count.%s", signalType))
cm.SetDescription(fmt.Sprintf("The number of %ss observed.", signalType))
sum := cm.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta)
dp := sum.DataPoints().AppendEmpty()
dp.Attributes().PutString("signal.type", signalType)
dp.SetIntVal(int64(count))
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
return ms
}
16 changes: 16 additions & 0 deletions connector/countconnector/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package countconnector counts signals other pipelines.
package countconnector // import "go.opentelemetry.io/collector/connector/countconnector"
4 changes: 2 additions & 2 deletions connector/nopconnector/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package nopconnector passes singals from one pipeline to another.
package nopconnector // import "go.opentelemetry.io/collector/receiver/nopconnector"
// Package nopconnector passes signals from one pipeline to another.
package nopconnector // import "go.opentelemetry.io/collector/connector/nopconnector"
2 changes: 1 addition & 1 deletion connector/nopconnector/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package nopconnector // import "go.opentelemetry.io/collector/receiver/nopconnector"
package nopconnector // import "go.opentelemetry.io/collector/connector/nopconnector"

import (
"context"
Expand Down

0 comments on commit d56dffe

Please sign in to comment.