Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle partial success response from OTLP server in otlpmetric exporters #3440

Merged
merged 7 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
- Reenabled Attribute Filters in the Metric SDK. (#3396)
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)
- Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440)

## [1.11.1/0.33.0] 2022-10-19

Expand Down
63 changes: 57 additions & 6 deletions exporters/otlp/otlpmetric/internal/otest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/inte

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -24,9 +25,11 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/metric/unit"
semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
Expand Down Expand Up @@ -168,7 +171,10 @@ var (
// otlpmetric.Client implementation that is connected to also returned
// Collector implementation. The Client is ready to upload metric data to the
// Collector which is ready to store that data.
type ClientFactory func() (otlpmetric.Client, Collector)
//
// If resultCh is not nil, the returned Collector needs to use the responses
// from that channel to send back to the client for every export request.
type ClientFactory func(resultCh <-chan ExportResult) (otlpmetric.Client, Collector)

// RunClientTests runs a suite of Client integration tests. For example:
//
Expand All @@ -177,17 +183,17 @@ func RunClientTests(f ClientFactory) func(*testing.T) {
return func(t *testing.T) {
t.Run("ClientHonorsContextErrors", func(t *testing.T) {
t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return c.Shutdown
}))

t.Run("ForceFlush", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return c.ForceFlush
}))

t.Run("UploadMetrics", testCtxErrs(func() func(context.Context) error {
c, _ := f()
c, _ := f(nil)
return func(ctx context.Context) error {
return c.UploadMetrics(ctx, nil)
}
Expand All @@ -196,7 +202,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) {

t.Run("ForceFlushFlushes", func(t *testing.T) {
ctx := context.Background()
client, collector := f()
client, collector := f(nil)
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))

require.NoError(t, client.ForceFlush(ctx))
Expand All @@ -211,7 +217,7 @@ func RunClientTests(f ClientFactory) func(*testing.T) {

t.Run("UploadMetrics", func(t *testing.T) {
ctx := context.Background()
client, coll := f()
client, coll := f(nil)

require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.Shutdown(ctx))
Expand All @@ -222,6 +228,51 @@ func RunClientTests(f ClientFactory) func(*testing.T) {
t.Fatalf("unexpected ResourceMetrics:\n%s", diff)
}
})

t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan ExportResult, 3)
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{
PartialSuccess: &collpb.ExportMetricsPartialSuccess{
RejectedDataPoints: n,
ErrorMessage: msg,
},
},
}
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{
PartialSuccess: &collpb.ExportMetricsPartialSuccess{
// Should not be logged.
RejectedDataPoints: 0,
ErrorMessage: "",
},
},
}
rCh <- ExportResult{
Response: &collpb.ExportMetricsServiceResponse{},
}

ctx := context.Background()
client, _ := f(rCh)

defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())

errs := []error{}
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)

require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.UploadMetrics(ctx, resourceMetrics))
require.NoError(t, client.Shutdown(ctx))

require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
}
}

Expand Down
18 changes: 16 additions & 2 deletions exporters/otlp/otlpmetric/internal/otest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"testing"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand All @@ -28,6 +30,7 @@ import (
)

type client struct {
rCh <-chan ExportResult
storage *Storage
}

Expand All @@ -47,15 +50,26 @@ func (c *client) UploadMetrics(ctx context.Context, rm *mpb.ResourceMetrics) err
c.storage.Add(&cpb.ExportMetricsServiceRequest{
ResourceMetrics: []*mpb.ResourceMetrics{rm},
})
if c.rCh != nil {
r := <-c.rCh
if r.Response != nil && r.Response.GetPartialSuccess() != nil {
msg := r.Response.GetPartialSuccess().GetErrorMessage()
n := r.Response.GetPartialSuccess().GetRejectedDataPoints()
if msg != "" || n > 0 {
otel.Handle(internal.MetricPartialSuccessError(n, msg))
}
}
return r.Err
}
return ctx.Err()
}

func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }
func (c *client) Shutdown(ctx context.Context) error { return ctx.Err() }

