Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP Ingestion #5813

Merged
merged 9 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817
* [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828
* [CHANGE] Compactor: Don't halt compactor when overlapped source blocks detected. #5854
* [FEATURE] Distributor: Add experimental OTLP metrics ingestion via `POST /api/v1/otlp/v1/metrics`. #5813
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
Expand Down
1 change: 1 addition & 0 deletions docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Pprof](#pprof) | _All services_ || `GET /debug/pprof` |
| [Fgprof](#fgprof) | _All services_ || `GET /debug/fgprof` |
| [Remote write](#remote-write) | Distributor || `POST /api/v1/push` |
| [OTLP receiver](#otlp-receiver) | Distributor || `POST /api/v1/otlp/v1/metrics` |
| [Tenants stats](#tenants-stats) | Distributor || `GET /distributor/all_user_stats` |
| [HA tracker status](#ha-tracker-status) | Distributor || `GET /distributor/ha_tracker` |
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,4 @@ Currently experimental features are:
- `-ruler.ring.final-sleep` (duration) CLI flag
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
- `alertmanager-sharding-ring.final-sleep` (duration) CLI flag
- OTLP Receiver
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/google/go-cmp v0.6.0
github.com/sercand/kuberesolver/v4 v4.0.0
go.opentelemetry.io/collector/pdata v1.3.0
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
google.golang.org/protobuf v1.33.0
)
Expand Down Expand Up @@ -208,7 +209,6 @@ require (
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/featuregate v1.3.0 // indirect
go.opentelemetry.io/collector/pdata v1.3.0 // indirect
go.opentelemetry.io/collector/semconv v0.96.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
75 changes: 75 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/util/backoff"
)
Expand Down Expand Up @@ -142,6 +146,77 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// getNameAndAttributes splits a time series' labels into the metric name
// (taken from the __name__ label) and a map of all remaining labels, which
// become OTLP data point attributes.
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
	name := ""
	attrs := map[string]any{}
	for _, l := range ts.Labels {
		switch l.Name {
		case model.MetricNameLabel:
			name = l.Value
		default:
			attrs[l.Name] = l.Value
		}
	}
	return name, attrs
}

// createDatapointsGauge turns newMetric into a gauge and appends one data
// point per sample, copying the given attributes onto every data point.
// It panics if the attribute map cannot be converted (this is a test helper).
func createDatapointsGauge(newMetric pmetric.Metric, attributes map[string]any, samples []prompb.Sample) {
	gauge := newMetric.SetEmptyGauge()
	for i := range samples {
		dp := gauge.DataPoints().AppendEmpty()
		dp.SetDoubleValue(samples[i].Value)
		// prompb timestamps are in milliseconds; pcommon.Timestamp is nanoseconds.
		dp.SetTimestamp(pcommon.Timestamp(samples[i].Timestamp * time.Millisecond.Nanoseconds()))
		if err := dp.Attributes().FromRaw(attributes); err != nil {
			panic(err)
		}
	}
}

// convertTimeseriesToMetrics converts Prometheus remote-write time series
// into OTLP metrics, modelling every series as a gauge.
func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics {
	out := pmetric.NewMetrics()
	for _, ts := range timeseries {
		name, attrs := getNameAndAttributes(ts)
		m := out.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
		m.SetName(name)
		// TODO: set description and unit on the new metric.
		createDatapointsGauge(m, attrs, ts.Samples)
		// TODO(friedrichg): Add support for histograms.
	}
	return out
}

// OTLP pushes series to the distributor's OTLP endpoint
// (POST /api/v1/otlp/v1/metrics), encoded as a protobuf
// ExportMetricsServiceRequest. It returns the raw HTTP response;
// the response body is closed before returning, so callers should
// only inspect the status code and headers.
func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {
	// Convert the remote-write series into an OTLP export request.
	data, err := pmetricotlp.NewExportRequestFromMetrics(convertTimeseriesToMetrics(timeseries)).MarshalProto()
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// Bind the timeout context at request-creation time rather than
	// attaching it afterwards with req.WithContext.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("X-Scope-OrgID", c.orgID)

	// Execute HTTP request.
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}

	defer res.Body.Close()
	return res, nil
}

// Query runs an instant query.
func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
value, _, err := c.querierClient.Query(context.Background(), query, ts)
Expand Down
76 changes: 76 additions & 0 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"fmt"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

// TestOTLP verifies end-to-end OTLP metrics ingestion: it pushes a series
// through the distributor's OTLP endpoint and reads it back through the
// querier's instant, label, and range query APIs.
func TestOTLP(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// Start dependencies.
	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, scenario.StartAndWaitReady(minio))

	// Start Cortex components.
	require.NoError(t, copyFileToSharedDir(scenario, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile))

	// Start Cortex in single binary mode, reading the config from file and
	// overwriting the backend config to make it work with Minio.
	overrides := map[string]string{
		"-blocks-storage.s3.access-key-id":     e2edb.MinioAccessKey,
		"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
		"-blocks-storage.s3.bucket-name":       bucketName,
		"-blocks-storage.s3.endpoint":          fmt.Sprintf("%s-minio-9000:9000", networkName),
		"-blocks-storage.s3.insecure":          "true",
	}

	cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, overrides, "", 9009, 9095)
	require.NoError(t, scenario.StartAndWaitReady(cortex))

	client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Push some series to Cortex via the OTLP endpoint.
	now := time.Now()
	series, expectedVector := generateSeries("series_1", now, prompb.Label{Name: "foo", Value: "bar"})

	res, err := client.OTLP(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Query the series back.
	result, err := client.Query("series_1", now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	assert.Equal(t, expectedVector, result.(model.Vector))

	labelValues, err := client.LabelValues("foo", time.Time{}, time.Time{}, nil)
	require.NoError(t, err)
	require.Equal(t, model.LabelValues{"bar"}, labelValues)

	labelNames, err := client.LabelNames(time.Time{}, time.Time{})
	require.NoError(t, err)
	require.Equal(t, []string{"__name__", "foo"}, labelNames)

	// Check that a range query does not return an error to sanity check the queryrange tripperware.
	_, err = client.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
	require.NoError(t, err)

	//TODO(friedrichg): test histograms
}
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
174 changes: 174 additions & 0 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package push

import (
"net/http"

"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
friedrichg marked this conversation as resolved.
Show resolved Hide resolved
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
logger = log.WithSourceIPs(source, logger)
}
}
req, err := remote.DecodeOTLPWriteRequest(r)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

tsMap, err := prometheusremotewrite.FromMetrics(convertToMetricsAttributes(req.Metrics()), prometheusremotewrite.Settings{DisableTargetInfo: true})
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

prwReq := cortexpb.WriteRequest{
Source: cortexpb.API,
Metadata: nil,
SkipLabelNameValidation: false,
}

tsList := []cortexpb.PreallocTimeseries(nil)
for _, v := range tsMap {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Samples: makeSamples(v.Samples),
Exemplars: makeExemplars(v.Exemplars),
}})
}
prwReq.Timeseries = tsList

