Skip to content

Commit

Permalink
Use the same prometheus registry for both oc and otel exporters (#6297)
Browse files Browse the repository at this point in the history
* share prometheus registry between oc and otel exporters

* add tests

* wording

* more wording

* add chloggen entry

* assert return of view.Register

* change tests to be unit tests

* formats and adds todo

* run gofmt

* only starts http server if there is not error, go fmt again

* reverts otel-config changes

* fix opencensus flaky tests

* use `view.RetrieveData()` to force collection

* Update service/telemetry_test.go

Co-authored-by: Bogdan Drutu <lazy@splunk.com>

Co-authored-by: Bogdan Drutu <lazy@splunk.com>
  • Loading branch information
paivagustavo and Bogdan Drutu authored Oct 14, 2022
1 parent c0e0900 commit f4556f5
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 29 deletions.
18 changes: 18 additions & 0 deletions .chloggen/use-prometheus-registry-as-bridge-for-otel-and-oc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Use the same `prometheus.Registry` for the OpenCensus and OpenTelemetry Go prometheus exporters to act as a bridge for internal telemetry"


# One or more tracking issues or pull requests related to the change
issues: [6297]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

66 changes: 37 additions & 29 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
ocprom "contrib.go.opencensus.io/exporter/prometheus"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
ocmetric "go.opencensus.io/metric"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -84,22 +83,30 @@ func (tel *telemetryInitializer) init(buildInfo component.BuildInfo, logger *zap
var err error
tel.doInitOnce.Do(
func() {
err = tel.initOnce(buildInfo, logger, cfg, asyncErrorChannel)
if cfg.Metrics.Level == configtelemetry.LevelNone || cfg.Metrics.Address == "" {
logger.Info(
"Skipping telemetry setup.",
zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address),
zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()),
)
return
}

err = tel.initOnce(buildInfo, logger, cfg)
if err == nil {
go func() {
if serveErr := tel.server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
asyncErrorChannel <- serveErr
}
}()
}

},
)
return err
}

func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
if cfg.Metrics.Level == configtelemetry.LevelNone || cfg.Metrics.Address == "" {
logger.Info(
"Skipping telemetry setup.",
zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address),
zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()),
)
return nil
}

func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger *zap.Logger, cfg telemetry.Config) error {
logger.Info("Setting up own telemetry...")

// Construct telemetry attributes from build info and config's resource attributes.
Expand All @@ -113,11 +120,19 @@ func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger

var pe http.Handler
var err error
// This prometheus registry is shared between OpenCensus and OpenTelemetry exporters,
// acting as a bridge between OC and Otel.
// This is used as a path to migrate the existing OpenCensus instrumentation
// to the OpenTelemetry Go SDK without breaking existing metrics.
promRegistry := prometheus.NewRegistry()
if tel.registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID) {
pe, err = tel.initOpenTelemetry(telAttrs)
} else {
pe, err = tel.initOpenCensus(cfg, telAttrs)
err = tel.initOpenTelemetry(telAttrs, promRegistry)
if err != nil {
return err
}
}

pe, err = tel.initOpenCensus(cfg, telAttrs, promRegistry)
if err != nil {
return err
}
Expand All @@ -136,12 +151,6 @@ func (tel *telemetryInitializer) initOnce(buildInfo component.BuildInfo, logger
Handler: mux,
}

go func() {
if serveErr := tel.server.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
asyncErrorChannel <- serveErr
}
}()

return nil
}

Expand Down Expand Up @@ -176,7 +185,7 @@ func buildTelAttrs(buildInfo component.BuildInfo, cfg telemetry.Config) map[stri
return telAttrs
}

