From 2c513fdf77c2c62e97da0019623367089cd59ff4 Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Thu, 17 Nov 2022 11:43:10 +0530 Subject: [PATCH] [filebeat][httpjson] - Added bug-fix for duplicate data issue #33213 (#33664) * added bugfix for duplicate data issue #33213 * updated with PR suggetions * updated comments * re-engineered bugfix to update cursors properly * spelling fix (cherry picked from commit afb2bebc263075de0d157c0341f05c13873a1883) --- x-pack/filebeat/input/httpjson/input_test.go | 163 +++++++++++++++++++ x-pack/filebeat/input/httpjson/pagination.go | 4 +- x-pack/filebeat/input/httpjson/request.go | 73 +++++---- x-pack/filebeat/input/httpjson/response.go | 6 +- x-pack/filebeat/input/httpjson/transform.go | 2 +- 5 files changed, 214 insertions(+), 34 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index ca2542b2b50..7b6a8b62a90 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -904,6 +904,169 @@ func TestInput(t *testing.T) { `{"space":{"world":"moon"}}`, }, }, + { + name: "Test if cursor value is updated for root response with chaining & pagination", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + var serverURL string + registerPaginationTransforms() + registerRequestTransforms() + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "createdAt":"22/02/2022", + "nextLink":"%s/link1"}`, serverURL) + case "/link1": + fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213", "createdAt":"24/04/2022"}`) + case "/2212/1": + matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`) + case "/2212/2": + matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`) + case "/2213/3": + matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`) + case "/2213/4": + matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`) + } + }) + server := httptest.NewServer(r) + t.Cleanup(func() { registeredTransforms = newRegistry() }) + config["request.url"] = server.URL + serverURL = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodPost, + "response.request_body_on_pagination": true, + "response.pagination": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "url.value", + "value": "[[.last_response.body.nextLink]]", + "fail_on_template_error": true, + }, + }, + }, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodPost, + "replace": "$.files[:].id", + "replace_with": "$.exportId,.parent_last_response.body.exportId", + "request.transforms": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.exportId", + "value": "[[ .parent_last_response.body.exportId ]]", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.createdAt", + "value": "[[ .cursor.last_published_login ]]", + }, + }, + }, + }, + }, + }, + "cursor": map[string]interface{}{ + "last_published_login": map[string]interface{}{ + "value": "[[ .last_event.createdAt ]]", + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + `{"hello":{"cake":"pumpkin"}}`, + `{"space":{"world":"moon"}}`, + }, + }, + { + name: "Test if cursor value is updated for root response with chaining & pagination along with split operator", + setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + var serverURL string + registerPaginationTransforms() + registerRequestTransforms() + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212","time":[{"timeStamp":"22/02/2022"}], + "nextLink":"%s/link1"}`, serverURL) + case "/link1": + fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213","time":[{"timeStamp":"24/04/2022"}]}`) + case "/2212/1": + matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"hello":{"world":"moon"}}`) + case "/2212/2": + matchBody(w, r, `{"createdAt":"22/02/2022","exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`) + case "/2213/3": + matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`) + case "/2213/4": + matchBody(w, r, `{"createdAt":"24/04/2022","exportId":"2213"}`, `{"space":{"world":"moon"}}`) + } + }) + server := httptest.NewServer(r) + t.Cleanup(func() { registeredTransforms = newRegistry() }) + config["request.url"] = server.URL + serverURL = server.URL + config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id" + t.Cleanup(server.Close) + }, + baseConfig: map[string]interface{}{ + "interval": 1, + "request.method": http.MethodPost, + "response.request_body_on_pagination": true, + "response.pagination": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "url.value", + "value": "[[.last_response.body.nextLink]]", + "fail_on_template_error": true, + }, + }, + }, + "response.split": map[string]interface{}{ + "target": "body.time", + "type": "array", + "keep_parent": true, + }, + "chain": []interface{}{ + map[string]interface{}{ + "step": map[string]interface{}{ + "request.method": http.MethodPost, + "replace": "$.files[:].id", + "replace_with": "$.exportId,.parent_last_response.body.exportId", + "request.transforms": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.exportId", + "value": "[[ .parent_last_response.body.exportId ]]", + }, + }, + map[string]interface{}{ + "set": map[string]interface{}{ + "target": "body.createdAt", + "value": "[[ .cursor.last_published_login ]]", + }, + }, + }, + }, + }, + }, + "cursor": map[string]interface{}{ + "last_published_login": map[string]interface{}{ + "value": "[[ .last_event.time.timeStamp ]]", + }, + }, + }, + expected: []string{ + `{"hello":{"world":"moon"}}`, + `{"space":{"cake":"pumpkin"}}`, + `{"hello":{"cake":"pumpkin"}}`, + `{"space":{"world":"moon"}}`, + }, + }, } for _, testCase := range testCases { diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index b5dc74d2754..9e9d8b02d35 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -7,7 +7,7 @@ package httpjson import ( "context" "errors" - "io/ioutil" + "io" "net/http" "net/url" @@ -159,7 +159,7 @@ func (iter *pageIterator) next() (*response, bool, error) { } func (iter *pageIterator) getPage() (*response, error) { - bodyBytes, err := ioutil.ReadAll(iter.resp.Body) + bodyBytes, err := io.ReadAll(iter.resp.Body) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index b9195d41b88..30ab23b8a44 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -279,17 +279,17 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher) error { var ( - n int - ids []string - err error - urlCopy url.URL - urlString string - httpResp *http.Response - initialResponse []*http.Response - intermediateResps []*http.Response - finalResps []*http.Response - isChainExpected bool - chainIndex int + n int + ids []string + err error + urlCopy url.URL + urlString string + httpResp *http.Response + initialResponse []*http.Response + intermediateResps []*http.Response + finalResps []*http.Response + isChainWithPageExpected bool + chainIndex int ) for i, rf := range r.requestFactories { @@ -319,38 +319,47 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p body: bodyMap, } trCtx.updateFirstResponse(firstResponse) - // since, initially the first response and last response are the same - trCtx.updateLastResponse(firstResponse) if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) - events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps) + events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true) n = processAndPublishEvents(trCtx, events, publisher, true, r.log) continue } // if flow of control reaches here, that means there are more than 1 request factories - // if a pagination request factory at the root level and a chain step exists, only then we will initialize flags & variables - // which are required for chaining with pagination - if r.requestFactories[i+1].isChain && r.responseProcessors[i].pagination.requestFactory != nil { - isChainExpected = true + // if a chain step exists, only then we will initialize flags & variables here which are required for chaining + if r.requestFactories[i+1].isChain { chainIndex = i + 1 resp, err := cloneResponse(httpResp) if err != nil { return err } - initialResponse = append(initialResponse, resp) + // the response is cloned and added to finalResps here, since the response of the 1st page (whether pagination exists or not), will + // be sent for further processing to check if any response processors can be applied or not and at the same time update the last_response, + // first_event & last_event cursor values. + finalResps = append(finalResps, resp) + + // if a pagination request factory exists at the root level along with a chain step, only then we will initialize flags & variables here + // which are required for chaining with root level pagination + if r.responseProcessors[i].pagination.requestFactory != nil { + isChainWithPageExpected = true + resp, err := cloneResponse(httpResp) + if err != nil { + return err + } + initialResponse = append(initialResponse, resp) + } } + intermediateResps = append(intermediateResps, httpResp) ids, err = r.getIdsFromResponses(intermediateResps, r.requestFactories[i+1].replace) if err != nil { return err } - // we will only processAndPublishEvents here if chains & root level pagination do not exist, inorder to avoid unnecessary pagination - if !isChainExpected { - events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps) - n = processAndPublishEvents(trCtx, events, publisher, false, r.log) - } + // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values + events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false) + n = processAndPublishEvents(trCtx, events, publisher, false, r.log) } else { if len(ids) == 0 { n = 0 @@ -420,17 +429,17 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p var events <-chan maybeMsg if rf.isChain { - events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps) + events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true) } else { - events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps) + events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true) } n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } } defer httpResp.Body.Close() - - if isChainExpected { + // if pagination exists for the parent request along with chaining, then for each page response the chain is processed + if isChainWithPageExpected { n += r.processRemainingChainEvents(stdCtx, trCtx, publisher, initialResponse, chainIndex) } r.log.Infof("request finished: %d events published", n) @@ -522,7 +531,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu // processRemainingChainEvents, processes the remaining pagination events for chain blocks func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it - events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp) + events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true) var n int var eventCount int @@ -544,6 +553,10 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t } response.Body = io.NopCloser(body) + // updates the cursor for pagination last_event & last_response when chaining is present + trCtx.updateLastEvent(maybeMsg.msg) + trCtx.updateCursor() + // for each pagination response, we repeat all the chain steps / blocks count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log) if err != nil { @@ -650,7 +663,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * } resps = intermediateResps } - events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps) + events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true) n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log) } diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index f4504ce2fb6..700e3b3901e 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -129,7 +129,7 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, log * return rp } -func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response) <-chan maybeMsg { +func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg { trCtx.clearIntervalData() ch := make(chan maybeMsg) @@ -158,6 +158,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran return } + // last_response context object is updated here organically trCtx.updateLastResponse(*page) rp.log.Debugf("last received page: %#v", trCtx.lastResponse) @@ -192,6 +193,9 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran } } } + if !paginate { + break + } } } }() diff --git a/x-pack/filebeat/input/httpjson/transform.go b/x-pack/filebeat/input/httpjson/transform.go index fe7744bde34..53db9b9d45d 100644 --- a/x-pack/filebeat/input/httpjson/transform.go +++ b/x-pack/filebeat/input/httpjson/transform.go @@ -131,10 +131,10 @@ func (ctx *transformContext) updateFirstResponse(r response) { func (ctx *transformContext) clearIntervalData() { ctx.lock.Lock() - defer ctx.lock.Unlock() ctx.lastEvent = &mapstr.M{} ctx.firstEvent = &mapstr.M{} ctx.lastResponse = &response{} + ctx.lock.Unlock() } type transformable mapstr.M