From 982a57e095e997c2bd7afbfb23c2df9015127eb4 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 16 Feb 2021 20:12:33 -0500 Subject: [PATCH] Add "compression" option to otlphttp exporter All exporters that use HTTPClientSettings will now have a choice to have a config option "compression". The config option is an opt-in and is only active for exporters that set HTTPClientSettings.EnableCompression=true. Right now this is only otlphttpexporter. The default value of "compression" option is empty and results in no compression, which is the same as the old behavior. You can also specify "compression: gzip" which will result in gzip encoding of outgoing http requests. --- config/configgrpc/README.md | 2 +- config/confighttp/README.md | 1 + config/confighttp/confighttp.go | 24 ++++++++++ config/confighttp/confighttp_test.go | 34 +++++++++++-- exporter/otlphttpexporter/README.md | 2 + exporter/otlphttpexporter/config_test.go | 1 + exporter/otlphttpexporter/factory.go | 7 +-- exporter/otlphttpexporter/otlp_test.go | 55 +++++++++++++++++++++ internal/middleware/compression.go | 59 +++++++++++++++++++++++ internal/middleware/compression_test.go | 61 ++++++++++++++++++++++++ 10 files changed, 239 insertions(+), 7 deletions(-) diff --git a/config/configgrpc/README.md b/config/configgrpc/README.md index ef5fe4c0f4b4..d66408d75d28 100644 --- a/config/configgrpc/README.md +++ b/config/configgrpc/README.md @@ -16,7 +16,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md) -- `compression` (default = gzip): Compression type to use (only gzip is supported today) +- `compression` (default = none): Compression type to use (only gzip is supported today) - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `headers`: name/value pairs added to the request - [`keepalive`](https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 8e1c8b5cf754..8f6e05f49868 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -15,6 +15,7 @@ configuration. For more information, see [configtls README](../configtls/README.md). - `endpoint`: address:port +- `compression` (default = none): Compression type to use (only gzip is supported today) - `headers`: name/value pairs added to the HTTP request headers - [`read_buffer_size`](https://golang.org/pkg/net/http/#Transport) - [`timeout`](https://golang.org/pkg/net/http/#Client) diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 56936c74da84..0f16cc38ff8a 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -16,12 +16,15 @@ package confighttp import ( "crypto/tls" + "fmt" "net" "net/http" + "strings" "time" "github.com/rs/cors" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/internal/middleware" ) @@ -46,6 +49,15 @@ type HTTPClientSettings struct { // Existing header values are overwritten if collision happens. Headers map[string]string `mapstructure:"headers,omitempty"` + // The compression key for supported compression types within + // collector. Currently the only supported mode is `gzip`. + Compression string `mapstructure:"compression"` + + // EnableCompression must be set to true for Compression field to have effect. + // Set it to true in your CreateDefaultConfig if you want your exporter to + // have the "compression" setting supported. + EnableCompression bool `mapstructure:"-"` + // Custom Round Tripper to allow for individual components to intercept HTTP requests CustomRoundTripper func(next http.RoundTripper) (http.RoundTripper, error) } @@ -74,6 +86,18 @@ func (hcs *HTTPClientSettings) ToClient() (*http.Client, error) { } } + if hcs.Compression != "" { + if !hcs.EnableCompression { + return nil, fmt.Errorf("unsupported option \"compression\"") + } + + if strings.ToLower(hcs.Compression) == configgrpc.CompressionGzip { + clientTransport = middleware.NewCompressRoundTripper(clientTransport) + } else { + return nil, fmt.Errorf("unsupported compression type %q", hcs.Compression) + } + } + if hcs.CustomRoundTripper != nil { clientTransport, err = hcs.CustomRoundTripper(clientTransport) if err != nil { diff --git a/config/confighttp/confighttp_test.go b/config/confighttp/confighttp_test.go index 66d98e4925a7..26e7e93dc27b 100644 --- a/config/confighttp/confighttp_test.go +++ b/config/confighttp/confighttp_test.go @@ -63,6 +63,32 @@ func TestAllHTTPClientSettings(t *testing.T) { }, shouldError: true, }, + { + name: "error_compression_without_enabling", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "gzip", + }, + shouldError: true, + }, + { + name: "compression_gzip", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "gzip", + EnableCompression: true, + }, + shouldError: false, + }, + { + name: "compression_unknown", + settings: HTTPClientSettings{ + Endpoint: "localhost:1234", + Compression: "unknown", + EnableCompression: true, + }, + shouldError: true, + }, } for _, test := range tests { @@ -73,9 +99,11 @@ func TestAllHTTPClientSettings(t *testing.T) { return } assert.NoError(t, err) - transport := client.Transport.(*http.Transport) - assert.EqualValues(t, 1024, transport.ReadBufferSize) - assert.EqualValues(t, 512, transport.WriteBufferSize) + transport, ok := client.Transport.(*http.Transport) + if ok { + assert.EqualValues(t, 1024, transport.ReadBufferSize) + assert.EqualValues(t, 512, transport.WriteBufferSize) + } }) } } diff --git a/exporter/otlphttpexporter/README.md b/exporter/otlphttpexporter/README.md index 8c42bca107bc..7dd10b47f074 100644 --- a/exporter/otlphttpexporter/README.md +++ b/exporter/otlphttpexporter/README.md @@ -33,6 +33,8 @@ The following settings can be optionally configured: - `key_file` path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set to false. +- `compression` (default = none): Compression type to use (only gzip is supported today) + - `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client - `read_buffer_size` (default = 0): ReadBufferSize for HTTP client. - `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client. diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index a0d3aa83a3b3..c9b8fea19d18 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -63,6 +63,7 @@ func TestLoadConfig(t *testing.T) { QueueSize: 10, }, HTTPClientSettings: confighttp.HTTPClientSettings{ + EnableCompression: true, Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", "header1": "234", diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 592f0e016ab7..4eb1eee141db 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -50,9 +50,10 @@ func createDefaultConfig() configmodels.Exporter { RetrySettings: exporterhelper.DefaultRetrySettings(), QueueSettings: exporterhelper.DefaultQueueSettings(), HTTPClientSettings: confighttp.HTTPClientSettings{ - Endpoint: "", - Timeout: 30 * time.Second, - Headers: map[string]string{}, + EnableCompression: true, + Endpoint: "", + Timeout: 30 * time.Second, + Headers: map[string]string{}, // We almost read 0 bytes, so no need to tune ReadBufferSize. WriteBufferSize: 512 * 1024, }, diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index f9bc3ce98df9..bf1fda8f7439 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -132,6 +132,61 @@ func TestTraceRoundTrip(t *testing.T) { } } +func TestCompressionOptions(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + + tests := []struct { + name string + baseURL string + compression string + err bool + }{ + { + name: "no compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "", + }, + { + name: "gzip", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip", + }, + { + name: "incorrect compression", + baseURL: fmt.Sprintf("http://%s", addr), + compression: "gzip2", + err: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + startTraceReceiver(t, addr, sink) + + factory := NewFactory() + cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) + cfg.Compression = test.compression + exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + if test.err { + assert.Error(t, err) + return + } + require.NoError(t, err) + startAndCleanup(t, exp) + + td := testdata.GenerateTraceDataOneSpan() + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) + require.Eventually(t, func() bool { + return sink.SpansCount() > 0 + }, 1*time.Second, 10*time.Millisecond) + allTraces := sink.AllTraces() + require.Len(t, allTraces, 1) + assert.EqualValues(t, td, allTraces[0]) + }) + } +} + func TestMetricsError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) diff --git a/internal/middleware/compression.go b/internal/middleware/compression.go index 13504e80953e..7e076d637fa2 100644 --- a/internal/middleware/compression.go +++ b/internal/middleware/compression.go @@ -15,12 +15,71 @@ package middleware import ( + "bytes" "compress/gzip" "compress/zlib" "io" "net/http" ) +const ( + headerContentEncoding = "Content-Encoding" + headerValueGZIP = "gzip" +) + +type CompressRoundTripper struct { + http.RoundTripper +} + +func NewCompressRoundTripper(rt http.RoundTripper) *CompressRoundTripper { + return &CompressRoundTripper{ + RoundTripper: rt, + } +} + +func (r *CompressRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + + if req.Header.Get(headerContentEncoding) != "" { + // If the header already specifies a content encoding then skip compression + // since we don't want to compress it again. This is a safeguard that normally + // should not happen since CompressRoundTripper is not intended to be used + // with http clients which already do their own compression. + return r.RoundTripper.RoundTrip(req) + } + + gzipWriter := gzip.NewWriter(nil) + + // Gzip the body. + buf := bytes.NewBuffer([]byte{}) + gzipWriter.Reset(buf) + _, copyErr := io.Copy(gzipWriter, req.Body) + closeErr := req.Body.Close() + + if err := gzipWriter.Close(); err != nil { + return nil, err + } + + if copyErr != nil { + return nil, copyErr + } + if closeErr != nil { + return nil, closeErr + } + + // Create a new request since the docs say that we cannot modify the "req" + // (see https://golang.org/pkg/net/http/#RoundTripper). + cReq, err := http.NewRequestWithContext(req.Context(), req.Method, req.URL.String(), buf) + if err != nil { + return nil, err + } + + // Clone the headers and add gzip encoding header. + cReq.Header = req.Header.Clone() + cReq.Header.Add(headerContentEncoding, headerValueGZIP) + + return r.RoundTripper.RoundTrip(cReq) +} + type ErrorHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int) type decompressor struct { diff --git a/internal/middleware/compression_test.go b/internal/middleware/compression_test.go index e8770ff4c3e5..a303851679b3 100644 --- a/internal/middleware/compression_test.go +++ b/internal/middleware/compression_test.go @@ -31,6 +31,67 @@ import ( "go.opentelemetry.io/collector/testutil" ) +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedBody, _ := compressGzip(testBody) + + tests := []struct { + name string + encoding string + reqBody []byte + }{ + { + name: "NoCompression", + encoding: "", + reqBody: testBody, + }, + { + name: "ValidGzip", + encoding: "gzip", + reqBody: compressedBody.Bytes(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(200) + }) + + addr := testutil.GetAvailableLocalAddress(t) + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to create listener: %v", err) + srv := &http.Server{ + Handler: handler, + } + go func() { + _ = srv.Serve(ln) + }() + // Wait for the servers to start + <-time.After(10 * time.Millisecond) + + serverURL := fmt.Sprintf("http://%s", ln.Addr().String()) + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest("GET", serverURL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + + client := http.Client{} + if tt.encoding == "gzip" { + client.Transport = NewCompressRoundTripper(http.DefaultTransport) + } + res, err := client.Do(req) + require.NoError(t, err) + + ioutil.ReadAll(res.Body) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + require.NoError(t, srv.Close()) + }) + } +} + func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") tests := []struct {