From 98f182948f1cb791a979c7bf7b1d004e3b524e50 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 19 Oct 2022 12:41:49 +0200 Subject: [PATCH] [filebeat] Fix httpjson page number initialization and docs (#33400) (cherry picked from commit 229690b16e63518548aeadfc7b91ea93dc02966a) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 2 +- x-pack/filebeat/input/httpjson/input_test.go | 25 +++++++++++++++---- x-pack/filebeat/input/httpjson/pagination.go | 7 ++++-- .../filebeat/input/httpjson/request_test.go | 4 +-- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7e38842a8ab..0eab239c16b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix handling of Cisco 302020 messages in ASA and FTD modules. {pull}33089[33089] - Fix requestID parsing in AWS cloudtrail fileset. {pull}33143[33143] - Fix input metrics not being unregistered when an input closes. This led to panics when configuration was reloaded for the aws-s3, aws-cloudwatch, and lumberjack inputs. {pull}33259[33259] +- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index fea4888386f..73e9bfe7d50 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -109,7 +109,7 @@ The state has the following elements: - `last_response.url.params`: A https://pkg.go.dev/net/url#Values[`url.Values`] of the params from the URL in `last_response.url.value`. Can be queried with the https://pkg.go.dev/net/url#Values.Get[`Get`] function. - `last_response.header`: A map containing the headers from the last successful response. - `last_response.body`: A map containing the parsed JSON body from the last successful response. This is the response as it comes from the remote server. -- `last_response.page`: A number indicating the page number of the last response. +- `last_response.page`: A number indicating the page number of the last response. It starts with the value `0` at every interval. - `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`). - `last_event`: A map representing the last event of the current request in the requests chain (result from applying transforms to `last_response.body`). - `url`: The last requested URL as a raw https://pkg.go.dev/net/url#URL[`url.URL`] Go type. diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index 0ec01ce968f..fadaac7818f 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -209,28 +209,41 @@ func TestInput(t *testing.T) { name: "Test pagination", setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { registerPaginationTransforms() + registerResponseTransforms() t.Cleanup(func() { registeredTransforms = newRegistry() }) server := httptest.NewServer(h) config["request.url"] = server.URL t.Cleanup(server.Close) }, baseConfig: map[string]interface{}{ - "interval": time.Second, + "interval": time.Millisecond, "request.method": http.MethodGet, "response.split": map[string]interface{}{ "target": "body.items", + "transforms": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.page", + "value": "[[.last_response.page]]", + }, + }, + }, }, "response.pagination": []interface{}{ map[string]interface{}{ "set": map[string]interface{}{ - "target": "url.params.page", - "value": "[[.last_response.body.nextPageToken]]", + "target": "url.params.page", + "value": "[[.last_response.body.nextPageToken]]", + "fail_on_template_error": true, }, }, }, }, - handler: paginationHandler(), - expected: []string{`{"foo":"a"}`, `{"foo":"b"}`}, + handler: paginationHandler(), + expected: []string{ + `{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`, + `{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`, + }, }, { name: "Test first event", @@ -881,6 +894,8 @@ func paginationHandler() http.HandlerFunc { _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:02Z","items":[{"foo":"c"}]}`)) case 3: _, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:03Z","items":[{"foo":"d"}]}`)) + count = 0 + return } count += 1 } diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 3b21acd17f4..b5dc74d2754 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -138,7 +138,7 @@ func (iter *pageIterator) next() (*response, bool, error) { return nil, false, err } - resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq) //nolint:bodyclose // Bad linter! The body is closed in the call. + resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq) if err != nil { return nil, false, err } @@ -164,12 +164,15 @@ func (iter *pageIterator) getPage() (*response, error) { return nil, err } iter.resp.Body.Close() - iter.n += 1 var r response r.header = iter.resp.Header r.url = *iter.resp.Request.URL + + // we set the page number before increasing its value + // because the first page needs to be 0 for every interval r.page = iter.n + iter.n++ if len(bodyBytes) > 0 { if iter.pagination.decoder != nil { diff --git a/x-pack/filebeat/input/httpjson/request_test.go b/x-pack/filebeat/input/httpjson/request_test.go index 622d7c4b94b..67c270d24fe 100644 --- a/x-pack/filebeat/input/httpjson/request_test.go +++ b/x-pack/filebeat/input/httpjson/request_test.go @@ -96,7 +96,7 @@ func TestCtxAfterDoRequest(t *testing.T) { lastResp.header = nil assert.EqualValues(t, &response{ - page: 1, + page: 0, url: *(newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T14%3A50%3A00Z"))), body: mapstr.M{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"}, }, @@ -128,7 +128,7 @@ func TestCtxAfterDoRequest(t *testing.T) { lastResp.header = nil assert.EqualValues(t, &response{ - page: 1, + page: 0, url: *(newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z"))), body: mapstr.M{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"}, },