Skip to content

Commit

Permalink
[8.6](backport #34014) x-pack/filebeat/input/{cel,httpjson}: make tra…
Browse files Browse the repository at this point in the history
…nsport keep-alives configurable (#34299)

* x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)

The behaviour defaults to the old behaviour, but exposes the full set of
httpcommon.WithKeepaliveSettings configuration options.

(cherry picked from commit 3cd8d81)

# Conflicts:
#	x-pack/filebeat/input/cel/input.go
#	x-pack/filebeat/input/httpjson/input.go
#	x-pack/filebeat/input/httpjson/request_chain_helper.go

* resolve conflicts

Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com>
Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
3 people authored Jan 19, 2023
1 parent 6400b56 commit 41d17fd
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,26 @@ This specifies SSL/TLS configuration. If the ssl section is missing, the host's
CAs are used for HTTPS connections. See <<configuration-ssl>> for more
information.

[float]
==== `resource.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `resource.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `resource.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `resource.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `resource.retry.max_attempts`

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,26 @@ filebeat.inputs:
request.proxy_url: http://proxy.example:8080
----

[float]
==== `request.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `request.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `request.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `request.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `request.retry.max_attempts`

Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,46 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type ResourceConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Retry retryConfig `config:"retry"`
RedirectForwardHeaders bool `config:"redirect.forward_headers"`
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/cel/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -466,3 +467,60 @@ func TestConfigOauth2Validation(t *testing.T) {
})
}
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"resource.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
"resource.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'resource.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["resource.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Resource.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
}
c, err := cfg.Resource.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
cfg.Resource.KeepAlive.settings(),
)
if err != nil {
return nil, err
Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,38 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type urlConfig struct {
*url.URL
}
Expand All @@ -99,6 +131,7 @@ type requestConfig struct {
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"context"
"errors"
"fmt"
"os"
"testing"

Expand All @@ -15,6 +16,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -485,3 +487,59 @@ func TestCursorEntryConfig(t *testing.T) {
assert.True(t, conf["entry3"].mustIgnoreEmptyValue())
assert.True(t, conf["entry4"].mustIgnoreEmptyValue())
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"request.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"request.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"request.keep_alive.disable": false,
"request.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["request.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Request.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
config.Request.KeepAlive.settings(),
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/request_chain_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *re
// Make retryable HTTP client
netHTTPClient, err := requestCfg.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
requestCfg.KeepAlive.settings(),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 41d17fd

Please sign in to comment.