func (tel *telemetryInitializer) initOpenCensus(cfg telemetry.Config, telAttrs map[string]string) (http.Handler, error) {
func (tel *telemetryInitializer) initOpenCensus(cfg telemetry.Config, telAttrs map[string]string, promRegistry *prometheus.Registry) (http.Handler, error) {
tel.ocRegistry = ocmetric.NewRegistry()
metricproducer.GlobalManager().AddProducer(tel.ocRegistry)

Expand All @@ -193,6 +202,7 @@ func (tel *telemetryInitializer) initOpenCensus(cfg telemetry.Config, telAttrs m
// Until we can use a generic metrics exporter, default to Prometheus.
opts := ocprom.Options{
Namespace: "otelcol",
Registry: promRegistry,
}

opts.ConstLabels = make(map[string]string)
Expand All @@ -210,7 +220,7 @@ func (tel *telemetryInitializer) initOpenCensus(cfg telemetry.Config, telAttrs m
return pe, nil
}

func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string) (http.Handler, error) {
func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string, promRegistry prometheus.Registerer) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()

Expand All @@ -221,7 +231,7 @@ func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string) (htt

res, err := resource.New(context.Background(), resource.WithAttributes(resAttrs...))
if err != nil {
return nil, fmt.Errorf("error creating otel resources: %w", err)
return fmt.Errorf("error creating otel resources: %w", err)
}

exporter := otelprom.New()
Expand All @@ -230,14 +240,12 @@ func (tel *telemetryInitializer) initOpenTelemetry(attrs map[string]string) (htt
sdkmetric.WithReader(exporter),
)

registry := prometheus.NewRegistry()

wrappedRegisterer := prometheus.WrapRegistererWithPrefix("otelcol_", registry)
wrappedRegisterer := prometheus.WrapRegistererWithPrefix("otelcol_", promRegistry)
if err := wrappedRegisterer.Register(exporter.Collector); err != nil {
return nil, fmt.Errorf("failed to register prometheus collector: %w", err)
return fmt.Errorf("failed to register prometheus collector: %w", err)
}

return promhttp.HandlerFor(registry, promhttp.HandlerOpts{}), nil
return nil
}

func (tel *telemetryInitializer) shutdown() error {
Expand Down
150 changes: 150 additions & 0 deletions service/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,34 @@
package service

import (
"context"
"net/http"
"net/http/httptest"
"testing"

io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"go.opentelemetry.io/collector/service/telemetry"
)

const (
metricPrefix = "otelcol_"
otelPrefix = "otel_sdk_"
ocPrefix = "oc_sdk_"
counterName = "test_counter"
)

func TestBuildTelAttrs(t *testing.T) {
buildInfo := component.NewDefaultBuildInfo()

Expand Down Expand Up @@ -67,3 +86,134 @@ func TestBuildTelAttrs(t *testing.T) {
assert.Equal(t, "b", telAttrs[semconv.AttributeServiceVersion])
assert.Equal(t, "c", telAttrs[semconv.AttributeServiceInstanceID])
}

func TestTelemetryInit(t *testing.T) {
type metricValue struct {
value float64
labels map[string]string
}

for _, tc := range []struct {
name string
useOtel bool
expectedMetrics map[string]metricValue
}{
{
name: "UseOpenCensusForInternalMetrics",
useOtel: false,
expectedMetrics: map[string]metricValue{
metricPrefix + ocPrefix + counterName: {
value: 13,
labels: map[string]string{
"service_name": "otelcol",
"service_version": "latest",
"service_instance_id": testInstanceID,
},
},
},
},
{
name: "UseOpenTelemetryForInternalMetrics",
useOtel: true,
// TODO: add a test to verify that OTel is emitting a `target_info`
// info metric with resource attributes https://github.com/open-telemetry/opentelemetry-go/issues/3166
expectedMetrics: map[string]metricValue{
metricPrefix + ocPrefix + counterName: {
value: 13,
labels: map[string]string{
"service_name": "otelcol",
"service_version": "latest",
"service_instance_id": testInstanceID,
},
},
metricPrefix + otelPrefix + counterName: {
value: 13,
labels: map[string]string{},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
registry := featuregate.NewRegistry()
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: tc.useOtel}))

tel := newColTelemetry(registry)
buildInfo := component.NewDefaultBuildInfo()
cfg := telemetry.Config{
Resource: map[string]*string{
semconv.AttributeServiceInstanceID: &testInstanceID,
},
}

err := tel.initOnce(buildInfo, zap.NewNop(), cfg)
require.NoError(t, err)
defer func() {
require.NoError(t, tel.shutdown())
}()

v := createTestMetrics(t, tel.mp)
defer func() {
view.Unregister(v)
}()

metrics := getMetricsFromPrometheus(t, tel.server.Handler)
require.Equal(t, len(tc.expectedMetrics), len(metrics))

for metricName, metricValue := range tc.expectedMetrics {
mf, present := metrics[metricName]
require.True(t, present, "expected metric %q was not present", metricName)
require.Len(t, mf.Metric, 1, "only one measure should exist for metric %q", metricName)

labels := make(map[string]string)
for _, pair := range mf.Metric[0].Label {
labels[pair.GetName()] = pair.GetValue()
}

require.Equal(t, metricValue.labels, labels, "labels for metric %q was different than expected", metricName)
require.Equal(t, metricValue.value, mf.Metric[0].Counter.GetValue(), "value for metric %q was different than expected", metricName)
}
})

}
}

func createTestMetrics(t *testing.T, mp metric.MeterProvider) *view.View {
// Creates a OTel Go counter
counter, err := mp.Meter("collector_test").SyncInt64().Counter(otelPrefix + counterName)
require.NoError(t, err)
counter.Add(context.Background(), 13)

// Creates a OpenCensus measure
ocCounter := stats.Int64(ocPrefix+counterName, counterName, stats.UnitDimensionless)
v := &view.View{
Name: ocPrefix + counterName,
Description: ocCounter.Description(),
Measure: ocCounter,
Aggregation: view.Sum(),
}
err = view.Register(v)
require.NoError(t, err)

stats.Record(context.Background(), stats.Int64(ocPrefix+counterName, counterName, stats.UnitDimensionless).M(13))

// Forces a flush for the view data.
_, _ = view.RetrieveData(ocPrefix + counterName)

return v
}

func getMetricsFromPrometheus(t *testing.T, handler http.Handler) map[string]*io_prometheus_client.MetricFamily {
req, err := http.NewRequest("GET", "/metrics", nil)
require.NoError(t, err)

rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

var parser expfmt.TextParser
parsed, err := parser.TextToMetricFamilies(rr.Body)
require.NoError(t, err)

return parsed

}

0 comments on commit f4556f5

Please sign in to comment.