Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: clean up scaffolding
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 4, 2023
1 parent 9bd1ea9 commit 7606d19
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 50 deletions.
20 changes: 0 additions & 20 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,6 @@ func (p *chainProcessor) eventCount() int {
return p.n
}

func (*chainProcessor) close() {}

// processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
//
//nolint:bodyclose // response body is closed through drainBody method
Expand Down Expand Up @@ -742,22 +740,6 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
}
}

// processAndPublishEvents process and publish events based on event type
func (p *publisher) processAndPublishEvents(events stream) {
for maybeMsg := range events.ch {
p.processAndPublishEvent(maybeMsg)
}
}

// processAndPublishEvent processes and publishes one events based on event type
func (p *publisher) processAndPublishEvent(evt maybeMsg) {
if evt.failed() {
p.fail(evt.err)
return
}
p.event(nil, evt.msg)
}

func (p *publisher) event(_ context.Context, msg mapstr.M) {
if p.pub != nil {
event, err := makeEvent(msg)
Expand Down Expand Up @@ -788,8 +770,6 @@ func (p *publisher) eventCount() int {
return p.n
}

func (p *publisher) close() {}

const (
// This is generally updated with chain responses, if present, as they continue to occur
// Otherwise this is always the last response of the root request w.r.t pagination
Expand Down
23 changes: 0 additions & 23 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,35 +183,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
type sendStream interface {
event(context.Context, mapstr.M)
fail(error)
close()
}

type stream struct {
ch chan maybeMsg
}

func newStream() stream {
return stream{make(chan maybeMsg)}
}

func (s stream) event(_ context.Context, e mapstr.M) {
s.ch <- maybeMsg{msg: e}
}

func (s stream) fail(err error) {
s.ch <- maybeMsg{err: err}
}

func (s stream) close() {
close(s.ch)
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

defer ch.close()
var npages int64

for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
Expand Down
27 changes: 20 additions & 7 deletions x-pack/filebeat/input/httpjson/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package httpjson

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -702,9 +703,8 @@ func TestSplit(t *testing.T) {
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
events := stream{make(chan maybeMsg, len(tc.expectedMessages))}
events := &stream{}
split, err := newSplitResponse(tc.config, logp.NewLogger(""))
assert.NoError(t, err)
err = split.run(tc.ctx, tc.resp, events)
Expand All @@ -714,12 +714,25 @@ func TestSplit(t *testing.T) {
assert.EqualError(t, err, tc.expectedErr.Error())
}
events.close()
assert.Equal(t, len(tc.expectedMessages), len(events.ch))
for _, msg := range tc.expectedMessages {
e := <-events.ch
assert.NoError(t, e.err)
assert.Equal(t, msg.Flatten(), e.msg.Flatten())
assert.Equal(t, len(tc.expectedMessages), len(events.collected))
for i, msg := range tc.expectedMessages {
assert.NoError(t, events.collected[i].err)
assert.Equal(t, msg.Flatten(), events.collected[i].msg.Flatten())
}
})
}
}

type stream struct {
collected []maybeMsg
}

func (s *stream) event(_ context.Context, e mapstr.M) {
s.collected = append(s.collected, maybeMsg{msg: e})
}

func (s *stream) fail(err error) {
s.collected = append(s.collected, maybeMsg{err: err})
}

func (s *stream) close() {}

0 comments on commit 7606d19

Please sign in to comment.