Skip to content

Commit

Permalink
[filebeat] Fix httpjson page number initialization and docs (#33400)
Browse files Browse the repository at this point in the history
(cherry picked from commit 229690b)
  • Loading branch information
marc-gr authored and mergify[bot] committed Oct 19, 2022
1 parent 1146a48 commit 98f1829
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of Cisco 302020 messages in ASA and FTD modules. {pull}33089[33089]
- Fix requestID parsing in AWS cloudtrail fileset. {pull}33143[33143]
- Fix input metrics not being unregistered when an input closes. This led to panics when configuration was reloaded for the aws-s3, aws-cloudwatch, and lumberjack inputs. {pull}33259[33259]
- Fix `httpjson` input page number initialization and documentation. {pull}33400[33400]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ The state has the following elements:
- `last_response.url.params`: A https://pkg.go.dev/net/url#Values[`url.Values`] of the params from the URL in `last_response.url.value`. Can be queried with the https://pkg.go.dev/net/url#Values.Get[`Get`] function.
- `last_response.header`: A map containing the headers from the last successful response.
- `last_response.body`: A map containing the parsed JSON body from the last successful response. This is the response as it comes from the remote server.
- `last_response.page`: A number indicating the page number of the last response.
- `last_response.page`: A number indicating the page number of the last response. It starts with the value `0` at every interval.
- `first_event`: A map representing the first event sent to the output (result from applying transforms to `last_response.body`).
- `last_event`: A map representing the last event of the current request in the requests chain (result from applying transforms to `last_response.body`).
- `url`: The last requested URL as a raw https://pkg.go.dev/net/url#URL[`url.URL`] Go type.
Expand Down
25 changes: 20 additions & 5 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,28 +209,41 @@ func TestInput(t *testing.T) {
name: "Test pagination",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
registerResponseTransforms()
t.Cleanup(func() { registeredTransforms = newRegistry() })
server := httptest.NewServer(h)
config["request.url"] = server.URL
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": time.Second,
"interval": time.Millisecond,
"request.method": http.MethodGet,
"response.split": map[string]interface{}{
"target": "body.items",
"transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.page",
"value": "[[.last_response.page]]",
},
},
},
},
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.params.page",
"value": "[[.last_response.body.nextPageToken]]",
"target": "url.params.page",
"value": "[[.last_response.body.nextPageToken]]",
"fail_on_template_error": true,
},
},
},
},
handler: paginationHandler(),
expected: []string{`{"foo":"a"}`, `{"foo":"b"}`},
handler: paginationHandler(),
expected: []string{
`{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`,
`{"foo":"a","page":"0"}`, `{"foo":"b","page":"1"}`, `{"foo":"c","page":"0"}`, `{"foo":"d","page":"0"}`,
},
},
{
name: "Test first event",
Expand Down Expand Up @@ -881,6 +894,8 @@ func paginationHandler() http.HandlerFunc {
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:02Z","items":[{"foo":"c"}]}`))
case 3:
_, _ = w.Write([]byte(`{"@timestamp":"2002-10-02T15:00:03Z","items":[{"foo":"d"}]}`))
count = 0
return
}
count += 1
}
Expand Down
7 changes: 5 additions & 2 deletions x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
return nil, false, err
}

resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq) //nolint:bodyclose // Bad linter! The body is closed in the call.
resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq)
if err != nil {
return nil, false, err
}
Expand All @@ -164,12 +164,15 @@ func (iter *pageIterator) getPage() (*response, error) {
return nil, err
}
iter.resp.Body.Close()
iter.n += 1

var r response
r.header = iter.resp.Header
r.url = *iter.resp.Request.URL

// we set the page number before increasing its value
// because the first page needs to be 0 for every interval
r.page = iter.n
iter.n++

if len(bodyBytes) > 0 {
if iter.pagination.decoder != nil {
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestCtxAfterDoRequest(t *testing.T) {
lastResp.header = nil
assert.EqualValues(t,
&response{
page: 1,
page: 0,
url: *(newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T14%3A50%3A00Z"))),
body: mapstr.M{"@timestamp": "2002-10-02T15:00:00Z", "foo": "bar"},
},
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestCtxAfterDoRequest(t *testing.T) {
lastResp.header = nil
assert.EqualValues(t,
&response{
page: 1,
page: 0,
url: *(newURL(fmt.Sprintf("%s?%s", testServer.URL, "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z"))),
body: mapstr.M{"@timestamp": "2002-10-02T15:00:01Z", "foo": "bar"},
},
Expand Down

0 comments on commit 98f1829

Please sign in to comment.