Skip to content

Commit

Permalink
Add Filebeat input httpjson (#13546)
Browse files Browse the repository at this point in the history
* Add httpjson input for Filebeat to address issue #13545
  • Loading branch information
Lei Qiu authored Sep 26, 2019
1 parent cd867da commit 5d02540
Show file tree
Hide file tree
Showing 5 changed files with 597 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update CoreDNS module to populate ECS DNS fields. {issue}13320[13320] {pull}13505[13505]
- Parse query steps in PostgreSQL slowlogs. {issue}13496[13496] {pull}13701[13701]
- Add support to set the document id in the json reader. {pull}5844[5844]
- Add input httpjson. {issue}13545[13545] {pull}13546[13546]

*Heartbeat*
- Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"strings"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
)

// Config contains information about httpjson configuration
type config struct {
APIKey string `config:"api_key"`
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
HTTPHeaders common.MapStr `config:"http_headers"`
HTTPMethod string `config:"http_method" validate:"required"`
HTTPRequestBody common.MapStr `config:"http_request_body"`
Interval time.Duration `config:"interval" validate:"required"`
JSONObjects string `config:"json_objects_array"`
Pagination *Pagination `config:"pagination"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
}

// Pagination contains information about httpjson pagination settings
type Pagination struct {
IsEnabled bool `config:"enabled"`
ExtraBodyContent common.MapStr `config:"extra_body_content"`
IDField string `config:"id_field"`
RequestField string `config:"req_field"`
URL string `config:"url"`
}

func (c *config) Validate() error {
if c.Interval < 3600*time.Second && c.Interval != 0 {
return errors.New("httpjson input: interval must not be less than 3600 seconds - ")
}
switch strings.ToUpper(c.HTTPMethod) {
case "GET":
break
case "POST":
break
default:
return errors.Errorf("httpjson input: Invalid http_method, %s - ", c.HTTPMethod)
}
return nil
}

func defaultConfig() config {
var c config
c.HTTPMethod = "GET"
c.HTTPClientTimeout = 60 * time.Second
return c
}
206 changes: 206 additions & 0 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package httpjson

import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"sync"
"testing"

"golang.org/x/sync/errgroup"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

var (
once sync.Once
url string
)

func testSetup(t *testing.T) {
t.Helper()
once.Do(func() {
logp.TestingSetup()
})
}

func runTest(t *testing.T, m map[string]interface{}, run func(input *httpjsonInput, out *stubOutleter, t *testing.T)) {
// Setup httpbin environment
testSetup(t)
// Create test http server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
log.Fatalln(err)
}
var m interface{}
err = json.Unmarshal(req, &m)
w.Header().Set("Content-Type", "application/json")
if err != nil {
w.WriteHeader(http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusOK)
w.Write(req)
}
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(b)
}
}))
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
// Simulate input.Context from Filebeat input runner.
inputCtx := newInputContext()
defer close(inputCtx.Done)

// Stub outlet for receiving events generated by the input.
eventOutlet := newStubOutlet()
defer eventOutlet.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
return eventOutlet, nil
})

in, err := NewInput(cfg, connector, inputCtx)
if err != nil {
t.Fatal(err)
}
input := in.(*httpjsonInput)
defer input.Stop()

run(input, eventOutlet, t)
}

func newInputContext() input.Context {
return input.Context{
Done: make(chan struct{}),
}
}

type stubOutleter struct {
sync.Mutex
cond *sync.Cond
done bool
Events []beat.Event
}

func newStubOutlet() *stubOutleter {
o := &stubOutleter{}
o.cond = sync.NewCond(o)
return o
}

func (o *stubOutleter) waitForEvents(numEvents int) ([]beat.Event, bool) {
o.Lock()
defer o.Unlock()

for len(o.Events) < numEvents && !o.done {
o.cond.Wait()
}

size := numEvents
if size >= len(o.Events) {
size = len(o.Events)
}

out := make([]beat.Event, size)
copy(out, o.Events)
return out, len(out) == numEvents
}

func (o *stubOutleter) Close() error {
o.Lock()
defer o.Unlock()
o.done = true
return nil
}

func (o *stubOutleter) Done() <-chan struct{} { return nil }

func (o *stubOutleter) OnEvent(event beat.Event) bool {
o.Lock()
defer o.Unlock()
o.Events = append(o.Events, event)
o.cond.Broadcast()
return !o.done
}

// --- Test Cases

func TestGET(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestPOST(t *testing.T) {
m := map[string]interface{}{
"http_method": "POST",
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRunStop(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, m, func(input *httpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
input.Stop()
})
}
Loading

0 comments on commit 5d02540

Please sign in to comment.