if _, err := push(ctx, &prwReq); err != nil {
resp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.GetCode()/100 == 5 {
level.Error(logger).Log("msg", "push error", "err", err)
} else if resp.GetCode() != http.StatusAccepted && resp.GetCode() != http.StatusTooManyRequests {
level.Warn(logger).Log("msg", "push refused", "err", err)
}
http.Error(w, string(resp.Body), int(resp.Code))
}
})
}

// makeLabels converts prompb labels into Cortex label adapters.
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
	lbls := make(labels.Labels, 0, len(in))
	for i := range in {
		lbls = append(lbls, labels.Label{Name: in[i].Name, Value: in[i].Value})
	}
	return cortexpb.FromLabelsToLabelAdapters(lbls)
}

// makeSamples converts prompb samples into Cortex samples.
func makeSamples(in []prompb.Sample) []cortexpb.Sample {
	out := make([]cortexpb.Sample, 0, len(in))
	for i := range in {
		out = append(out, cortexpb.Sample{
			TimestampMs: in[i].Timestamp,
			Value:       in[i].Value,
		})
	}
	return out
}

// makeExemplars converts prompb exemplars into Cortex exemplars.
func makeExemplars(in []prompb.Exemplar) []cortexpb.Exemplar {
	out := make([]cortexpb.Exemplar, 0, len(in))
	for i := range in {
		out = append(out, cortexpb.Exemplar{
			Labels:      makeLabels(in[i].Labels),
			TimestampMs: in[i].Timestamp,
			Value:       in[i].Value,
		})
	}
	return out
}

