Skip to content

Commit

Permalink
Add path prefix for otlp http receiver
Browse files Browse the repository at this point in the history
Supports multiple otlp recivers running behind a reverse
proxy/ingress by setting up different paths.

Fixes #7511
  • Loading branch information
fredthomsen committed Apr 26, 2023
1 parent 8e33ded commit 27a51e2
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 54 deletions.
16 changes: 16 additions & 0 deletions .chloggen/otlp-receiver-add-http-path-prefix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add http path prefix config option to otlpreceiver

# One or more tracking issues or pull requests related to the change
issues: [7511]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
9 changes: 6 additions & 3 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ The OTLP receiver can receive trace export calls via HTTP/JSON in addition to
gRPC. The HTTP/JSON address is the same as gRPC as the protocol is recognized
and processed accordingly. Note the serialization format needs to be [protobuf JSON](https://developers.google.com/protocol-buffers/docs/proto3#json).

To write traces with HTTP/JSON, `POST` to `[address]/v1/traces` for traces,
to `[address]/v1/metrics` for metrics, to `[address]/v1/logs` for logs. The default
port is `4318`.
The HTTP/JSON configuration also provides `path_prefix` configuration to allow the URL
path to be modified. The defaults to an empty string and has no impact for GRPC.

To write traces with HTTP/JSON, `POST` to `[address]/[path_prefix]/v1/traces` for traces,
to `[address]/[path_prefix]/v1/metrics` for metrics, to `[address]/[path_prefix]/v1/logs`
for logs. The default port is `4318`.

### CORS (Cross-origin resource sharing)

Expand Down
7 changes: 6 additions & 1 deletion receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ const (
protoHTTP = "protocols::http"
)

type httpServerSettings struct {
*confighttp.HTTPServerSettings `mapstructure:",squash"`
PathPrefix string `mapstructure:"path_prefix,omitempty"`
}

// Protocols is the configuration for the supported protocols.
type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
HTTP *httpServerSettings `mapstructure:"http"`
}

// Config defines configuration for OTLP receiver.
Expand Down
31 changes: 18 additions & 13 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,21 @@ func TestUnmarshalConfig(t *testing.T) {
},
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:4318",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
CORS: &confighttp.CORSSettings{
AllowedOrigins: []string{"https://*.test.com", "https://test.com"},
MaxAge: 7200,
},
},
CORS: &confighttp.CORSSettings{
AllowedOrigins: []string{"https://*.test.com", "https://test.com"},
MaxAge: 7200,
},
PathPrefix: "testprefix",
},
},
}, cfg)
Expand All @@ -160,9 +163,11 @@ func TestUnmarshalConfigUnix(t *testing.T) {
},
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
// Transport: "unix",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
},
PathPrefix: defaultPathPrefix,
},
},
}, cfg)
Expand Down
8 changes: 6 additions & 2 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

defaultGRPCEndpoint = "0.0.0.0:4317"
defaultHTTPEndpoint = "0.0.0.0:4318"
defaultPathPrefix = ""
)

// NewFactory creates a new OTLP receiver factory.
Expand All @@ -55,8 +56,11 @@ func createDefaultConfig() component.Config {
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPEndpoint,
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPEndpoint,
},
PathPrefix: defaultPathPrefix,
},
},
}
Expand Down
42 changes: 28 additions & 14 deletions receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func TestCreateTracesReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
}

tests := []struct {
Expand Down Expand Up @@ -100,8 +102,10 @@ func TestCreateTracesReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "localhost:112233",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "localhost:112233",
},
},
},
},
Expand Down Expand Up @@ -135,8 +139,10 @@ func TestCreateMetricReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
}

tests := []struct {
Expand Down Expand Up @@ -173,8 +179,10 @@ func TestCreateMetricReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
},
},
},
Expand Down Expand Up @@ -207,8 +215,10 @@ func TestCreateLogReceiver(t *testing.T) {
Transport: "tcp",
},
}
defaultHTTPSettings := &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
defaultHTTPSettings := &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
},
}

tests := []struct {
Expand Down Expand Up @@ -249,8 +259,10 @@ func TestCreateLogReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
},
},
},
Expand All @@ -262,8 +274,10 @@ func TestCreateLogReceiver(t *testing.T) {
cfg: &Config{
Protocols: Protocols{
GRPC: defaultGRPCSettings,
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: "327.0.0.1:1122",
},
},
},
},
Expand Down
13 changes: 9 additions & 4 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"path/filepath"
"sync"

"go.uber.org/zap"
Expand Down Expand Up @@ -162,7 +164,7 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
return err
}

