Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable #34014

Merged
merged 2 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Convert UDP input to v2 input. {pull}33930[33930]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,26 @@ This specifies SSL/TLS configuration. If the ssl section is missing, the host's
CAs are used for HTTPS connections. See <<configuration-ssl>> for more
information.

[float]
==== `resource.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `resource.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `resource.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `resource.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `resource.retry.max_attempts`

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,26 @@ filebeat.inputs:
request.proxy_url: http://proxy.example:8080
----

[float]
==== `request.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `request.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `request.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `request.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `request.retry.max_attempts`

Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,46 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type ResourceConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Retry retryConfig `config:"retry"`
RedirectForwardHeaders bool `config:"redirect.forward_headers"`
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/cel/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -466,3 +467,60 @@ func TestConfigOauth2Validation(t *testing.T) {
})
}
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"resource.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
"resource.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'resource.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["resource.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Resource.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
if !wantClient(cfg) {
return nil, nil
}
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL)...)
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -696,7 +696,7 @@ func wantClient(cfg config) bool {

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL) []httpcommon.TransportOption {
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
scheme, trans, ok := strings.Cut(u.Scheme, "+")
var dialer transport.Dialer
switch {
Expand All @@ -705,7 +705,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption {
case !ok:
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
keepalive,
}

// We set the host for the unix socket and Windows named
Expand All @@ -723,7 +723,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption {
u.Scheme = scheme
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
keepalive,
httpcommon.WithBaseDialer(dialer),
}
}
Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,38 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type urlConfig struct {
*url.URL
}
Expand All @@ -99,6 +131,7 @@ type requestConfig struct {
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"context"
"errors"
"fmt"
"os"
"testing"

Expand All @@ -15,6 +16,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -485,3 +487,59 @@ func TestCursorEntryConfig(t *testing.T) {
assert.True(t, conf["entry3"].mustIgnoreEmptyValue())
assert.True(t, conf["entry4"].mustIgnoreEmptyValue())
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"request.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"request.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"request.keep_alive.disable": false,
"request.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["request.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Request.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func run(

func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL)...)
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.settings())...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL) []httpcommon.TransportOption {
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
scheme, trans, ok := strings.Cut(u.Scheme, "+")
var dialer transport.Dialer
switch {
Expand All @@ -210,7 +210,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption {
case !ok:
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
keepalive,
}

// We set the host for the unix socket and Windows named
Expand All @@ -228,7 +228,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption {
u.Scheme = scheme
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
keepalive,
httpcommon.WithBaseDialer(dialer),
}
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/request_chain_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (

func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL)...)
netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL, requestCfg.KeepAlive.settings())...)
if err != nil {
return nil, err
}
Expand Down