func TestClientTests(t *testing.T) {
factory := func() (otlpmetric.Client, Collector) {
c := &client{storage: NewStorage()}
factory := func(rCh <-chan ExportResult) (otlpmetric.Client, Collector) {
c := &client{rCh: rCh, storage: NewStorage()}
return c, c
}

Expand Down
73 changes: 46 additions & 27 deletions exporters/otlp/otlpmetric/internal/otest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ type Collector interface {
Collect() *Storage
}

type ExportResult struct {
Response *collpb.ExportMetricsServiceResponse
Err error
}

// Storage stores uploaded OTLP metric data in their proto form.
type Storage struct {
dataMu sync.Mutex
Expand Down Expand Up @@ -86,7 +91,7 @@ type GRPCCollector struct {
headers metadata.MD
storage *Storage

errCh <-chan error
resultCh <-chan ExportResult
listener net.Listener
srv *grpc.Server
}
Expand All @@ -100,14 +105,14 @@ type GRPCCollector struct {
// If errCh is not nil, the collector will respond to Export calls with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewGRPCCollector(endpoint string, errCh <-chan error) (*GRPCCollector, error) {
func NewGRPCCollector(endpoint string, resultCh <-chan ExportResult) (*GRPCCollector, error) {
if endpoint == "" {
endpoint = "localhost:0"
}

c := &GRPCCollector{
storage: NewStorage(),
errCh: errCh,
storage: NewStorage(),
resultCh: resultCh,
}

var err error
Expand Down Expand Up @@ -155,11 +160,14 @@ func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsSer
c.headersMu.Unlock()
}

var err error
if c.errCh != nil {
err = <-c.errCh
if c.resultCh != nil {
r := <-c.resultCh
if r.Response == nil {
return &collpb.ExportMetricsServiceResponse{}, r.Err
}
return r.Response, r.Err
}
return &collpb.ExportMetricsServiceResponse{}, err
return &collpb.ExportMetricsServiceResponse{}, nil
}

var emptyExportMetricsServiceResponse = func() []byte {
Expand Down Expand Up @@ -189,7 +197,7 @@ type HTTPCollector struct {
headers http.Header
storage *Storage

errCh <-chan error
resultCh <-chan ExportResult
listener net.Listener
srv *http.Server
}
Expand All @@ -207,7 +215,7 @@ type HTTPCollector struct {
// If errCh is not nil, the collector will respond to HTTP requests with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, error) {
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
Expand All @@ -220,9 +228,9 @@ func NewHTTPCollector(endpoint string, errCh <-chan error) (*HTTPCollector, erro
}

c := &HTTPCollector{
headers: http.Header{},
storage: NewStorage(),
errCh: errCh,
headers: http.Header{},
storage: NewStorage(),
resultCh: resultCh,
}

c.listener, err = net.Listen("tcp", u.Host)
Expand Down Expand Up @@ -276,22 +284,25 @@ func (c *HTTPCollector) handler(w http.ResponseWriter, r *http.Request) {
c.respond(w, c.record(r))
}

func (c *HTTPCollector) record(r *http.Request) error {
func (c *HTTPCollector) record(r *http.Request) ExportResult {
// Currently only supports protobuf.
if v := r.Header.Get("Content-Type"); v != "application/x-protobuf" {
return fmt.Errorf("content-type not supported: %s", v)
err := fmt.Errorf("content-type not supported: %s", v)
return ExportResult{Err: err}
}

body, err := c.readBody(r)
if err != nil {
return err
return ExportResult{Err: err}
}
pbRequest := &collpb.ExportMetricsServiceRequest{}
err = proto.Unmarshal(body, pbRequest)
if err != nil {
return &HTTPResponseError{
Err: err,
Status: http.StatusInternalServerError,
return ExportResult{
Err: &HTTPResponseError{
Err: err,
Status: http.StatusInternalServerError,
},
}
}
c.storage.Add(pbRequest)
Expand All @@ -304,10 +315,10 @@ func (c *HTTPCollector) record(r *http.Request) error {
}
c.headersMu.Unlock()

if c.errCh != nil {
err = <-c.errCh
if c.resultCh != nil {
return <-c.resultCh
}
return err
return ExportResult{Err: err}
}

func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
Expand Down Expand Up @@ -345,12 +356,12 @@ func (c *HTTPCollector) readBody(r *http.Request) (body []byte, err error) {
return body, err
}

func (c *HTTPCollector) respond(w http.ResponseWriter, err error) {
if err != nil {
func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
if resp.Err != nil {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
var e *HTTPResponseError
if errors.As(err, &e) {
if errors.As(resp.Err, &e) {
for k, vals := range e.Header {
for _, v := range vals {
w.Header().Add(k, v)
Expand All @@ -360,14 +371,22 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, err error) {
fmt.Fprintln(w, e.Error())
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, err.Error())
fmt.Fprintln(w, resp.Err.Error())
}
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(emptyExportMetricsServiceResponse)
if resp.Response == nil {
_, _ = w.Write(emptyExportMetricsServiceResponse)
} else {
r, err := proto.Marshal(resp.Response)
if err != nil {
panic(err)
}
_, _ = w.Write(r)
}
}

type mathRandReader struct{}
Expand Down
12 changes: 11 additions & 1 deletion exporters/otlp/otlpmetric/otlpmetricgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
Expand Down Expand Up @@ -163,9 +165,17 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
defer cancel()

return c.requestFunc(ctx, func(iCtx context.Context) error {
_, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
resp, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
})
if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedDataPoints()
if n != 0 || msg != "" {
err := internal.MetricPartialSuccessError(n, msg)
otel.Handle(err)
}
}
// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
Expand Down
Loading