INFOPLAT-1575 Implement Prometheus to OTel metrics Forwarder #1013
Open
pkcll wants to merge 28 commits into main from INFOPLAT-1575/promotel-forked
28 commits
59db144 [promotel] Implement Prometheus to OpenTelemetry exporter (pkcll)
622fa61 Use github.com/pkcll/prometheus (pkcll)
5577fbf Bump github.com/pkcll/opentelemetry-collector-contrib/receiver/promet… (pkcll)
61b91d1 Delete promotel/cmd (pkcll)
5ebc25b Refactor test (pkcll)
90108d9 Use logger.Logger (pkcll)
592222e Bump prometheusreceiver (pkcll)
7ea779b Add exporter to the test (pkcll)
d182a28 Add Export method to MetricExporter (pkcll)
797b9e3 Pass Logger to metric receiver settings (pkcll)
f6366e8 Move promotel config (pkcll)
b405431 Refactor promotel config (pkcll)
8efb59f Move receiver, exporter to internal (pkcll)
a16458e Refactor TestMetricReceiver (pkcll)
0e6f395 Rename test file (pkcll)
ea40931 Refactor TestExample (pkcll)
014e9d9 Be able to set receiver interval (pkcll)
389e476 Add promotel Forwarder (pkcll)
28b3995 Use services.StopChan in Forwarder (pkcll)
e681b75 Update readme (pkcll)
bd86e0e Make test metric name unique (pkcll)
37b9f4c Fix lint errors (pkcll)
746eeba Add go checks to Makefile (pkcll)
81156d6 Update endpoint in TestExample (pkcll)
d439eba bump prometheusreceiver (pkcll)
21d670f Add test for DefaultForwarderOptions (pkcll)
98139c0 Remove comment (pkcll)
99919ac Remove unused module (pkcll)
@@ -0,0 +1,27 @@
help: ## Print this help text
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}'

.PHONY: \
	tidy \
	fmt \
	lint \
	checks \
	test

tidy: fmt ## run go mod tidy
	go mod tidy

fmt: ## run go fmt
	go fmt ./...

lint: ## run golangci-lint
	golangci-lint run ./... --config=../../.golangci.yml

checks: ## run go checks
	go mod verify
	go vet ./...
	go run honnef.co/go/tools/cmd/staticcheck@latest -checks=all,-ST1000,-U1000,-ST1003 ./...
	go run golang.org/x/vuln/cmd/govulncheck@latest ./...

test: ## run unit tests
	go test -v ./...
@@ -0,0 +1,32 @@
# Package Overview
The package provides components for converting Prometheus metrics to OTel metrics.

Main components: MetricReceiver, MetricExporter

