From 577db893e223f16f45988de81ff8b1c61ce77567 Mon Sep 17 00:00:00 2001 From: Zeyad Kenawi Date: Thu, 22 Jun 2023 17:54:13 +0300 Subject: [PATCH] feat(internal): Add zstd to internal content_coding (#13423) --- internal/content_coding.go | 189 ++++++++++---- internal/content_coding_test.go | 247 +++++++++++++++--- plugins/inputs/amqp_consumer/amqp_consumer.go | 12 +- .../amqp_consumer/amqp_consumer_test.go | 1 - plugins/inputs/cloud_pubsub/cloud_pubsub.go | 12 +- .../inputs/cloud_pubsub/cloud_pubsub_test.go | 1 - .../inputs/socket_listener/packet_listener.go | 20 +- .../inputs/socket_listener/socket_listener.go | 5 - .../outputs/cloud_pubsub/cloud_pubsub_test.go | 2 +- plugins/outputs/cloud_pubsub/topic_stubbed.go | 16 +- 10 files changed, 382 insertions(+), 123 deletions(-) diff --git a/internal/content_coding.go b/internal/content_coding.go index 06f5f6043de95..543f38cb6822b 100644 --- a/internal/content_coding.go +++ b/internal/content_coding.go @@ -9,10 +9,25 @@ import ( "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/zlib" + "github.com/klauspost/compress/zstd" "github.com/klauspost/pgzip" ) -const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB +const defaultMaxDecompressionSize int64 = 500 * 1024 * 1024 //500MB + +// DecodingOption provide methods to change the decoding from the standard +// configuration. +type DecodingOption func(*decoderConfig) + +type decoderConfig struct { + maxDecompressionSize int64 +} + +func WithMaxDecompressionSize(maxDecompressionSize int64) DecodingOption { + return func(cfg *decoderConfig) { + cfg.maxDecompressionSize = maxDecompressionSize + } +} type encoderConfig struct { level int @@ -92,10 +107,12 @@ func NewContentEncoder(encoding string, options ...EncodingOption) (ContentEncod switch encoding { case "gzip": return NewGzipEncoder(options...) - case "zlib": - return NewZlibEncoder(options...) case "identity", "": return NewIdentityEncoder(options...) + case "zlib": + return NewZlibEncoder(options...) + case "zstd": + return NewZstdEncoder(options...) default: return nil, errors.New("invalid value for content_encoding") } @@ -111,32 +128,34 @@ func (a *AutoDecoder) SetEncoding(encoding string) { a.encoding = encoding } -func (a *AutoDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) { +func (a *AutoDecoder) Decode(data []byte) ([]byte, error) { if a.encoding == "gzip" { - return a.gzip.Decode(data, maxDecompressionSize) + return a.gzip.Decode(data) } - return a.identity.Decode(data, maxDecompressionSize) + return a.identity.Decode(data) } -func NewAutoContentDecoder() *AutoDecoder { +func NewAutoContentDecoder(options ...DecodingOption) *AutoDecoder { var a AutoDecoder - a.identity = NewIdentityDecoder() - a.gzip = NewGzipDecoder() + a.identity = NewIdentityDecoder(options...) + a.gzip = NewGzipDecoder(options...) return &a } // NewContentDecoder returns a ContentDecoder for the encoding type. -func NewContentDecoder(encoding string) (ContentDecoder, error) { +func NewContentDecoder(encoding string, options ...DecodingOption) (ContentDecoder, error) { switch encoding { + case "auto": + return NewAutoContentDecoder(options...), nil case "gzip": - return NewGzipDecoder(), nil - case "zlib": - return NewZlibDecoder(), nil + return NewGzipDecoder(options...), nil case "identity", "": - return NewIdentityDecoder(), nil - case "auto": - return NewAutoContentDecoder(), nil + return NewIdentityDecoder(options...), nil + case "zlib": + return NewZlibDecoder(options...), nil + case "zstd": + return NewZstdDecoder(options...) default: return nil, errors.New("invalid value for content_encoding") } @@ -165,7 +184,7 @@ func NewGzipEncoder(options ...EncodingOption) (*GzipEncoder, error) { case gzip.NoCompression, gzip.DefaultCompression, gzip.BestSpeed, gzip.BestCompression: // Do nothing as those are valid levels default: - return nil, fmt.Errorf("invalid compression level, only 0, 1 and 9 are supported") + return nil, errors.New("invalid compression level, only 0, 1 and 9 are supported") } var buf bytes.Buffer @@ -238,7 +257,7 @@ func NewZlibEncoder(options ...EncodingOption) (*ZlibEncoder, error) { case zlib.NoCompression, zlib.DefaultCompression, zlib.BestSpeed, zlib.BestCompression: // Do nothing as those are valid levels default: - return nil, fmt.Errorf("invalid compression level, only 0, 1 and 9 are supported") + return nil, errors.New("invalid compression level, only 0, 1 and 9 are supported") } var buf bytes.Buffer @@ -264,6 +283,41 @@ func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) { return e.buf.Bytes(), nil } +type ZstdEncoder struct { + encoder *zstd.Encoder +} + +func NewZstdEncoder(options ...EncodingOption) (*ZstdEncoder, error) { + cfg := encoderConfig{level: 3} + for _, o := range options { + o(&cfg) + } + + // Map the levels + var level zstd.EncoderLevel + switch cfg.level { + case 1: + level = zstd.SpeedFastest + case 3: + level = zstd.SpeedDefault + case 7: + level = zstd.SpeedBetterCompression + case 11: + level = zstd.SpeedBestCompression + default: + return nil, errors.New("invalid compression level, only 1, 3, 7 and 11 are supported") + } + + e, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level)) + return &ZstdEncoder{ + encoder: e, + }, err +} + +func (e *ZstdEncoder) Encode(data []byte) ([]byte, error) { + return e.encoder.EncodeAll(data, make([]byte, 0, len(data))), nil +} + // IdentityEncoder is a null encoder that applies no transformation. type IdentityEncoder struct{} @@ -282,49 +336,56 @@ func (*IdentityEncoder) Encode(data []byte) ([]byte, error) { // ContentDecoder removes a wrapper encoding from byte buffers. type ContentDecoder interface { SetEncoding(string) - Decode([]byte, int64) ([]byte, error) + Decode([]byte) ([]byte, error) } // GzipDecoder decompresses buffers with gzip compression. type GzipDecoder struct { - preader *pgzip.Reader - reader *gzip.Reader - buf *bytes.Buffer + preader *pgzip.Reader + reader *gzip.Reader + buf *bytes.Buffer + maxDecompressionSize int64 } -func NewGzipDecoder() *GzipDecoder { +func NewGzipDecoder(options ...DecodingOption) *GzipDecoder { + cfg := decoderConfig{maxDecompressionSize: defaultMaxDecompressionSize} + for _, o := range options { + o(&cfg) + } + return &GzipDecoder{ - preader: new(pgzip.Reader), - reader: new(gzip.Reader), - buf: new(bytes.Buffer), + preader: new(pgzip.Reader), + reader: new(gzip.Reader), + buf: new(bytes.Buffer), + maxDecompressionSize: cfg.maxDecompressionSize, } } func (*GzipDecoder) SetEncoding(string) {} -func (d *GzipDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) { +func (d *GzipDecoder) Decode(data []byte) ([]byte, error) { // Parallel Gzip is only faster for larger data chunks. According to the // project's documentation the trade-off size is at about 1MB, so we switch // to parallel Gzip if the data is larger and run the built-in version // otherwise. if len(data) > 1024*1024 { - return d.decodeBig(data, maxDecompressionSize) + return d.decodeBig(data) } - return d.decodeSmall(data, maxDecompressionSize) + return d.decodeSmall(data) } -func (d *GzipDecoder) decodeSmall(data []byte, maxDecompressionSize int64) ([]byte, error) { +func (d *GzipDecoder) decodeSmall(data []byte) ([]byte, error) { err := d.reader.Reset(bytes.NewBuffer(data)) if err != nil { return nil, err } d.buf.Reset() - n, err := io.CopyN(d.buf, d.reader, maxDecompressionSize) + n, err := io.CopyN(d.buf, d.reader, d.maxDecompressionSize) if err != nil && !errors.Is(err, io.EOF) { return nil, err - } else if n == maxDecompressionSize { - return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize) + } else if n == d.maxDecompressionSize { + return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", d.maxDecompressionSize) } err = d.reader.Close() @@ -334,18 +395,18 @@ func (d *GzipDecoder) decodeSmall(data []byte, maxDecompressionSize int64) ([]by return d.buf.Bytes(), nil } -func (d *GzipDecoder) decodeBig(data []byte, maxDecompressionSize int64) ([]byte, error) { +func (d *GzipDecoder) decodeBig(data []byte) ([]byte, error) { err := d.preader.Reset(bytes.NewBuffer(data)) if err != nil { return nil, err } d.buf.Reset() - n, err := io.CopyN(d.buf, d.preader, maxDecompressionSize) + n, err := io.CopyN(d.buf, d.preader, d.maxDecompressionSize) if err != nil && !errors.Is(err, io.EOF) { return nil, err - } else if n == maxDecompressionSize { - return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize) + } else if n == d.maxDecompressionSize { + return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", d.maxDecompressionSize) } err = d.preader.Close() @@ -356,18 +417,25 @@ func (d *GzipDecoder) decodeBig(data []byte, maxDecompressionSize int64) ([]byte } type ZlibDecoder struct { - buf *bytes.Buffer + buf *bytes.Buffer + maxDecompressionSize int64 } -func NewZlibDecoder() *ZlibDecoder { +func NewZlibDecoder(options ...DecodingOption) *ZlibDecoder { + cfg := decoderConfig{maxDecompressionSize: defaultMaxDecompressionSize} + for _, o := range options { + o(&cfg) + } + return &ZlibDecoder{ - buf: new(bytes.Buffer), + buf: new(bytes.Buffer), + maxDecompressionSize: cfg.maxDecompressionSize, } } func (*ZlibDecoder) SetEncoding(string) {} -func (d *ZlibDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) { +func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) { d.buf.Reset() b := bytes.NewBuffer(data) @@ -376,11 +444,11 @@ func (d *ZlibDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, e return nil, err } - n, err := io.CopyN(d.buf, r, maxDecompressionSize) + n, err := io.CopyN(d.buf, r, d.maxDecompressionSize) if err != nil && !errors.Is(err, io.EOF) { return nil, err - } else if n == maxDecompressionSize { - return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize) + } else if n == d.maxDecompressionSize { + return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", d.maxDecompressionSize) } err = r.Close() @@ -390,19 +458,38 @@ func (d *ZlibDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, e return d.buf.Bytes(), nil } +type ZstdDecoder struct { + decoder *zstd.Decoder +} + +func NewZstdDecoder(options ...DecodingOption) (*ZstdDecoder, error) { + cfg := decoderConfig{maxDecompressionSize: defaultMaxDecompressionSize} + for _, o := range options { + o(&cfg) + } + + d, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0), zstd.WithDecoderMaxWindow(uint64(cfg.maxDecompressionSize))) + return &ZstdDecoder{ + decoder: d, + }, err +} + +func (*ZstdDecoder) SetEncoding(string) {} + +func (d *ZstdDecoder) Decode(data []byte) ([]byte, error) { + return d.decoder.DecodeAll(data, nil) +} + // IdentityDecoder is a null decoder that returns the input. -type IdentityDecoder struct{} +type IdentityDecoder struct { +} -func NewIdentityDecoder() *IdentityDecoder { +func NewIdentityDecoder(_ ...DecodingOption) *IdentityDecoder { return &IdentityDecoder{} } func (*IdentityDecoder) SetEncoding(string) {} -func (*IdentityDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) { - size := int64(len(data)) - if size > maxDecompressionSize { - return nil, fmt.Errorf("size of decoded data: %d exceeds allowed size %d", size, maxDecompressionSize) - } +func (*IdentityDecoder) Decode(data []byte) ([]byte, error) { return data, nil } diff --git a/internal/content_coding_test.go b/internal/content_coding_test.go index 11e051193efd6..b1059a3f088c0 100644 --- a/internal/content_coding_test.go +++ b/internal/content_coding_test.go @@ -15,12 +15,12 @@ const maxDecompressionSize = 1024 func TestGzipEncodeDecode(t *testing.T) { enc, err := NewGzipEncoder() require.NoError(t, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(maxDecompressionSize)) payload, err := enc.Encode([]byte("howdy")) require.NoError(t, err) - actual, err := dec.Decode(payload, maxDecompressionSize) + actual, err := dec.Decode(payload) require.NoError(t, err) require.Equal(t, "howdy", string(actual)) @@ -29,12 +29,12 @@ func TestGzipEncodeDecode(t *testing.T) { func TestGzipReuse(t *testing.T) { enc, err := NewGzipEncoder() require.NoError(t, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(maxDecompressionSize)) payload, err := enc.Encode([]byte("howdy")) require.NoError(t, err) - actual, err := dec.Decode(payload, maxDecompressionSize) + actual, err := dec.Decode(payload) require.NoError(t, err) require.Equal(t, "howdy", string(actual)) @@ -42,7 +42,7 @@ func TestGzipReuse(t *testing.T) { payload, err = enc.Encode([]byte("doody")) require.NoError(t, err) - actual, err = dec.Decode(payload, maxDecompressionSize) + actual, err = dec.Decode(payload) require.NoError(t, err) require.Equal(t, "doody", string(actual)) @@ -51,12 +51,12 @@ func TestGzipReuse(t *testing.T) { func TestZlibEncodeDecode(t *testing.T) { enc, err := NewZlibEncoder() require.NoError(t, err) - dec := NewZlibDecoder() + dec := NewZlibDecoder(WithMaxDecompressionSize(maxDecompressionSize)) payload, err := enc.Encode([]byte("howdy")) require.NoError(t, err) - actual, err := dec.Decode(payload, maxDecompressionSize) + actual, err := dec.Decode(payload) require.NoError(t, err) require.Equal(t, "howdy", string(actual)) @@ -65,24 +65,62 @@ func TestZlibEncodeDecode(t *testing.T) { func TestZlibEncodeDecodeWithTooLargeMessage(t *testing.T) { enc, err := NewZlibEncoder() require.NoError(t, err) - dec := NewZlibDecoder() + dec := NewZlibDecoder(WithMaxDecompressionSize(3)) payload, err := enc.Encode([]byte("howdy")) require.NoError(t, err) - _, err = dec.Decode(payload, 3) + _, err = dec.Decode(payload) require.ErrorContains(t, err, "size of decoded data exceeds allowed size 3") } +func TestZstdEncodeDecode(t *testing.T) { + enc, err := NewZstdEncoder() + require.NoError(t, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(maxDecompressionSize)) + require.NoError(t, err) + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) +} + +func TestZstdReuse(t *testing.T) { + enc, err := NewZstdEncoder() + require.NoError(t, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(maxDecompressionSize)) + require.NoError(t, err) + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) + + payload, err = enc.Encode([]byte("doody")) + require.NoError(t, err) + + actual, err = dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "doody", string(actual)) +} + func TestIdentityEncodeDecode(t *testing.T) { + dec := NewIdentityDecoder(WithMaxDecompressionSize(maxDecompressionSize)) enc, err := NewIdentityEncoder() require.NoError(t, err) - dec := NewIdentityDecoder() payload, err := enc.Encode([]byte("howdy")) require.NoError(t, err) - actual, err := dec.Decode(payload, maxDecompressionSize) + actual, err := dec.Decode(payload) require.NoError(t, err) require.Equal(t, "howdy", string(actual)) @@ -138,6 +176,11 @@ func TestCompressionLevel(t *testing.T) { validLevels: []int{0, 1, 9}, errormsg: "invalid compression level", }, + { + algorithm: "zstd", + validLevels: []int{1, 3, 7, 11}, + errormsg: "invalid compression level", + }, { algorithm: "identity", errormsg: "does not support options", @@ -154,7 +197,7 @@ func TestCompressionLevel(t *testing.T) { // Check invalid level t.Run(tt.algorithm+" invalid", func(t *testing.T) { - _, err := NewContentEncoder(tt.algorithm, WithCompressionLevel(11)) + _, err := NewContentEncoder(tt.algorithm, WithCompressionLevel(12)) require.ErrorContains(t, err, tt.errormsg) }) @@ -188,10 +231,10 @@ func BenchmarkGzipEncode(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -207,15 +250,15 @@ func BenchmarkGzipDecode(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) for n := 0; n < b.N; n++ { - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -226,10 +269,10 @@ func BenchmarkGzipEncodeDecode(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -237,7 +280,7 @@ func BenchmarkGzipEncodeDecode(b *testing.B) { payload, err := enc.Encode(data) require.NoError(b, err) - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -248,10 +291,10 @@ func BenchmarkGzipEncodeBig(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -267,15 +310,15 @@ func BenchmarkGzipDecodeBig(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) for n := 0; n < b.N; n++ { - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -286,10 +329,136 @@ func BenchmarkGzipEncodeDecodeBig(b *testing.B) { enc, err := NewGzipEncoder() require.NoError(b, err) - dec := NewGzipDecoder() + dec := NewGzipDecoder(WithMaxDecompressionSize(dataLen)) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + payload, err := enc.Encode(data) + require.NoError(b, err) + + _, err = dec.Decode(payload) + require.NoError(b, err) + } +} + +func BenchmarkZstdEncode(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 64)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + _, err := enc.Encode(data) + require.NoError(b, err) + } +} + +func BenchmarkZstdDecode(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 64)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + _, err = dec.Decode(payload) + require.NoError(b, err) + } +} + +func BenchmarkZstdEncodeDecode(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 64)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + payload, err := enc.Encode(data) + require.NoError(b, err) + + _, err = dec.Decode(payload) + require.NoError(b, err) + } +} + +func BenchmarkZstdEncodeBig(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 1024*1024)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + _, err := enc.Encode(data) + require.NoError(b, err) + } +} + +func BenchmarkZstdDecodeBig(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 1024*1024)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) + payload, err := enc.Encode(data) + require.NoError(b, err) + actual, err := dec.Decode(payload) + require.NoError(b, err) + require.Equal(b, data, actual) + + for n := 0; n < b.N; n++ { + _, err = dec.Decode(payload) + require.NoError(b, err) + } +} + +func BenchmarkZstdEncodeDecodeBig(b *testing.B) { + data := []byte(strings.Repeat("-howdy stranger-", 1024*1024)) + dataLen := int64(len(data)) + 1 + + enc, err := NewZstdEncoder() + require.NoError(b, err) + dec, err := NewZstdDecoder(WithMaxDecompressionSize(dataLen)) + require.NoError(b, err) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -297,7 +466,7 @@ func BenchmarkGzipEncodeDecodeBig(b *testing.B) { payload, err := enc.Encode(data) require.NoError(b, err) - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -308,10 +477,10 @@ func BenchmarkZlibEncode(b *testing.B) { enc, err := NewZlibEncoder() require.NoError(b, err) - dec := NewZlibDecoder() + dec := NewZlibDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -327,15 +496,15 @@ func BenchmarkZlibDecode(b *testing.B) { enc, err := NewZlibEncoder() require.NoError(b, err) - dec := NewZlibDecoder() + dec := NewZlibDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) for n := 0; n < b.N; n++ { - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -346,10 +515,10 @@ func BenchmarkZlibEncodeDecode(b *testing.B) { enc, err := NewZlibEncoder() require.NoError(b, err) - dec := NewZlibDecoder() + dec := NewZlibDecoder(WithMaxDecompressionSize(dataLen)) payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -357,7 +526,7 @@ func BenchmarkZlibEncodeDecode(b *testing.B) { payload, err := enc.Encode(data) require.NoError(b, err) - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } @@ -366,13 +535,13 @@ func BenchmarkIdentityEncodeDecode(b *testing.B) { data := []byte(strings.Repeat("-howdy stranger-", 64)) dataLen := int64(len(data)) + 1 + dec := NewIdentityDecoder(WithMaxDecompressionSize(dataLen)) enc, err := NewIdentityEncoder() require.NoError(b, err) - dec := NewIdentityDecoder() payload, err := enc.Encode(data) require.NoError(b, err) - actual, err := dec.Decode(payload, dataLen) + actual, err := dec.Decode(payload) require.NoError(b, err) require.Equal(b, data, actual) @@ -380,7 +549,7 @@ func BenchmarkIdentityEncodeDecode(b *testing.B) { payload, err := enc.Encode(data) require.NoError(b, err) - _, err = dec.Decode(payload, dataLen) + _, err = dec.Decode(payload) require.NoError(b, err) } } diff --git a/plugins/inputs/amqp_consumer/amqp_consumer.go b/plugins/inputs/amqp_consumer/amqp_consumer.go index 2c16959d7aae5..0c7b9eb4f8cc6 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer.go @@ -114,10 +114,6 @@ func (a *AMQPConsumer) Init() error { a.MaxUndeliveredMessages = 1000 } - if a.MaxDecompressionSize <= 0 { - a.MaxDecompressionSize = internal.DefaultMaxDecompressionSize - } - return nil } @@ -163,7 +159,11 @@ func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { return err } - a.decoder, err = internal.NewContentDecoder(a.ContentEncoding) + var options []internal.DecodingOption + if a.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(int64(a.MaxDecompressionSize))) + } + a.decoder, err = internal.NewContentDecoder(a.ContentEncoding, options...) if err != nil { return err } @@ -417,7 +417,7 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive } a.decoder.SetEncoding(d.ContentEncoding) - body, err := a.decoder.Decode(d.Body, int64(a.MaxDecompressionSize)) + body, err := a.decoder.Decode(d.Body) if err != nil { onError() return err diff --git a/plugins/inputs/amqp_consumer/amqp_consumer_test.go b/plugins/inputs/amqp_consumer/amqp_consumer_test.go index fbfee4cdcbacf..f8b73b4b1fae2 100644 --- a/plugins/inputs/amqp_consumer/amqp_consumer_test.go +++ b/plugins/inputs/amqp_consumer/amqp_consumer_test.go @@ -24,7 +24,6 @@ func TestAutoEncoding(t *testing.T) { a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery) a.parser = parser a.decoder, err = internal.NewContentDecoder("auto") - a.MaxDecompressionSize = internal.DefaultMaxDecompressionSize require.NoError(t, err) acc := &testutil.Accumulator{} diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub.go b/plugins/inputs/cloud_pubsub/cloud_pubsub.go index a2c522824ef39..ca763c1e38f6c 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub.go @@ -217,7 +217,7 @@ func (ps *PubSub) decompressData(data []byte) ([]byte, error) { } ps.decoderMutex.Lock() - data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize)) + data, err := ps.decoder.Decode(data) if err != nil { ps.decoderMutex.Unlock() return nil, err @@ -322,7 +322,11 @@ func (ps *PubSub) Init() error { ps.ContentEncoding = "identity" case "gzip": var err error - ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding) + var options []internal.DecodingOption + if ps.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize))) + } + ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...) if err != nil { return err } @@ -330,10 +334,6 @@ func (ps *PubSub) Init() error { return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding) } - if ps.MaxDecompressionSize <= 0 { - ps.MaxDecompressionSize = internal.DefaultMaxDecompressionSize - } - return nil } diff --git a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go index a386ac58a2c7c..a63fed21d3696 100644 --- a/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/inputs/cloud_pubsub/cloud_pubsub_test.go @@ -131,7 +131,6 @@ func TestRunGzipDecode(t *testing.T) { Subscription: subID, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, ContentEncoding: "gzip", - MaxDecompressionSize: internal.DefaultMaxDecompressionSize, decoder: decoder, } diff --git a/plugins/inputs/socket_listener/packet_listener.go b/plugins/inputs/socket_listener/packet_listener.go index a4abb484d7295..f7d4e2d0dde28 100644 --- a/plugins/inputs/socket_listener/packet_listener.go +++ b/plugins/inputs/socket_listener/packet_listener.go @@ -37,7 +37,7 @@ func (l *packetListener) listen(acc telegraf.Accumulator) { break } - body, err := l.decoder.Decode(buf[:n], l.MaxDecompressionSize) + body, err := l.decoder.Decode(buf[:n]) if err != nil { acc.AddError(fmt.Errorf("unable to decode incoming packet: %w", err)) } @@ -82,7 +82,11 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { } // Create a decoder for the given encoding - decoder, err := internal.NewContentDecoder(l.Encoding) + var options []internal.DecodingOption + if l.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) + } + decoder, err := internal.NewContentDecoder(l.Encoding, options...) if err != nil { return fmt.Errorf("creating decoder failed: %w", err) } @@ -126,7 +130,11 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err l.conn = conn // Create a decoder for the given encoding - decoder, err := internal.NewContentDecoder(l.Encoding) + var options []internal.DecodingOption + if l.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) + } + decoder, err := internal.NewContentDecoder(l.Encoding, options...) if err != nil { return fmt.Errorf("creating decoder failed: %w", err) } @@ -143,7 +151,11 @@ func (l *packetListener) setupIP(u *url.URL) error { l.conn = conn // Create a decoder for the given encoding - decoder, err := internal.NewContentDecoder(l.Encoding) + var options []internal.DecodingOption + if l.MaxDecompressionSize > 0 { + options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) + } + decoder, err := internal.NewContentDecoder(l.Encoding, options...) if err != nil { return fmt.Errorf("creating decoder failed: %w", err) } diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index cc926d0369d69..357380de0ae20 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -15,7 +15,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -160,10 +159,6 @@ func (sl *SocketListener) Start(acc telegraf.Accumulator) error { return fmt.Errorf("parsing address failed: %w", err) } - if sl.MaxDecompressionSize <= 0 { - sl.MaxDecompressionSize = internal.DefaultMaxDecompressionSize - } - switch u.Scheme { case "tcp", "tcp4", "tcp6": ssl := &streamListener{ diff --git a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go index 1b7fefb3813aa..61a22aab162c4 100644 --- a/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/cloud_pubsub_test.go @@ -208,7 +208,7 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string if gzipEncoded { decoder, _ := internal.NewContentDecoder("gzip") var err error - data, err = decoder.Decode(data, internal.DefaultMaxDecompressionSize) + data, err = decoder.Decode(data) if err != nil { t.Fatalf("Unable to decode expected gzip encoded message: %s", err) } diff --git a/plugins/outputs/cloud_pubsub/topic_stubbed.go b/plugins/outputs/cloud_pubsub/topic_stubbed.go index 33faf8143e89d..1ed583dbb20be 100644 --- a/plugins/outputs/cloud_pubsub/topic_stubbed.go +++ b/plugins/outputs/cloud_pubsub/topic_stubbed.go @@ -49,9 +49,8 @@ type ( ReturnErr map[string]bool telegraf.Parser *testing.T - Base64Data bool - ContentEncoding string - MaxDecompressionSize int64 + Base64Data bool + ContentEncoding string stopped bool pLock sync.Mutex @@ -71,11 +70,10 @@ func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []te metrics := make([]telegraf.Metric, 0, len(testM)) t := &stubTopic{ - T: tT, - ReturnErr: make(map[string]bool), - published: make(map[string]*pubsub.Message), - ContentEncoding: "identity", - MaxDecompressionSize: internal.DefaultMaxDecompressionSize, + T: tT, + ReturnErr: make(map[string]bool), + published: make(map[string]*pubsub.Message), + ContentEncoding: "identity", } for _, tm := range testM { @@ -196,7 +194,7 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string { require.NoError(t, err) decoder, _ := internal.NewContentDecoder(t.ContentEncoding) - d, err := decoder.Decode(msg.Data, t.MaxDecompressionSize) + d, err := decoder.Decode(msg.Data) if err != nil { t.Errorf("unable to decode message: %v", err) }