Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: clean up scaffolding
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 4, 2023
1 parent 49d4240 commit fe1454f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 43 deletions.
16 changes: 0 additions & 16 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,22 +743,6 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
}
}

// processAndPublishEvents process and publish events based on event type
func (p *publisher) processAndPublishEvents(events stream) {
for maybeMsg := range events.ch {
p.processAndPublishEvent(maybeMsg)
}
}

// processAndPublishEvent processes and publishes one events based on event type
func (p *publisher) processAndPublishEvent(evt maybeMsg) {
if evt.failed() {
p.fail(evt.err)
return
}
p.event(nil, evt.msg)
}

func (p *publisher) event(_ context.Context, msg mapstr.M) {
if p.pub != nil {
event, err := makeEvent(msg)
Expand Down
20 changes: 0 additions & 20 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,26 +186,6 @@ type sendStream interface {
close()
}

type stream struct {
ch chan maybeMsg
}

func newStream() stream {
return stream{make(chan maybeMsg)}
}

func (s stream) event(_ context.Context, e mapstr.M) {
s.ch <- maybeMsg{msg: e}
}

func (s stream) fail(err error) {
s.ch <- maybeMsg{err: err}
}

func (s stream) close() {
close(s.ch)
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

Expand Down
27 changes: 20 additions & 7 deletions x-pack/filebeat/input/httpjson/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package httpjson

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -702,9 +703,8 @@ func TestSplit(t *testing.T) {
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
events := stream{make(chan maybeMsg, len(tc.expectedMessages))}
events := &stream{}
split, err := newSplitResponse(tc.config, logp.NewLogger(""))
assert.NoError(t, err)
err = split.run(tc.ctx, tc.resp, events)
Expand All @@ -714,12 +714,25 @@ func TestSplit(t *testing.T) {
assert.EqualError(t, err, tc.expectedErr.Error())
}
events.close()
assert.Equal(t, len(tc.expectedMessages), len(events.ch))
for _, msg := range tc.expectedMessages {
e := <-events.ch
assert.NoError(t, e.err)
assert.Equal(t, msg.Flatten(), e.msg.Flatten())
assert.Equal(t, len(tc.expectedMessages), len(events.collected))
for i, msg := range tc.expectedMessages {
assert.NoError(t, events.collected[i].err)
assert.Equal(t, msg.Flatten(), events.collected[i].msg.Flatten())
}
})
}
}

type stream struct {
collected []maybeMsg
}

func (s *stream) event(_ context.Context, e mapstr.M) {
s.collected = append(s.collected, maybeMsg{msg: e})
}

func (s *stream) fail(err error) {
s.collected = append(s.collected, maybeMsg{err: err})
}

func (s *stream) close() {}

0 comments on commit fe1454f

Please sign in to comment.