INFOPLAT-1575 Implement Prometheus to OTel metrics Forwarder #1013

Open
wants to merge 28 commits into base: main

28 commits
59db144
[promotel] Implement Prometheus to OpenTelemetry exporter
pkcll Jan 28, 2025
622fa61
Use github.com/pkcll/prometheus
pkcll Jan 28, 2025
5577fbf
Bump github.com/pkcll/opentelemetry-collector-contrib/receiver/promet…
pkcll Jan 29, 2025
61b91d1
Delete promotel/cmd
pkcll Feb 6, 2025
5ebc25b
Refactor test
pkcll Feb 6, 2025
90108d9
Use logger.Logger
pkcll Feb 6, 2025
592222e
Bump prometheusreceiver
pkcll Feb 6, 2025
7ea779b
Add exporter to the test
pkcll Feb 6, 2025
d182a28
Add Export method to MetricExporter
pkcll Feb 7, 2025
797b9e3
Pass Logger to metric receiver settings
pkcll Feb 8, 2025
f6366e8
Move promotel config
pkcll Feb 10, 2025
b405431
Refactor promotel config
pkcll Feb 10, 2025
8efb59f
Move receiver, exporter to internal
pkcll Feb 12, 2025
a16458e
Refactor TestMetricReceiver
pkcll Feb 12, 2025
0e6f395
Rename test file
pkcll Feb 12, 2025
ea40931
Refactor TestExample
pkcll Feb 12, 2025
014e9d9
Be able to set receiver interval
pkcll Feb 12, 2025
389e476
Add promotel Forwarder
pkcll Feb 12, 2025
28b3995
Use services.StopChan in Forwarder
pkcll Feb 12, 2025
e681b75
Update readme
pkcll Feb 12, 2025
bd86e0e
Make test metric name unique
pkcll Feb 12, 2025
37b9f4c
Fix lint errors
pkcll Feb 12, 2025
746eeba
Add go checks to Makefile
pkcll Feb 12, 2025
81156d6
Update endpoint in TestExample
pkcll Feb 12, 2025
d439eba
bump prometheusreceiver
pkcll Feb 13, 2025
21d670f
Add test for DefaultForwarderOptions
pkcll Feb 13, 2025
98139c0
Remove comment
pkcll Feb 13, 2025
99919ac
Remove unused module
pkcll Feb 13, 2025
246 changes: 198 additions & 48 deletions go.mod
Collaborator:

How did we end up more than doubling (+150) our dependencies? Are these all from otel?

Collaborator:

The k8s deps in particular are concerning. We have had enough trouble with those just in our test modules. I don't think we can allow it to infect everything like this.

Large diffs are not rendered by default.

1,003 changes: 880 additions & 123 deletions go.sum

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pkg/logger/logger.go
@@ -1,8 +1,8 @@
package logger

import (
	"fmt"
	"io"
	"reflect"
	"testing"

@@ -263,3 +263,11 @@ func Criticalw(l Logger, msg string, keysAndValues ...interface{}) {
	s := &sugared{Logger: l, h: Helper(l, 2)}
	s.Criticalw(msg, keysAndValues...)
}

// ToZapLogger converts the Logger to a *zap.Logger for use in the OpenTelemetry collector component settings.
// It returns nil if the receiver or its underlying SugaredLogger is nil. The lggr argument is currently unused.
func (l *logger) ToZapLogger(lggr Logger) *zap.Logger {
	if l == nil || l.SugaredLogger == nil {
		return nil
	}
	return l.SugaredLogger.Desugar()
}
27 changes: 27 additions & 0 deletions pkg/promotel/Makefile
@@ -0,0 +1,27 @@
help: ## Print this help text
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}'

.PHONY: \
	tidy \
	fmt \
	lint \
	checks \
	test

tidy: fmt ## run go mod tidy
	go mod tidy

fmt: ## run go fmt
	go fmt ./...

lint: ## run golangci-lint
	golangci-lint run ./... --config=../../.golangci.yml

checks: ## run go checks
	go mod verify
	go vet ./...
	go run honnef.co/go/tools/cmd/staticcheck@latest -checks=all,-ST1000,-U1000,-ST1003 ./...
	go run golang.org/x/vuln/cmd/govulncheck@latest ./...

test: ## run unit tests
	go test -v ./...
32 changes: 32 additions & 0 deletions pkg/promotel/README.md
@@ -0,0 +1,32 @@
# Package Overview
The package provides components for converting Prometheus metrics to OTel metrics.

Main components: `MetricReceiver`, `MetricExporter`, and `Forwarder`, which wires the two together.

## Receiver
- Wraps [prometheusreceiver](https://github.com/pkcll/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Fetches Prometheus metrics data via `prometheus.Gatherer` (same process memory, no HTTP calls)
- Uses a custom implementation of `prometheus.scraper` (from https://github.com/pkcll/prometheus/pull/1) to bypass HTTP requests and fetch data directly from `prometheus.Gatherer`
- Converts Prometheus metrics into OTel format using [prometheusreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Passes OTel metrics data to the downstream OTel [otlpexporter](https://pkg.go.dev/go.opentelemetry.io/collector/exporter/otlpexporter)

## Exporter
- Wraps [otlpexporter](https://pkg.go.dev/go.opentelemetry.io/collector/exporter/otlpexporter)
- Receives metric data from the receiver
- Exports OTel metrics data to the OTel collector endpoint via [otlpexporter](https://pkg.go.dev/go.opentelemetry.io/collector/exporter/otlpexporter)
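
The receiver-to-exporter flow described in the two lists above can be illustrated with a minimal, standard-library-only sketch. It is not the package's implementation: `Sample`, `Gatherer`, `NextFunc`, `staticGatherer`, `scrapeOnce`, and `collectNames` are hypothetical stand-ins for `prometheus.Gatherer`, the OTel metrics payload, and the callback the receiver hands its data to. The point is only that the data is gathered from process memory and passed to the next stage through a function value, with no HTTP call in between.

```go
package main

import (
	"context"
	"fmt"
)

// Sample is a hypothetical, simplified stand-in for one gathered metric.
type Sample struct {
	Name  string
	Value float64
}

// Gatherer plays the role of prometheus.Gatherer in this sketch:
// it returns metrics straight from process memory.
type Gatherer interface {
	Gather() ([]Sample, error)
}

// NextFunc receives a gathered batch, like the callback that forwards
// receiver output to the exporter.
type NextFunc func(ctx context.Context, batch []Sample) error

// staticGatherer returns a fixed set of samples.
type staticGatherer []Sample

func (g staticGatherer) Gather() ([]Sample, error) { return g, nil }

// scrapeOnce gathers in-process and hands a non-empty batch to next.
func scrapeOnce(ctx context.Context, g Gatherer, next NextFunc) error {
	batch, err := g.Gather()
	if err != nil {
		return fmt.Errorf("gather failed: %w", err)
	}
	if len(batch) == 0 {
		return nil // nothing to forward
	}
	return next(ctx, batch)
}

// collectNames runs one scrape and records the names that reached the "exporter".
func collectNames(g Gatherer) ([]string, error) {
	var names []string
	err := scrapeOnce(context.Background(), g, func(_ context.Context, batch []Sample) error {
		for _, s := range batch {
			names = append(names, s.Name)
		}
		return nil
	})
	return names, err
}

func main() {
	names, err := collectNames(staticGatherer{{Name: "promotel_heartbeat", Value: 1}})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names)
}
```

Passing the downstream stage as a function keeps the gathering side unaware of where the data goes, which is the same shape `NewForwarder` uses when it gives the receiver a closure that calls `exporter.Export`.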



## Usage

```go
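...
forwarder, err := promotel.NewForwarder(g, r, lggr, promotel.ForwarderOptions{
	Endpoint:    srv.URL,
	TLSInsecure: true,
	Interval:    interval,
})
if err != nil {
	// handle the construction error before starting
	return err
}
if err := forwarder.Start(ctx); err != nil {
	return err
}
defer forwarder.Close()
...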
```
195 changes: 195 additions & 0 deletions pkg/promotel/forwarder.go
@@ -0,0 +1,195 @@
package promotel

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/promotel/internal"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const (
	heartbeatMetricName = "promotel_heartbeat"
	heartbeatInterval   = 15 * time.Second
	scopeName           = "PromOTELForwarder"
)

// ForwarderOptions configures a Forwarder.
type ForwarderOptions struct {
	// Endpoint is the otel-collector endpoint that metrics are exported to.
	Endpoint    string
	TLSInsecure bool
	AuthHeaders map[string]string
	// Interval is passed to the metric receiver.
	Interval time.Duration
	// Verbose enables debug logging of every forwarded metric name.
	Verbose bool
}

// Forwarder gathers Prometheus metrics and forwards them in OTel format.
type Forwarder struct {
	lggr      logger.Logger
	heartbeat prometheus.Counter
	exporter  internal.MetricExporter
	receiver  internal.MetricReceiver
	closeOnce sync.Once
	startOnce sync.Once
	stopCh    services.StopChan
}

// NewForwarder creates a Forwarder whose receiver passes gathered metrics to the exporter.
func NewForwarder(g prometheus.Gatherer, r prometheus.Registerer, lggr logger.Logger, opts ForwarderOptions) (*Forwarder, error) {
	exporter, err := newMetricExporter(opts, lggr)
	if err != nil {
		return nil, err
	}
	receiver, err := newMetricReceiver(g, r, opts.Interval, lggr, func(ctx context.Context, md pmetric.Metrics) error {
		if opts.Verbose {
			logOtelMetric(md, lggr)
		}
		return exporter.Export(ctx, md)
	})
	if err != nil {
		return nil, err
	}
	return &Forwarder{
		lggr: logger.Named(lggr, scopeName),
		heartbeat: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: heartbeatMetricName,
			ConstLabels: prometheus.Labels{
				"source": scopeName,
			},
		}),
		exporter: exporter,
		receiver: receiver,
		stopCh:   make(chan struct{}),
	}, nil
}

// Start launches the forwarder in the background. Calls after the first are no-ops.
func (f *Forwarder) Start(ctx context.Context) error {
	f.startOnce.Do(func() {
		go f.run(ctx)
	})
	return nil
}

func (f *Forwarder) run(ctx context.Context) {
	// newCtx is cancelled when either ctx is done or stopCh is closed.
	// Keep the cancel func and release it on exit instead of discarding it.
	newCtx, cancel := f.stopCh.Ctx(ctx)
	defer cancel()
	go f.reportHeartbeatMetric(newCtx)
	go f.startMetricExporter(newCtx)
	go f.startMetricReceiver(newCtx)
	<-newCtx.Done()
}

func (f *Forwarder) startMetricReceiver(ctx context.Context) {
	f.lggr.Debug("Starting promotel metric receiver")
	if err := f.receiver.Start(ctx); err != nil {
		f.lggr.Errorw("Failed to start promotel metric receiver, closing forwarder", "error", err)
		f.Close()
	}
	select {
	case <-ctx.Done():
		f.lggr.Debug("Context done, closing receiver")
	case <-f.stopCh:
		f.lggr.Debug("Stop channel closed, closing receiver")
	}
	if err := f.receiver.Close(); err != nil {
		f.lggr.Errorw("Failed to close receiver", "error", err)
	}
}

func (f *Forwarder) startMetricExporter(ctx context.Context) {
	f.lggr.Debug("Starting promotel metric exporter")
	if err := f.exporter.Start(ctx); err != nil {
		f.lggr.Errorw("Failed to start exporter, closing forwarder", "error", err)
		f.Close()
		return
	}
	select {
	case <-ctx.Done():
		f.lggr.Debug("Context done, closing exporter")
	case <-f.stopCh:
		f.lggr.Debug("Stop channel closed, closing exporter")
	}
	if err := f.exporter.Close(); err != nil {
		f.lggr.Errorw("Failed to close exporter", "error", err)
	}
}

// reportHeartbeatMetric increments the heartbeat counter immediately and then once per heartbeatInterval.
func (f *Forwarder) reportHeartbeatMetric(ctx context.Context) {
	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()
	for {
		f.heartbeat.Inc()
		f.lggr.Debug("Heartbeat promotel")
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// Close signals all forwarder goroutines to stop. It is safe to call more than once.
func (f *Forwarder) Close() error {
	f.closeOnce.Do(func() {
		close(f.stopCh)
	})
	return nil
}

// DefaultForwarderOptions returns options for a local collector endpoint.
func DefaultForwarderOptions() ForwarderOptions {
	return ForwarderOptions{
		Endpoint:    "localhost:4317",
		TLSInsecure: true,
		AuthHeaders: nil,
		Verbose:     false,
		Interval:    15 * time.Second,
	}
}

// newExporterConfig builds the exporter config from the forwarder options.
func newExporterConfig(opts ForwarderOptions) (*internal.ExporterConfig, error) {
	return internal.NewMetricExporterConfig(opts.Endpoint, opts.TLSInsecure, opts.AuthHeaders)
}

func newMetricExporter(opts ForwarderOptions, lggr logger.Logger) (internal.MetricExporter, error) {
	expConfig, err := newExporterConfig(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to create exporter config: %w", err)
	}
	// Sends metrics data in OTLP format to otel-collector endpoint
	exporter, err := internal.NewMetricExporter(expConfig, lggr)
	if err != nil {
		return nil, fmt.Errorf("failed to create metric exporter: %w", err)
	}
	return exporter, nil
}

func newMetricReceiver(g prometheus.Gatherer, r prometheus.Registerer, interval time.Duration, lggr logger.Logger, next internal.NextFunc) (internal.MetricReceiver, error) {
	receiverConfig, err := internal.NewReceiverConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create receiver config: %w", err)
	}
	receiver, err := internal.NewMetricReceiver(receiverConfig, g, r, interval, lggr, next)
	if err != nil {
		return nil, fmt.Errorf("failed to create metric receiver: %w", err)
	}
	return receiver, nil
}

// logOtelMetric logs the name of every metric in md at debug level.
func logOtelMetric(md pmetric.Metrics, lggr logger.Logger) {
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		rm := rms.At(i)
		ilms := rm.ScopeMetrics()
		for j := 0; j < ilms.Len(); j++ {
			ilm := ilms.At(j)
			metrics := ilm.Metrics()
			for k := 0; k < metrics.Len(); k++ {
				metric := metrics.At(k)
				lggr.Debugw("Exporting OTel metric", "name", metric.Name())
			}
		}
	}
}
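
The lifecycle used by `Forwarder` (start at most once, close at most once, stop everything through a shared channel, tick a heartbeat until stopped) can be tried in isolation with the standard library alone. The sketch below is an illustration under assumptions, not the PR's code: `worker`, `newWorker`, `demo`, and the `done` channel are hypothetical names, the heartbeat is a plain atomic counter instead of a Prometheus counter, and the stop-channel-to-context bridge is written by hand rather than through `services.StopChan`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a hypothetical stand-in for the Forwarder lifecycle.
type worker struct {
	startOnce sync.Once
	closeOnce sync.Once
	stopCh    chan struct{}
	done      chan struct{} // closed when the run loop has exited
	beats     atomic.Int64  // heartbeat counter
}

func newWorker() *worker {
	return &worker{stopCh: make(chan struct{}), done: make(chan struct{})}
}

// Start launches the run loop; repeated calls are no-ops.
func (w *worker) Start(ctx context.Context, interval time.Duration) {
	w.startOnce.Do(func() { go w.run(ctx, interval) })
}

func (w *worker) run(ctx context.Context, interval time.Duration) {
	defer close(w.done)
	// Derive a context that is cancelled when stopCh closes, and always release it.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		select {
		case <-w.stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		w.beats.Add(1) // beat immediately, then once per tick
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// Close is idempotent: the stop channel is closed exactly once.
func (w *worker) Close() {
	w.closeOnce.Do(func() { close(w.stopCh) })
}

// demo starts a worker, lets it beat for a while, stops it, and returns the beat count.
func demo() int64 {
	w := newWorker()
	w.Start(context.Background(), time.Millisecond)
	w.Start(context.Background(), time.Millisecond) // no-op
	time.Sleep(20 * time.Millisecond)
	w.Close()
	w.Close() // second call does not panic
	<-w.done
	return w.beats.Load()
}

func main() {
	fmt.Println("heartbeats:", demo())
}
```

Waiting on a `done` channel, as `demo` does, is one way a caller could know that shutdown has finished. The `Forwarder` in this PR returns from `Close` immediately after closing the stop channel.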
78 changes: 78 additions & 0 deletions pkg/promotel/forwarder_test.go
@@ -0,0 +1,78 @@
package promotel_test

import (
	"context"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/promotel"
	internal "github.com/smartcontractkit/chainlink-common/pkg/promotel/internal"
)

func TestExample(t *testing.T) {
	var (
		g              = prometheus.DefaultGatherer
		r              = prometheus.DefaultRegisterer
		lggr, observed = logger.TestObserved(t, zap.DebugLevel)
		testMetricName = t.Name() + "_test_counter_metric"
		interval       = 10 * time.Millisecond
	)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	go internal.ReportTestMetrics(ctx, r, testMetricName)

	// Poll the observed logs until the test metric name shows up.
	doneCh := make(chan struct{})
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			default:
				for _, l := range observed.All() {
					metricName, ok := l.ContextMap()["name"].(string)
					if ok && strings.Contains(metricName, testMetricName) {
						// Signal once and stop polling. A repeated send on the
						// unbuffered channel would block this goroutine forever.
						close(doneCh)
						return
					}
				}
				time.Sleep(1 * time.Second)
			}
		}
	}()

	forwarder, err := promotel.NewForwarder(g, r, lggr, promotel.ForwarderOptions{
		Endpoint:    "localhost:4317",
		TLSInsecure: true,
		Interval:    interval,
		Verbose:     true,
	})
	require.NoError(t, err)

	require.NoError(t, forwarder.Start(ctx))

	defer forwarder.Close()

	select {
	case <-ctx.Done():
		t.Fatal("Test timed out. Expected metric not found")
	case <-doneCh:
		t.Log("Found metric.")
	}
}

func TestDefaultForwarderOptions(t *testing.T) {
	opts := promotel.DefaultForwarderOptions()
	assert.Equal(t, "localhost:4317", opts.Endpoint)
	assert.True(t, opts.TLSInsecure)
	assert.Nil(t, opts.AuthHeaders)
	assert.False(t, opts.Verbose)
	assert.Equal(t, 15*time.Second, opts.Interval)
}
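
The polling pattern in `TestExample` (scan captured log entries until one mentions the metric, then signal the test) can be factored into a small helper. The following is a rough standard-library sketch under assumptions: `logBuffer`, `waitFor`, and `demo` are hypothetical names, and `logBuffer` only imitates an observed log sink. The helper closes its result channel exactly once, so it cannot block after the waiter has moved on, and it polls on a ticker instead of sleeping inside a `default` branch.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// logBuffer is a hypothetical stand-in for an observed log sink.
type logBuffer struct {
	mu      sync.Mutex
	entries []string
}

func (b *logBuffer) add(s string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.entries = append(b.entries, s)
}

// all returns a copy of the entries so callers can iterate without holding the lock.
func (b *logBuffer) all() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]string(nil), b.entries...)
}

// waitFor polls b until an entry contains substr or ctx is done.
// The returned channel is closed exactly once, on the first match.
func waitFor(ctx context.Context, b *logBuffer, substr string, every time.Duration) <-chan struct{} {
	found := make(chan struct{})
	go func() {
		ticker := time.NewTicker(every)
		defer ticker.Stop()
		for {
			for _, e := range b.all() {
				if strings.Contains(e, substr) {
					close(found)
					return
				}
			}
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return found
}

// demo reports whether an entry containing substr appeared before the timeout.
func demo(substr string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	b := &logBuffer{}
	go func() {
		time.Sleep(10 * time.Millisecond)
		b.add("Exporting OTel metric name=TestExample_test_counter_metric")
	}()

	select {
	case <-waitFor(ctx, b, substr, 5*time.Millisecond):
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	fmt.Println(demo("test_counter_metric"))
}
```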