// convertToMetricsAttributes returns a deep copy of md in which every data
// point additionally carries its resource's attributes as data point
// attributes. The input metrics are left untouched.
func convertToMetricsAttributes(md pmetric.Metrics) pmetric.Metrics {
	clone := pmetric.NewMetrics()
	md.CopyTo(clone)
	resourceMetrics := clone.ResourceMetrics()
	for i := 0; i < resourceMetrics.Len(); i++ {
		rm := resourceMetrics.At(i)
		// Attributes of this resource are merged into every metric below it.
		resourceAttrs := rm.Resource().Attributes()
		scopeMetrics := rm.ScopeMetrics()
		for j := 0; j < scopeMetrics.Len(); j++ {
			ms := scopeMetrics.At(j).Metrics()
			for k := 0; k < ms.Len(); k++ {
				addAttributesToMetric(ms.At(k), resourceAttrs)
			}
		}
	}
	return clone
}

// addAttributesToMetric adds additional labels to the given metric by merging
// labelMap into the attributes of each of the metric's data points. Metrics
// with an unrecognized (e.g. empty) type are left unchanged.
func addAttributesToMetric(metric pmetric.Metric, labelMap pcommon.Map) {
	// Dispatch on the metric's data type; each case walks the corresponding
	// data point slice.
	switch metric.Type() {
	case pmetric.MetricTypeGauge:
		addAttributesToNumberDataPoints(metric.Gauge().DataPoints(), labelMap)
	case pmetric.MetricTypeSum:
		addAttributesToNumberDataPoints(metric.Sum().DataPoints(), labelMap)
	case pmetric.MetricTypeHistogram:
		addAttributesToHistogramDataPoints(metric.Histogram().DataPoints(), labelMap)
	case pmetric.MetricTypeSummary:
		addAttributesToSummaryDataPoints(metric.Summary().DataPoints(), labelMap)
	case pmetric.MetricTypeExponentialHistogram:
		addAttributesToExponentialHistogramDataPoints(metric.ExponentialHistogram().DataPoints(), labelMap)
	}
}

// addAttributesToNumberDataPoints merges newAttributeMap into the attributes
// of every number data point in ps (existing keys are overwritten).
func addAttributesToNumberDataPoints(ps pmetric.NumberDataPointSlice, newAttributeMap pcommon.Map) {
	for i := 0; i < ps.Len(); i++ {
		joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
	}
}

// addAttributesToHistogramDataPoints merges newAttributeMap into the
// attributes of every histogram data point in ps (existing keys are
// overwritten).
func addAttributesToHistogramDataPoints(ps pmetric.HistogramDataPointSlice, newAttributeMap pcommon.Map) {
	for i := 0; i < ps.Len(); i++ {
		joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
	}
}

// addAttributesToSummaryDataPoints merges newAttributeMap into the attributes
// of every summary data point in ps (existing keys are overwritten).
func addAttributesToSummaryDataPoints(ps pmetric.SummaryDataPointSlice, newAttributeMap pcommon.Map) {
	for i := 0; i < ps.Len(); i++ {
		joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
	}
}

// addAttributesToExponentialHistogramDataPoints merges newAttributeMap into
// the attributes of every exponential histogram data point in ps (existing
// keys are overwritten).
func addAttributesToExponentialHistogramDataPoints(ps pmetric.ExponentialHistogramDataPointSlice, newAttributeMap pcommon.Map) {
	for i := 0; i < ps.Len(); i++ {
		joinAttributeMaps(newAttributeMap, ps.At(i).Attributes())
	}
}

// joinAttributeMaps copies every key/value pair in from into to,
// overwriting any entry in to that has the same key.
func joinAttributeMaps(from, to pcommon.Map) {
	from.Range(func(k string, v pcommon.Value) bool {
		v.CopyTo(to.PutEmpty(k))
		return true // keep iterating over all keys
	})
}
Loading
Loading