err = r.startHTTPServer(r.cfg.HTTP, host)
err = r.startHTTPServer(r.cfg.HTTP.HTTPServerSettings, host)
if err != nil {
return err
}
Expand Down Expand Up @@ -200,7 +202,8 @@ func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error {
r.tracesReceiver = trace.New(tc, r.obsrepGRPC)
httpTracesReceiver := trace.New(tc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/traces", func(resp http.ResponseWriter, req *http.Request) {
urlPath := filepath.Join("/", url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/traces")
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -225,7 +228,8 @@ func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error {
r.metricsReceiver = metrics.New(mc, r.obsrepGRPC)
httpMetricsReceiver := metrics.New(mc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/metrics", func(resp http.ResponseWriter, req *http.Request) {
urlPath := filepath.Join("/", url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/metrics")
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand All @@ -250,7 +254,8 @@ func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error {
r.logsReceiver = logs.New(lc, r.obsrepGRPC)
httpLogsReceiver := logs.New(lc, r.obsrepHTTP)
if r.httpMux != nil {
r.httpMux.HandleFunc("/v1/logs", func(resp http.ResponseWriter, req *http.Request) {
urlPath := filepath.Join("/", url.PathEscape(r.cfg.HTTP.PathPrefix), "v1/logs")
r.httpMux.HandleFunc(urlPath, func(resp http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
handleUnmatchedMethod(resp)
return
Expand Down
41 changes: 24 additions & 17 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ func TestJsonHttp(t *testing.T) {
},
}
addr := testutil.GetAvailableLocalAddress(t)
pathPrefix := "json"

// Set the buffer count to 1 to make it flush the test span immediately.
sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, sink, nil)
ocr := newHTTPReceiver(t, addr, pathPrefix, sink, nil)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand All @@ -177,7 +178,7 @@ func TestJsonHttp(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/v1/traces", addr)
url := fmt.Sprintf("http://%s/%s/v1/traces", addr, pathPrefix)
sink.Reset()
testHTTPJSONRequest(t, url, sink, test.encoding, test.err)
})
Expand All @@ -187,7 +188,7 @@ func TestJsonHttp(t *testing.T) {
func TestHandleInvalidRequests(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)
cfg := &Config{
Protocols: Protocols{HTTP: &confighttp.HTTPServerSettings{Endpoint: endpoint}},
Protocols: Protocols{HTTP: &httpServerSettings{HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: endpoint}}},
}

// Traces
Expand Down Expand Up @@ -408,10 +409,11 @@ func TestProtoHttp(t *testing.T) {
},
}
addr := testutil.GetAvailableLocalAddress(t)
pathPrefix := "proto"

// Set the buffer count to 1 to make it flush the test span immediately.
tSink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}
ocr := newHTTPReceiver(t, addr, tSink, consumertest.NewNop())
ocr := newHTTPReceiver(t, addr, pathPrefix, tSink, consumertest.NewNop())

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand All @@ -427,7 +429,7 @@ func TestProtoHttp(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/v1/traces", addr)
url := fmt.Sprintf("http://%s/%s/v1/traces", addr, pathPrefix)
tSink.Reset()
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td)
})
Expand Down Expand Up @@ -542,7 +544,7 @@ func TestOTLPReceiverInvalidContentEncoding(t *testing.T) {
// Set the buffer count to 1 to make it flush the test span immediately.
tSink := new(consumertest.TracesSink)
mSink := new(consumertest.MetricsSink)
ocr := newHTTPReceiver(t, addr, tSink, mSink)
ocr := newHTTPReceiver(t, addr, defaultPathPrefix, tSink, mSink)

require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver")
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand Down Expand Up @@ -601,7 +603,7 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) {
assert.NoError(t, ln.Close())
})

r := newHTTPReceiver(t, addr, consumertest.NewNop(), consumertest.NewNop())
r := newHTTPReceiver(t, addr, defaultPathPrefix, consumertest.NewNop(), consumertest.NewNop())
require.NotNil(t, r)

require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -712,7 +714,7 @@ func TestOTLPReceiverHTTPTracesIngestTest(t *testing.T) {

sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)}

ocr := newHTTPReceiver(t, addr, sink, nil)
ocr := newHTTPReceiver(t, addr, defaultPathPrefix, sink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) })
Expand Down Expand Up @@ -825,11 +827,13 @@ func TestGRPCMaxRecvSize(t *testing.T) {
func TestHTTPInvalidTLSCredentials(t *testing.T) {
cfg := &Config{
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "willfail",
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: testutil.GetAvailableLocalAddress(t),
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "willfail",
},
},
},
},
Expand All @@ -853,9 +857,11 @@ func testHTTPMaxRequestBodySizeJSON(t *testing.T, payload []byte, size int, expe
url := fmt.Sprintf("http://%s/v1/traces", endpoint)
cfg := &Config{
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: endpoint,
MaxRequestBodySize: int64(size),
HTTP: &httpServerSettings{
HTTPServerSettings: &confighttp.HTTPServerSettings{
Endpoint: endpoint,
MaxRequestBodySize: int64(size),
},
},
},
}
Expand Down Expand Up @@ -899,10 +905,11 @@ func newGRPCReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consu
return newReceiver(t, factory, cfg, otlpReceiverID, tc, mc)
}

func newHTTPReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consumer.Metrics) component.Component {
func newHTTPReceiver(t *testing.T, endpoint string, pathPrefix string, tc consumer.Traces, mc consumer.Metrics) component.Component {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.HTTP.Endpoint = endpoint
cfg.HTTP.PathPrefix = pathPrefix
cfg.GRPC = nil
return newReceiver(t, factory, cfg, otlpReceiverID, tc, mc)
}
Expand Down
1 change: 1 addition & 0 deletions receiver/otlpreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ protocols:
- https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com.
- https://test.com # Fully qualified domain name. Allows https://test.com only.
max_age: 7200
path_prefix: "testprefix"

0 comments on commit 27a51e2

Please sign in to comment.