From d21943267341e2f9e9d5db7d35e268dd16cc0588 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 19 Sep 2023 12:00:46 -0500 Subject: [PATCH] add support for queue settings under outputs - add support for `idle_connection_timeout` for ES output - add support for queue settings under output Closes #35615 --- CHANGELOG.next.asciidoc | 2 + NOTICE.txt | 4 +- auditbeat/auditbeat.reference.yml | 5 ++ filebeat/filebeat.reference.yml | 5 ++ go.mod | 2 +- go.sum | 4 +- heartbeat/heartbeat.reference.yml | 5 ++ .../output-elasticsearch.reference.yml.tmpl | 5 ++ libbeat/cmd/instance/beat.go | 18 +++++ libbeat/cmd/instance/beat_test.go | 73 +++++++++++++++++++ libbeat/docs/queueconfig.asciidoc | 9 ++- libbeat/outputs/elasticsearch/client.go | 1 + .../elasticsearch/docs/elasticsearch.asciidoc | 4 + .../outputs/elasticsearch/elasticsearch.go | 1 + metricbeat/metricbeat.reference.yml | 5 ++ packetbeat/packetbeat.reference.yml | 5 ++ winlogbeat/winlogbeat.reference.yml | 5 ++ x-pack/auditbeat/auditbeat.reference.yml | 5 ++ x-pack/filebeat/filebeat.reference.yml | 5 ++ .../functionbeat/functionbeat.reference.yml | 5 ++ x-pack/heartbeat/heartbeat.reference.yml | 5 ++ x-pack/metricbeat/metricbeat.reference.yml | 5 ++ x-pack/osquerybeat/osquerybeat.reference.yml | 5 ++ x-pack/packetbeat/packetbeat.reference.yml | 5 ++ x-pack/winlogbeat/winlogbeat.reference.yml | 5 ++ 25 files changed, 185 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a8ed9f032c5..7242aac3865 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -154,6 +154,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322] - [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506] Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor +- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}99999[99999] +- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}99999[99999] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index bc45b225aef..d323e10c2ee 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12712,11 +12712,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.3.15-0.20230913212237-dbdaf18c898b +Version: v0.4.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.3.15-0.20230913212237-dbdaf18c898b/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.4.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 892d4e1b2d0..4d024ea405b 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -521,6 +521,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 2de0bc61f56..766eff10eb9 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1617,6 +1617,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/go.mod b/go.mod index 4cf3e5c020e..0b39cccc6c7 100644 --- a/go.mod +++ b/go.mod @@ -202,7 +202,7 @@ require ( github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.6.2 - github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b + github.com/elastic/elastic-agent-libs v0.4.0 github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 github.com/elastic/elastic-agent-system-metrics v0.6.1 github.com/elastic/go-elasticsearch/v8 v8.10.0 diff --git a/go.sum b/go.sum index 371aa1efbb2..2b2c197fa5c 100644 --- a/go.sum +++ b/go.sum @@ -653,8 +653,8 @@ github.com/elastic/elastic-agent-autodiscover v0.6.2 h1:7P3cbMBWXjbzA80rxitQjc+P github.com/elastic/elastic-agent-autodiscover v0.6.2/go.mod h1:yXYKFAG+Py+TcE4CCR8EAbJiYb+6Dz9sCDoWgOveqtU= github.com/elastic/elastic-agent-client/v7 v7.4.0 h1:h75oTkkvIjgiKVm61NpvTZP4cy6QbQ3zrIpXKGigyjo= github.com/elastic/elastic-agent-client/v7 v7.4.0/go.mod h1:9/amG2K2y2oqx39zURcc+hnqcX+nyJ1cZrLgzsgo5c0= -github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b h1:a2iuOokwld+D7VhyFymVtsPoqxZ8fkkOCOOjeYU9CDM= -github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b/go.mod h1:mpSfrigixx8x+uMxWKl4LtdlrKIhZbA4yT2eIeIazUQ= +github.com/elastic/elastic-agent-libs v0.4.0 h1:P0b+xcvYK+dEwldvRXObO1dj3rjjR5qEXAl6TwRCAy0= +github.com/elastic/elastic-agent-libs v0.4.0/go.mod h1:mpSfrigixx8x+uMxWKl4LtdlrKIhZbA4yT2eIeIazUQ= github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U= github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI= github.com/elastic/elastic-agent-system-metrics v0.6.1 h1:LCN1lvQTkdUuU/rKlpKyVMDU/G/I8/iZWCaW6K+mo4o= diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index b8281727026..a76b98f2406 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -613,6 +613,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl index edc40e632d7..346865a4445 100644 --- a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl +++ b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl @@ -80,6 +80,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 8bd493e1777..5abde84c79a 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -773,6 +773,10 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } + if err := mergeOutputQueueSettings(&b.Config); err != nil { + return fmt.Errorf("could not merge output queue settings: %w", err) + } + if err := features.UpdateFromConfig(b.RawConfig); err != nil { return fmt.Errorf("could not parse features: %w", err) } @@ -1466,3 +1470,17 @@ func sanitizeIPs(ips []string) []string { } return validIPs } + +func mergeOutputQueueSettings(bc *beatConfig) error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + pc := pipeline.Config{} + err := bc.Output.Config().Unpack(&pc) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if pc.Queue.IsSet() { + bc.Pipeline.Queue = pc.Queue + } + } + return nil +} diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index b6834d89b5d..bd7bfba39ee 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -27,7 +27,9 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/go-ucfg/yaml" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -269,3 +271,74 @@ func (r *outputReloaderMock) Reload( r.cfg = cfg return nil } + +func TestMergeOutputQueueSettings(t *testing.T) { + tests := map[string]struct { + input []byte + memEvents int + }{ + "blank": {input: []byte(""), + memEvents: 4096}, + "defaults": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 4096}, + "topLevelQueue": {input: []byte(` +name: mockbeat +queue: + mem: + events: 8096 +output: + elasticsearch: + hosts: + - "localhost:9200" +`), + memEvents: 8096}, + "outputLevelQueue": {input: []byte(` +name: mockbeat +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + memEvents: 8096}, + "topAndOutputLevelQueue": {input: []byte(` +name: mockbeat +queue: + mem: + events: 2048 +output: + elasticsearch: + hosts: + - "localhost:9200" + queue: + mem: + events: 8096 +`), + memEvents: 8096}, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + cfg, err := yaml.NewConfig(tc.input) + require.NoError(t, err) + + config := beatConfig{} + err = cfg.Unpack(&config) + require.NoError(t, err) + + err = mergeOutputQueueSettings(&config) + require.NoError(t, err) + + ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config()) + require.NoError(t, err) + require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config()) + }) + } +} diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index fb930831dac..a9a50288df7 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -9,9 +9,12 @@ queue is responsible for buffering and combining events into batches that can be consumed by the outputs. The outputs will use bulk operations to send a batch of events in one transaction. -You can configure the type and behavior of the internal queue by setting -options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one -queue type can be configured. +You can configure the type and behavior of the internal queue by +setting options in the `queue` section of the +{beatname_lc}.yml+ +config file or by setting options in the `queue` section of the +output. Only one queue type can be configured. If both the top level +queue section and the output section are specified the output section +takes precedence. This sample configuration sets the memory queue to buffer up to 4096 events: diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c80e95ebc90..b485807776e 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -102,6 +102,7 @@ func NewClient( CompressionLevel: s.CompressionLevel, EscapeHTML: s.EscapeHTML, Transport: s.Transport, + IdleConnTimeout: s.IdleConnTimeout, }) if err != nil { return nil, err diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index cbe74279dcb..3177173224e 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -689,6 +689,10 @@ default is `1s`. The maximum number of seconds to wait before attempting to connect to Elasticsearch after a network error. The default is `60s`. +===== `idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. The format is a Go language duration (example 60s is 60 seconds). The default is 0. + ===== `timeout` The http request timeout in seconds for the Elasticsearch request. The default is 90. diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 9cd33ea8d8a..4d2d9aebe67 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -105,6 +105,7 @@ func makeES( Observer: observer, EscapeHTML: config.EscapeHTML, Transport: config.Transport, + IdleConnTimeout: config.Transport.IdleConnTimeout, }, Index: index, Pipeline: pipeline, diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index eabdcc8e918..de23d99be96 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1356,6 +1356,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 7442b7f6a0f..3250dbe1587 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -987,6 +987,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 64377c0fc6d..e7b1f36b8c3 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -403,6 +403,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index e9ecd33ae39..e7be2337bf1 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -577,6 +577,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 9ea0dabfb0b..88579730ac7 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3987,6 +3987,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 7855538f621..816309e0a62 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -645,6 +645,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index b8281727026..a76b98f2406 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -613,6 +613,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index fa15aca7fb6..00dd2509888 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1917,6 +1917,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 416462f3f47..e45aff8adb0 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -364,6 +364,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 7442b7f6a0f..3250dbe1587 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -987,6 +987,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90 diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 3c6799e7329..020084d9fe8 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -405,6 +405,11 @@ output.elasticsearch: # Elasticsearch after a network error. The default is 60s. #backoff.max: 60s + # The maximum amount of time an idle connection will remain idle + # before closing itself. Zero means no limit. The format is a Go + # language duration (example 60s is 60 seconds). The default is 0. + #idle_connection_timeout: 60s + # Configure HTTP request timeout before failing a request to Elasticsearch. #timeout: 90