Skip to content

Commit

Permalink
added bugfix for duplicate data issue elastic#33213
Browse files Browse the repository at this point in the history
  • Loading branch information
ShourieG committed Nov 14, 2022
1 parent 3e928b7 commit d9fd946
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
17 changes: 10 additions & 7 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
continue
}
Expand All @@ -347,10 +347,13 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return err
}
// we will only processAndPublishEvents here if chains & root level pagination do not exist, inorder to avoid unnecessary pagination
var events <-chan maybeMsg
if !isChainExpected {
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
}
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
} else {
if len(ids) == 0 {
n = 0
Expand Down Expand Up @@ -420,9 +423,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

var events <-chan maybeMsg
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps)
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
}
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}
Expand Down Expand Up @@ -522,7 +525,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
// processRemainingChainEvents, processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp)
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)

var n int
var eventCount int
Expand Down Expand Up @@ -650,7 +653,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, log *
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response) <-chan maybeMsg {
trCtx.clearIntervalData()
func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, clearInterval bool) <-chan maybeMsg {
trCtx.clearIntervalData(clearInterval)

ch := make(chan maybeMsg)
go func() {
Expand Down
14 changes: 8 additions & 6 deletions x-pack/filebeat/input/httpjson/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,14 @@ func (ctx *transformContext) updateFirstResponse(r response) {
ctx.lock.Unlock()
}

func (ctx *transformContext) clearIntervalData() {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.lastEvent = &mapstr.M{}
ctx.firstEvent = &mapstr.M{}
ctx.lastResponse = &response{}
func (ctx *transformContext) clearIntervalData(clear bool) {
if clear {
ctx.lock.Lock()
defer ctx.lock.Unlock()
ctx.lastEvent = &mapstr.M{}
ctx.firstEvent = &mapstr.M{}
ctx.lastResponse = &response{}
}
}

type transformable mapstr.M
Expand Down

0 comments on commit d9fd946

Please sign in to comment.