Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OTEL tracer to metricsstore reader component #4595

Merged
merged 13 commits into from
Jul 29, 2023
7 changes: 5 additions & 2 deletions pkg/jtracer/jtracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func New(serviceName string) (*JTracer, error) {
}

func NoOp() *JTracer {
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider()}
return &JTracer{OT: opentracing.NoopTracer{}, OTEL: trace.NewNoopTracerProvider(), closer: nil}
}

// initOTEL initializes OTEL Tracer
Expand Down Expand Up @@ -107,5 +107,8 @@ func otelExporter(ctx context.Context) (sdktrace.SpanExporter, error) {

// Shutdown the tracerProvider to clean up resources
func (jt *JTracer) Close(ctx context.Context) error {
return jt.closer(ctx)
if jt.closer != nil {
return jt.closer(ctx)
}
return nil
}
6 changes: 5 additions & 1 deletion plugin/metrics/prometheus/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/plugin"
Expand All @@ -31,11 +33,13 @@ var _ plugin.Configurable = (*Factory)(nil)
type Factory struct {
options *Options
logger *zap.Logger
tracer trace.TracerProvider
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
options: NewOptions("prometheus"),
}
}
Expand All @@ -60,5 +64,5 @@ func (f *Factory) Initialize(logger *zap.Logger) error {

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.logger, f.options.Primary.Configuration)
return prometheusstore.NewMetricsReader(f.options.Primary.Configuration, f.logger, f.tracer)
}
33 changes: 18 additions & 15 deletions plugin/metrics/prometheus/metricsstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"time"
"unicode"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand All @@ -49,6 +49,7 @@ type (
MetricsReader struct {
client promapi.API
logger *zap.Logger
tracer trace.Tracer

metricsTranslator dbmodel.Translator
latencyMetricName string
Expand All @@ -73,7 +74,7 @@ type (
)

// NewMetricsReader returns a new MetricsReader.
func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsReader, error) {
func NewMetricsReader(cfg config.Configuration, logger *zap.Logger, tracer trace.TracerProvider) (*MetricsReader, error) {
logger.Info("Creating metrics reader", zap.Any("configuration", cfg))

roundTripper, err := getHTTPRoundTripper(&cfg, logger)
Expand All @@ -96,6 +97,7 @@ func NewMetricsReader(logger *zap.Logger, cfg config.Configuration) (*MetricsRea
mr := &MetricsReader{
client: promapi.NewAPI(client),
logger: logger,
tracer: tracer.Tracer("prom-metrics-reader"),

metricsTranslator: dbmodel.New(operationLabel),
callsMetricName: buildFullCallsMetricName(cfg),
Expand Down Expand Up @@ -224,8 +226,8 @@ func (m MetricsReader) executeQuery(ctx context.Context, p metricsQueryParams) (
}
promQuery := m.buildPromQuery(p)

span, ctx := startSpanForQuery(ctx, p.metricName, promQuery)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, p.metricName, promQuery, m.tracer)
defer span.End()

queryRange := promapi.Range{
Start: p.EndTime.Add(-1 * *p.Lookback),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -287,17 +289,18 @@ func promqlDurationString(d *time.Duration) string {
return string(b)
}

func startSpanForQuery(ctx context.Context, metricName, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, metricName)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "prometheus")
ottag.Component.Set(span, "promql")
return span, ctx
func startSpanForQuery(ctx context.Context, metricName, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, metricName)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("prometheus"),
attribute.Key("component").String("promql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err, trace.WithAttributes(semconv.OTelStatusCodeError))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}

func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.RoundTripper, err error) {
Expand Down
51 changes: 39 additions & 12 deletions plugin/metrics/prometheus/metricsstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/bearertoken"
Expand Down Expand Up @@ -61,22 +64,36 @@ var defaultConfig = config.Configuration{
LatencyUnit: "ms",
}

func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
)
defer tp.Shutdown(context.Background())
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
return tp, exporter
}

func TestNewMetricsReaderValidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, exp := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://localhost:1234",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
require.NoError(t, err)
assert.NotNil(t, reader)
}

func TestNewMetricsReaderInvalidAddress(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, exp := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "\n",
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to initialize prometheus client")
assert.Nil(t, reader)
Expand All @@ -85,14 +102,16 @@ func TestNewMetricsReaderInvalidAddress(t *testing.T) {
func TestGetMinStepDuration(t *testing.T) {
params := metricsstore.MinStepDurationQueryParameters{}
logger := zap.NewNop()
tracer, exp := tracerProvider(t)
listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
assert.NotNil(t, listener)

reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + listener.Addr().String(),
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

minStep, err := reader.GetMinStepDuration(context.Background(), &params)
Expand Down Expand Up @@ -121,11 +140,13 @@ func TestMetricsServerError(t *testing.T) {
defer mockPrometheus.Close()

logger := zap.NewNop()
tracer, exp := tracerProvider(t)
address := mockPrometheus.Listener.Addr().String()
reader, err := NewMetricsReader(logger, config.Configuration{
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "http://" + address,
ConnectTimeout: defaultTimeout,
})
}, logger, tracer)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
require.NoError(t, err)

m, err := reader.GetCallRates(context.Background(), &params)
Expand Down Expand Up @@ -460,12 +481,14 @@ func TestInvalidLatencyUnit(t *testing.T) {
t.Errorf("Expected a panic due to invalid latency unit")
}
}()
tracer, exp := tracerProvider(t)
cfg := config.Configuration{
SupportSpanmetricsConnector: true,
NormalizeDuration: true,
LatencyUnit: "something invalid",
}
_, _ = NewMetricsReader(zap.NewNop(), cfg)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
_, _ = NewMetricsReader(cfg, zap.NewNop(), tracer)
}

func TestWarningResponse(t *testing.T) {
Expand Down Expand Up @@ -571,14 +594,16 @@ func TestGetRoundTripperTokenError(t *testing.T) {

func TestInvalidCertFile(t *testing.T) {
logger := zap.NewNop()
reader, err := NewMetricsReader(logger, config.Configuration{
tracer, exp := tracerProvider(t)
reader, err := NewMetricsReader(config.Configuration{
ServerURL: "https://localhost:1234",
ConnectTimeout: defaultTimeout,
TLS: tlscfg.Options{
Enabled: true,
CAPath: "foo",
},
})
}, logger, tracer)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
require.Error(t, err)
assert.Nil(t, reader)
}
Expand Down Expand Up @@ -639,12 +664,14 @@ func prepareMetricsReaderAndServer(t *testing.T, config config.Configuration, wa
mockPrometheus := startMockPrometheusServer(t, wantPromQlQuery, wantWarnings)

logger := zap.NewNop()
tracer, exp := tracerProvider(t)
address := mockPrometheus.Listener.Addr().String()

config.ServerURL = "http://" + address
config.ConnectTimeout = defaultTimeout

reader, err := NewMetricsReader(logger, config)
assert.NotEmpty(t, exp.GetSpans(), "No spans recorded during the test.")
reader, err := NewMetricsReader(config, logger, tracer)
require.NoError(t, err)

return reader, mockPrometheus
Expand Down