## Receiver
- Wraps [prometheusreceiver](github.com/pkcll/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Fetches Prometheus metrics data via `prometheus.Gatherer` (same process memory, no HTTP calls)
- Uses a custom implementation of `prometheus.scraper` (see https://github.com/pkcll/prometheus/pull/1) that bypasses HTTP requests and reads data directly from `prometheus.Gatherer`
- Converts Prometheus metrics into OTel format using [prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
- Passes OTel metrics data to the downstream OTel [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)

## Exporter
- Wraps [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)
- Receives metric data from the receiver
- Exports OTel metrics data to the OTel collector endpoint via [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)

### Usage

```go
...
forwarder, err := promotel.NewForwarder(g, r, lggr, promotel.ForwarderOptions{
	Endpoint:    srv.URL,
	TLSInsecure: true,
	Interval:    interval,
})
err = forwarder.Start(ctx)
defer forwarder.Close()
...
```
@@ -0,0 +1,195 @@
package promotel

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/promotel/internal"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

const (
	heartbeatMetricName = "promotel_heartbeat"
	heartbeatInterval   = 15 * time.Second
	scopeName           = "PromOTELForwarder"
)

// ForwarderOptions configures the OTLP endpoint and the receiver interval.
type ForwarderOptions struct {
	Endpoint    string
	TLSInsecure bool
	AuthHeaders map[string]string
	Interval    time.Duration
	Verbose     bool
}

// Forwarder wires a metric receiver to a metric exporter.
type Forwarder struct {
	lggr      logger.Logger
	heartbeat prometheus.Counter
	exporter  internal.MetricExporter
	receiver  internal.MetricReceiver
	closeOnce sync.Once
	startOnce sync.Once
	stopCh    services.StopChan
}

// NewForwarder builds the exporter and receiver and registers the heartbeat counter on r.
func NewForwarder(g prometheus.Gatherer, r prometheus.Registerer, lggr logger.Logger, opts ForwarderOptions) (*Forwarder, error) {
	exporter, err := newMetricExporter(opts, lggr)
	if err != nil {
		return nil, err
	}
	receiver, err := newMetricReceiver(g, r, opts.Interval, lggr, func(ctx context.Context, md pmetric.Metrics) error {
		if opts.Verbose {
			logOtelMetric(md, lggr)
		}
		return exporter.Export(ctx, md)
	})
	if err != nil {
		return nil, err
	}
	return &Forwarder{
		lggr: logger.Named(lggr, scopeName),
		heartbeat: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: heartbeatMetricName,
			ConstLabels: prometheus.Labels{
				"source": scopeName,
			},
		}),
		exporter: exporter,
		receiver: receiver,
		stopCh:   make(chan struct{}),
	}, nil
}

// Start launches the forwarder goroutines at most once. It always returns nil.
func (f *Forwarder) Start(ctx context.Context) error {
	f.startOnce.Do(func() {
		go f.run(ctx)
	})
	return nil
}

func (f *Forwarder) run(ctx context.Context) {
	// Keep the cancel func so the derived context is released when run returns.
	newCtx, cancel := f.stopCh.Ctx(ctx)
	defer cancel()
	go f.reportHeartbeatMetric(newCtx)
	go f.startMetricExporter(newCtx)
	go f.startMetricReceiver(newCtx)
	<-newCtx.Done()
}

func (f *Forwarder) startMetricReceiver(ctx context.Context) {
	f.lggr.Debug("Starting promotel metric receiver")
	if err := f.receiver.Start(ctx); err != nil {
		f.lggr.Errorw("Failed to start promotel metric receiver, closing forwarder", "error", err)
		f.Close()
		// Do not fall through and close a receiver that never started.
		return
	}
	select {
	case <-ctx.Done():
		f.lggr.Debug("Context done, closing receiver")
	case <-f.stopCh:
		f.lggr.Debug("Stop channel closed, closing receiver")
	}
	if err := f.receiver.Close(); err != nil {
		f.lggr.Errorw("Failed to close receiver", "error", err)
	}
}

func (f *Forwarder) startMetricExporter(ctx context.Context) {
	f.lggr.Debug("Starting promotel metric exporter")
	if err := f.exporter.Start(ctx); err != nil {
		f.lggr.Errorw("Failed to start exporter, closing forwarder", "error", err)
		f.Close()
		return
	}
	select {
	case <-ctx.Done():
		f.lggr.Debug("Context done, closing exporter")
	case <-f.stopCh:
		f.lggr.Debug("Stop channel closed, closing exporter")
	}
	if err := f.exporter.Close(); err != nil {
		f.lggr.Errorw("Failed to close exporter", "error", err)
	}
}

// reportHeartbeatMetric increments the heartbeat counter immediately and then once per interval.
func (f *Forwarder) reportHeartbeatMetric(ctx context.Context) {
	ticker := time.NewTicker(heartbeatInterval)
	defer ticker.Stop()
	for {
		f.heartbeat.Inc()
		f.lggr.Debug("Heartbeat promotel")
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// Close signals all goroutines to stop. It is safe to call more than once.
func (f *Forwarder) Close() error {
	f.closeOnce.Do(func() {
		close(f.stopCh)
	})
	return nil
}

func DefaultForwarderOptions() ForwarderOptions {
	return ForwarderOptions{
		Endpoint:    "localhost:4317",
		TLSInsecure: true,
		AuthHeaders: nil,
		Verbose:     false,
		Interval:    15 * time.Second,
	}
}

// newExporterConfig maps ForwarderOptions onto the internal exporter config.
func newExporterConfig(opts ForwarderOptions) (*internal.ExporterConfig, error) {
	return internal.NewMetricExporterConfig(opts.Endpoint, opts.TLSInsecure, opts.AuthHeaders)
}

func newMetricExporter(opts ForwarderOptions, lggr logger.Logger) (internal.MetricExporter, error) {
	expConfig, err := newExporterConfig(opts)
	if err != nil {
		return nil, fmt.Errorf("failed to create exporter config: %w", err)
	}
	// Sends metrics data in OTLP format to otel-collector endpoint
	exporter, err := internal.NewMetricExporter(expConfig, lggr)
	if err != nil {
		return nil, fmt.Errorf("failed to create metric exporter: %w", err)
	}
	return exporter, nil
}

func newMetricReceiver(g prometheus.Gatherer, r prometheus.Registerer, interval time.Duration, lggr logger.Logger, next internal.NextFunc) (internal.MetricReceiver, error) {
	receiverConfig, err := internal.NewReceiverConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to create config: %w", err)
	}
	receiver, err := internal.NewMetricReceiver(receiverConfig, g, r, interval, lggr, next)
	if err != nil {
		return nil, fmt.Errorf("failed to create debug metric receiver: %w", err)
	}
	return receiver, nil
}

// logOtelMetric logs the name of every metric in md at debug level.
func logOtelMetric(md pmetric.Metrics, lggr logger.Logger) {
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		rm := rms.At(i)
		ilms := rm.ScopeMetrics()
		for j := 0; j < ilms.Len(); j++ {
			ilm := ilms.At(j)
			metrics := ilm.Metrics()
			for k := 0; k < metrics.Len(); k++ {
				metric := metrics.At(k)
				lggr.Debugw("Exporting OTel metric ", "name", metric.Name())
			}
		}
	}
}
@@ -0,0 +1,78 @@
package promotel_test

import (
	"context"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/promotel"
	internal "github.com/smartcontractkit/chainlink-common/pkg/promotel/internal"
)

func TestExample(t *testing.T) {
	var (
		g              = prometheus.DefaultGatherer
		r              = prometheus.DefaultRegisterer
		lggr, observed = logger.TestObserved(t, zap.DebugLevel)
		testMetricName = t.Name() + "_test_counter_metric"
		interval       = 10 * time.Millisecond
	)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	go internal.ReportTestMetrics(ctx, r, testMetricName)

	// Buffered so the watcher can signal once and exit without blocking
	// after the test body has already returned.
	doneCh := make(chan struct{}, 1)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			default:
				for _, l := range observed.All() {
					metricName, ok := l.ContextMap()["name"].(string)
					if ok && strings.Contains(metricName, testMetricName) {
						doneCh <- struct{}{}
						return
					}
				}
				time.Sleep(1 * time.Second)
			}
		}
	}()

	forwarder, err := promotel.NewForwarder(g, r, lggr, promotel.ForwarderOptions{
		Endpoint:    "localhost:4317",
		TLSInsecure: true,
		Interval:    interval,
		Verbose:     true,
	})
	require.NoError(t, err)

	require.NoError(t, forwarder.Start(ctx))

	defer forwarder.Close()

	select {
	case <-ctx.Done():
		t.Fatal("Test timed out. Expected metric not found")
	case <-doneCh:
		t.Log("Found metric.")
	}
}

func TestDefaultForwarderOptions(t *testing.T) {
	opts := promotel.DefaultForwarderOptions()
	assert.Equal(t, "localhost:4317", opts.Endpoint)
	assert.True(t, opts.TLSInsecure)
	assert.Nil(t, opts.AuthHeaders)
	assert.False(t, opts.Verbose)
	assert.Equal(t, 15*time.Second, opts.Interval)
}
How did we end up more than doubling (+150) our dependencies? Are these all from otel?
The k8s deps in particular are concerning. We have had enough trouble with those just in our test modules. I don't think we can allow it to infect everything like this.