From bda73849d6b48ddef7e7367b2864f00643d98658 Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Tue, 8 Nov 2022 12:49:54 +0530 Subject: [PATCH] [filebeat][httpjson] - Separation of global transform contexts and introduction of parent transform context within chains (#33499) * initial commit for transform context separation and introduction of parent object * removed ioutil from tests * updated asciidoc changelog * added support for dot's in values and expressions for replace_with clause, added doc updates and tests * added linter ignores for errcheck in test scenario * updated as per pr suggetions * added processExpression Tests --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 62 ++++- x-pack/filebeat/input/httpjson/input_test.go | 227 +++++++++++++++++- x-pack/filebeat/input/httpjson/request.go | 29 ++- .../input/httpjson/request_chain_helper.go | 63 +++-- .../httpjson/request_chain_helper_test.go | 28 +++ x-pack/filebeat/input/httpjson/transform.go | 4 +- x-pack/filebeat/input/httpjson/value_tpl.go | 4 + 8 files changed, 375 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1617c6b27dfd..c87e8d1e956c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] - Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] - Improve httpjson documentation for split processor. {pull}33473[33473] +- Added separation of transform context object inside httpjson. 
Introduced new clause `.parent_last_response.*` {pull}33499[33499] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 92bdccc6bdab..f8baaec0835a 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -531,7 +531,7 @@ List of transforms to apply to the request before each execution. Available transforms for request: [`append`, `delete`, `set`]. -Can read state from: [`.first_response.*`,`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`]. +Can read state from: [`.first_response.*`,`.last_response.*`, `.parent_last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`]. Can write state to: [`body.*`, `header.*`, `url.*`]. @@ -547,6 +547,47 @@ filebeat.inputs: value: '[[now (parseDuration "-1h")]]' ---- +NOTE: The clause `.parent_last_response.` should only be used from within chain steps and when pagination exists at the root request level. If pagination +does not exist at the root level, please use the clause `.first_response.` to access the parent response object from within chains. You can look at this +<<parent-last-response,example>> below for a better idea. 
+ + +["source","yaml",subs="attributes",id="parent-last-response"] + filebeat.inputs: + - type: httpjson + enabled: true + id: my-httpjson-id + request.url: http://xyz.com/services/data/v1.0/export_ids/page + request.method: POST + interval: 1h + request.retry.max_attempts: 2 + request.retry.wait_min: 5s + request.transforms: + - set: + target: body.page + value: 0 + response.request_body_on_pagination: true + response.pagination: + - set: + target: body.page + value: '[[ .last_response.body.page ]]' + fail_on_template_error: true + chain: + - step: + request.url: http://xyz.com/services/data/v1.0/$.exportId/export_ids/$.files[:].id/info + request.method: POST + request.transforms: + - set: + target: body.exportId + value: '[[ .parent_last_response.body.exportId ]]' + replace: $.files[:].id + replace_with: '$.exportId,.parent_last_response.body.exportId' + +Here we can see that the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is present for the parent (root) request. +However if `response.pagination` was not present in the parent (root) request, `replace_with` clause should have used `.first_response.body.exportId`. This is +because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the +`first_response` object always stores the very first response in the process chain. + [float] ==== `request.tracer.filename` @@ -1141,7 +1182,7 @@ Collect and make events from response in any format supported by httpjson for al The `replace_with: "pattern,value"` clause is used to replace a fixed pattern string defined in `request.url` with the given value. The fixed pattern must have a `$.` prefix, for example: `$.xyz`. The `value` may be hard coded or extracted from context variables -like [`.last_response.*`, `.first_response.*`] etc. 
The `replace_with` clause can be used in combination with the `replace` clause +like [`.last_response.*`, `.first_response.*`, `.parent_last_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause thus providing a lot of flexibility in the logic of chain requests. Example: @@ -1167,7 +1208,7 @@ filebeat.inputs: - step: request.url: https://example.com/services/data/v1.0/$.exportId/files request.method: GET - replace_with: '$.exportId,first_response.body.exportId' + replace_with: '$.exportId,.first_response.body.exportId' ---- Example: @@ -1217,8 +1258,19 @@ response_json using exportId as '2212': ---- This behaviour of targeted fixed pattern replacement in the url helps solve various use cases. -NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the -`replace_with` processor with exact string matching. +**Some useful points to remember:** + + 1. If you want the `value` to be treated as an expression to be evaluated for data extraction from context variables, it should always have a + **single '.' (dot) prefix**. Example: `replace_with: '$.exportId,.first_response.body.exportId'`. Anything more or less will have the internal + processor treat it as a hard coded value, for example `replace_with: '$.exportId,..first_response.body.exportId'` (more than one '.' (dot) as prefix) or + `replace_with: '$.exportId,first_response.body.exportId'` (no '.' (dot) as prefix). + + 2. Incomplete `value expressions` will cause an error while processing. Example: `replace_with: '$.exportId,.first_response.'`, `replace_with: + '$.exportId,.last_response.'` etc. These expressions are incomplete because they do not evaluate down to a valid key that can be extracted from + the context variables. 
The value expression: `.first_response.`, on processing, will result in an array `[first_response ""]` where the key to be + extracted becomes `""` (an empty string), which has no definition within any context variable. + +NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching. [float] ==== `chain[].while` diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index 643c45cc2c7a..ca2542b2b502 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -7,7 +7,7 @@ package httpjson import ( "context" "fmt" - "io/ioutil" + "io" "math/rand" "net/http" "net/http/httptest" @@ -683,17 +683,227 @@ func TestInput(t *testing.T) { "step": map[string]interface{}{ "request.method": http.MethodGet, "replace": "$.files[:].id", - "replace_with": "$.exportId,first_response.body.exportId", + "replace_with": "$.exportId,.first_response.body.exportId", + }, + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + }, + }, + { + name: "Test replace_with clause with hardcoded value_1", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`) + case "/2212/1": + fmt.Fprintln(w, `{"hello":{"world":"moon"}}`) + case "/2212/2": + fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`) + } + }) + server := httptest.NewServer(r) + config["request.url"] = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodGet, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + 
"request.method": http.MethodGet, + "replace": "$.files[:].id", + "replace_with": "$.exportId,2212", + }, + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + }, + }, + { + name: "Test replace_with clause with hardcoded value (no dot prefix)", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`) + case "/first_response.body.id/1": + fmt.Fprintln(w, `{"hello":{"world":"moon"}}`) + case "/first_response.body.id/2": + fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`) + } + }) + server := httptest.NewServer(r) + config["request.url"] = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodGet, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodGet, + "replace": "$.files[:].id", + "replace_with": "$.exportId,first_response.body.id", }, }, }, }, - handler: defaultHandler(http.MethodGet, "", ""), expected: []string{ `{"hello":{"world":"moon"}}`, `{"space":{"cake":"pumpkin"}}`, }, }, + { + name: "Test replace_with clause with hardcoded value (more than one dot prefix)", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`) + case "/..first_response.body.id/1": + fmt.Fprintln(w, `{"hello":{"world":"moon"}}`) + case "/..first_response.body.id/2": + fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`) + } + }) + server := httptest.NewServer(r) + config["request.url"] = server.URL + config["chain.0.step.request.url"] = server.URL + 
"/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodGet, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodGet, + "replace": "$.files[:].id", + "replace_with": "$.exportId,..first_response.body.id", + }, + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + }, + }, + { + name: "Test replace_with clause with hardcoded value containing '.' (dots)", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`) + case "/.xyz.2212.abc./1": + fmt.Fprintln(w, `{"hello":{"world":"moon"}}`) + case "/.xyz.2212.abc./2": + fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`) + } + }) + server := httptest.NewServer(r) + config["request.url"] = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodGet, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodGet, + "replace": "$.files[:].id", + "replace_with": "$.exportId,.xyz.2212.abc.", + }, + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + }, + }, + { + name: "Test global transform context separation with parent_last_response object", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + var serverURL string + registerPaginationTransforms() + registerRequestTransforms() + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", 
"nextLink":"%s/link1"}`, serverURL) + case "/link1": + fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213"}`) + case "/2212/1": + matchBody(w, r, `{"exportId":"2212"}`, `{"hello":{"world":"moon"}}`) + case "/2212/2": + matchBody(w, r, `{"exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`) + case "/2213/3": + matchBody(w, r, `{"exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`) + case "/2213/4": + matchBody(w, r, `{"exportId":"2213"}`, `{"space":{"world":"moon"}}`) + } + }) + server := httptest.NewServer(r) + t.Cleanup(func() { registeredTransforms = newRegistry() }) + config["request.url"] = server.URL + serverURL = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodPost, + "response.request_body_on_pagination": true, + "response.pagination": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "url.value", + "value": "[[.last_response.body.nextLink]]", + "fail_on_template_error": true, + }, + }, + }, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodPost, + "replace": "$.files[:].id", + "replace_with": "$.exportId,.parent_last_response.body.exportId", + "request.transforms": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.exportId", + "value": "[[ .parent_last_response.body.exportId ]]", + }, + }, + }, + }, + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + `{"hello":{"cake":"pumpkin"}}`, + `{"space":{"world":"moon"}}`, + }, + }, } for _, testCase := range testCases { @@ -826,6 +1036,15 @@ func newV2Context() (v2.Context, func()) { }, cancel } +//nolint:errcheck // We can safely ignore errors here +func matchBody(w io.Writer, req *http.Request, match, response string) { + body, _ := 
io.ReadAll(req.Body) + req.Body.Close() + if string(body) == match { + w.Write([]byte(response)) + } +} + func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", "application/json") @@ -837,7 +1056,7 @@ func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc { w.WriteHeader(http.StatusBadRequest) msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod) case expectedBody != "": - body, _ := ioutil.ReadAll(r.Body) + body, _ := io.ReadAll(r.Body) r.Body.Close() if expectedBody != string(body) { w.WriteHeader(http.StatusBadRequest) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 3792aac209db..b9195d41b881 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -303,7 +303,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p return fmt.Errorf("failed to execute rf.collectResponse: %w", err) } // store first response in transform context - var bodyMap mapstr.M + var bodyMap map[string]interface{} body, err := io.ReadAll(httpResp.Body) if err != nil { return fmt.Errorf("failed to read http response body: %w", err) @@ -319,6 +319,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p body: bodyMap, } trCtx.updateFirstResponse(firstResponse) + // since, initially the first response and last response are the same + trCtx.updateLastResponse(firstResponse) if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) @@ -357,7 +359,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p urlCopy = rf.url urlString = rf.url.String() - // new transform context for every chain step , derived from parent transform context + // new transform context for every chain step, derived from parent transform context var chainTrCtx 
*transformContext if rf.isChain { chainTrCtx = trCtx.clone() @@ -368,7 +370,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p var replaceArr []string if rf.replaceWith != "" { replaceArr = strings.Split(rf.replaceWith, ",") - val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1])) + val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1])) if err != nil { return err } @@ -418,11 +420,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p var events <-chan maybeMsg if rf.isChain { - events = rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps) + events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps) } else { events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps) } - n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log) + n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } } @@ -517,7 +519,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu return n } -// processRemainingChainEvents , processes the remaining pagination events for chain blocks +// processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp) @@ -542,7 +544,7 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t } response.Body = io.NopCloser(body) - // for each pagination response , we repeat all the chain steps / blocks + // for each pagination response, we repeat all the chain steps / blocks count, err := 
r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log) if err != nil { r.log.Errorf("error processing chain event: %w", err) @@ -592,18 +594,15 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * urlCopy = rf.url urlString = rf.url.String() - // new transform context for every chain step , derived from parent transform context - var chainTrCtx *transformContext - if rf.isChain { - chainTrCtx = trCtx.clone() - } + // new transform context for every chain step, derived from parent transform context + chainTrCtx := trCtx.clone() var val string var doReplaceWith bool var replaceArr []string if rf.replaceWith != "" { replaceArr = strings.Split(rf.replaceWith, ",") - val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1])) + val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1])) if err != nil { return n, err } @@ -651,8 +650,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * } resps = intermediateResps } - events := rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps) - n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log) + events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps) + n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } defer httpResp.Body.Close() diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper.go b/x-pack/filebeat/input/httpjson/request_chain_helper.go index 8e2f8d7d09b6..a894e4c6248a 100644 --- a/x-pack/filebeat/input/httpjson/request_chain_helper.go +++ b/x-pack/filebeat/input/httpjson/request_chain_helper.go @@ -21,8 +21,14 @@ import ( ) const ( - lastResponse = "last_response" + // This is generally updated with chain responses, if present, as they continue to occur + // Otherwise this is always the last response of the root request w.r.t pagination + 
lastResponse = "last_response" + // This is always the first root response firstResponse = "first_response" + // This is always the last response of the parent (root) request w.r.t pagination + // This is only set if chaining is used + parentLastResponse = "parent_last_response" ) func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) { @@ -98,9 +104,18 @@ func evaluateResponse(expression *valueTpl, data []byte, log *logp.Logger) (bool func fetchValueFromContext(trCtx *transformContext, expression string) (string, bool, error) { var val interface{} - switch keys := strings.Split(expression, "."); keys[0] { + switch keys := processExpression(expression); keys[0] { case lastResponse: - respMap, err := responseToMap(trCtx.lastResponse, true) + respMap, err := responseToMap(trCtx.lastResponse) + if err != nil { + return "", false, err + } + val, err = iterateRecursive(respMap, keys[1:], 0) + if err != nil { + return "", false, err + } + case parentLastResponse: + respMap, err := responseToMap(trCtx.parentTrCtx.lastResponse) if err != nil { return "", false, err } @@ -110,7 +125,7 @@ func fetchValueFromContext(trCtx *transformContext, expression string) (string, } case firstResponse: // since first response body is already a map, we do not need to transform it - respMap, err := responseToMap(trCtx.firstResponse, false) + respMap, err := responseToMap(trCtx.firstResponse) if err != nil { return "", false, err } @@ -118,14 +133,17 @@ func fetchValueFromContext(trCtx *transformContext, expression string) (string, if err != nil { return "", false, err } + // In this scenario we treat the expression as a hardcoded value, with which we will replace the fixed-pattern + case expression: + return expression, true, nil default: - return "", false, fmt.Errorf("context value not supported: %q in %q", keys[0], expression) + return "", false, fmt.Errorf("context value not supported for key: %q 
in expression %q", keys[0], expression) } return fmt.Sprint(val), true, nil } -func responseToMap(r *response, mapBody bool) (mapstr.M, error) { +func responseToMap(r *response) (mapstr.M, error) { if r.body == nil { return nil, fmt.Errorf("response body is empty for request url: %s", &r.url) } @@ -139,16 +157,7 @@ func responseToMap(r *response, mapBody bool) (mapstr.M, error) { key: value, } } - if mapBody { - var bodyMap mapstr.M - err := json.Unmarshal(r.body.([]byte), &bodyMap) - if err != nil { - return nil, err - } - respMap["body"] = bodyMap - } else { - respMap["body"] = r.body - } + respMap["body"] = r.body return respMap, nil } @@ -157,7 +166,7 @@ func iterateRecursive(m mapstr.M, keys []string, depth int) (interface{}, error) val := m[keys[depth]] if val == nil { - return nil, fmt.Errorf("value of expression could not be determined for %s", strings.Join(keys[:depth+1], ".")) + return nil, fmt.Errorf("value of expression could not be determined for key %s", strings.Join(keys[:depth+1], ".")) } switch v := reflect.ValueOf(val); v.Kind() { @@ -172,7 +181,7 @@ func iterateRecursive(m mapstr.M, keys []string, depth int) (interface{}, error) case reflect.String: return v.String(), nil case reflect.Map: - nextMap, ok := v.Interface().(mapstr.M) + nextMap, ok := v.Interface().(map[string]interface{}) if !ok { return nil, errors.New("unable to parse the value of the given expression") } @@ -186,6 +195,24 @@ func iterateRecursive(m mapstr.M, keys []string, depth int) (interface{}, error) } } +// processExpression, splits the expression string based on the separator and looks for +// supported keywords. If present, returns an expression array containing separated elements. +// If no keywords are present, the expression is treated as a hardcoded value and returned +// as a merged string which is the only array element. 
+func processExpression(expression string) []string { + if !strings.HasPrefix(expression, ".") { + return []string{expression} + } + switch { + case strings.HasPrefix(expression, "."+firstResponse+"."), + strings.HasPrefix(expression, "."+lastResponse+"."), + strings.HasPrefix(expression, "."+parentLastResponse+"."): + return strings.Split(expression, ".")[1:] + default: + return []string{expression} + } +} + func tryAssignAuth(parentConfig *authConfig, childConfig *authConfig) *authConfig { if parentConfig != nil && childConfig == nil { return parentConfig diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper_test.go b/x-pack/filebeat/input/httpjson/request_chain_helper_test.go index f01484ec9151..6ac4995fa988 100644 --- a/x-pack/filebeat/input/httpjson/request_chain_helper_test.go +++ b/x-pack/filebeat/input/httpjson/request_chain_helper_test.go @@ -133,3 +133,31 @@ func Test_evaluateResponse(t *testing.T) { }) } } + +func TestProcessExpression(t *testing.T) { + tests := []struct { + in string + want []string + }{ + // Cursor values. + {in: ".first_response.foo", want: []string{"first_response", "foo"}}, + {in: ".first_response.", want: []string{"first_response", ""}}, + {in: ".last_response.foo", want: []string{"last_response", "foo"}}, + {in: ".last_response.", want: []string{"last_response", ""}}, + {in: ".parent_last_response.foo", want: []string{"parent_last_response", "foo"}}, + {in: ".parent_last_response.", want: []string{"parent_last_response", ""}}, + + // Literal values. 
+ {in: ".literal_foo", want: []string{".literal_foo"}}, + {in: ".literal_foo.bar", want: []string{".literal_foo.bar"}}, + {in: "literal.foo.bar", want: []string{"literal.foo.bar"}}, + {in: "first_response.foo", want: []string{"first_response.foo"}}, + {in: ".first_response", want: []string{".first_response"}}, + {in: ".last_response", want: []string{".last_response"}}, + {in: ".parent_last_response", want: []string{".parent_last_response"}}, + } + for _, test := range tests { + got := processExpression(test.in) + assert.Equal(t, test.want, got) + } +} diff --git a/x-pack/filebeat/input/httpjson/transform.go b/x-pack/filebeat/input/httpjson/transform.go index 9d63a81403b3..fe7744bde349 100644 --- a/x-pack/filebeat/input/httpjson/transform.go +++ b/x-pack/filebeat/input/httpjson/transform.go @@ -27,6 +27,7 @@ type transforms []transform type transformContext struct { lock sync.RWMutex cursor *cursor + parentTrCtx *transformContext firstEvent *mapstr.M lastEvent *mapstr.M lastResponse *response @@ -91,7 +92,6 @@ func (ctx *transformContext) updateCursor() { func (ctx *transformContext) clone() *transformContext { ctx.lock.Lock() - defer ctx.lock.Unlock() newCtx := emptyTransformContext() newCtx.lastEvent = ctx.lastEvent @@ -99,7 +99,9 @@ func (ctx *transformContext) clone() *transformContext { newCtx.lastResponse = ctx.lastResponse newCtx.firstResponse = ctx.firstResponse newCtx.cursor = ctx.cursor + newCtx.parentTrCtx = ctx + ctx.lock.Unlock() return newCtx } diff --git a/x-pack/filebeat/input/httpjson/value_tpl.go b/x-pack/filebeat/input/httpjson/value_tpl.go index 976cbacb0ca1..7a774e0ad9c7 100644 --- a/x-pack/filebeat/input/httpjson/value_tpl.go +++ b/x-pack/filebeat/input/httpjson/value_tpl.go @@ -119,6 +119,10 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal if trCtx.firstResponse != nil { data.Put("first_response", trCtx.firstResponseClone().templateValues()) } + // This is only set when chaining is used + if trCtx.parentTrCtx 
!= nil { + data.Put("parent_last_response", trCtx.parentTrCtx.lastResponseClone().templateValues()) + } if err := t.Template.Execute(buf, data); err != nil { return fallback(err)