Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: allow http_endpoint instances to…
Browse files Browse the repository at this point in the history
… share ports
  • Loading branch information
efd6 committed Oct 25, 2022
1 parent 51d0a0a commit a3deeb3
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]

*Auditbeat*

Expand Down
8 changes: 6 additions & 2 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -146,11 +147,14 @@ func Test_httpReadJSON(t *testing.T) {
}

type publisher struct {
mu sync.Mutex
events []beat.Event
}

func (p *publisher) Publish(event beat.Event) {
p.events = append(p.events, event)
func (p *publisher) Publish(e beat.Event) {
p.mu.Lock()
p.events = append(p.events, e)
p.mu.Unlock()
}

func Test_apiResponse(t *testing.T) {
Expand Down
108 changes: 92 additions & 16 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package http_endpoint

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"sync"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
Expand Down Expand Up @@ -81,28 +83,102 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error {
}

func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error {
log := ctx.Logger.With("address", e.addr)
err := servers.serve(ctx, e, publisher)
if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("unable to start server due to error: %w", err)
}
return nil
}

mux := http.NewServeMux()
mux.Handle(e.config.URL, newHandler(e.config, publisher, log))
server := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux}
_, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { server.Close() })
defer cancel()
// servers is the package-level server pool.
var servers = pool{servers: make(map[string]*server)}

// pool is a concurrence-safe pool of http servers.
type pool struct {
mu sync.Mutex
// servers is the server pool keyed on their address/port.
servers map[string]*server
}

// serve runs an http server configured with the provided end-point and
// publishing to pub. The server will run until either the context is
// cancelled or the context of another end-point sharing the same address
// has had its context cancelled. If an end-point is re-registered with
// the same address and mux pattern, serve will return an error.
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error {
log := ctx.Logger.With("address", e.addr)
pattern := e.config.URL

var err error
if server.TLSConfig != nil {
log.Infof("Starting HTTPS server on %s", server.Addr)
// certificate is already loaded. That's why the parameters are empty
err = server.ListenAndServeTLS("", "")
} else {
log.Infof("Starting HTTP server on %s", server.Addr)
err = server.ListenAndServe()
p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
if s.pattern[pattern] {
err := fmt.Errorf("pattern already exists for %s: %s", e.addr, pattern)
s.setErr(err)
s.cancel()
p.mu.Unlock()
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(e.config, pub, log))
s.pattern[pattern] = true
p.mu.Unlock()
<-s.ctx.Done()
return s.getErr()
}

if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("unable to start server due to error: %w", err)
mux := http.NewServeMux()
mux.Handle(pattern, newHandler(e.config, pub, log))
srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux}
s = &server{
pattern: map[string]bool{pattern: true},
mux: mux,
srv: srv,
}
return nil
s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() })
p.servers[e.addr] = s
p.mu.Unlock()

if e.tlsConfig != nil {
log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern)
// The certificate is already loaded so we do not need
// to pass the cert file and key file parameters.
err = s.srv.ListenAndServeTLS("", "")
} else {
log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern)
err = s.srv.ListenAndServe()
}
s.setErr(err)
s.cancel()
return err
}

// server is a collection of http end-points sharing the same underlying
// http.Server.
type server struct {
pattern map[string]bool

mux *http.ServeMux
srv *http.Server

ctx context.Context
cancel func()

mu sync.Mutex
err error
}

func (s *server) setErr(err error) {
s.mu.Lock()
s.err = err
s.mu.Unlock()
}

func (s *server) getErr() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}

func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler {
Expand Down
195 changes: 195 additions & 0 deletions x-pack/filebeat/input/http_endpoint/input_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package http_endpoint

import (
"bytes"
"context"
"io"
"net/http"
"strings"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var serverPoolTests = []struct {
name string
cfgs []*httpEndpoint
events []target
want []mapstr.M
}{
{
name: "single",
cfgs: []*httpEndpoint{{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/",
Prefix: "json",
ContentType: "application/json",
},
}},
events: []target{
{url: "http://127.0.0.1:9001/", event: `{"a":1}`},
{url: "http://127.0.0.1:9001/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/", event: `{"c":3}`},
},
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
{"json": mapstr.M{"c": int64(3)}},
},
},
{
name: "distinct_ports",
cfgs: []*httpEndpoint{
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/a/",
Prefix: "json",
ContentType: "application/json",
},
},
{
addr: "127.0.0.1:9002",
config: config{
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9002",
URL: "/b/",
Prefix: "json",
ContentType: "application/json",
},
},
},
events: []target{
{url: "http://127.0.0.1:9001/a/", event: `{"a":1}`},
{url: "http://127.0.0.1:9002/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
{"json": mapstr.M{"c": int64(3)}},
},
},
{
name: "shared_ports",
cfgs: []*httpEndpoint{
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/a/",
Prefix: "json",
ContentType: "application/json",
},
},
{
addr: "127.0.0.1:9001",
config: config{
ResponseCode: 200,
ResponseBody: `{"message": "success"}`,
ListenAddress: "127.0.0.1",
ListenPort: "9001",
URL: "/b/",
Prefix: "json",
ContentType: "application/json",
},
},
},
events: []target{
{url: "http://127.0.0.1:9001/a/", event: `{"a":1}`},
{url: "http://127.0.0.1:9001/b/", event: `{"b":2}`},
{url: "http://127.0.0.1:9001/a/", event: `{"c":3}`},
},
want: []mapstr.M{
{"json": mapstr.M{"a": int64(1)}},
{"json": mapstr.M{"b": int64(2)}},
{"json": mapstr.M{"c": int64(3)}},
},
},
}

type target struct {
url string
event string
}

func TestServerPool(t *testing.T) {
for _, test := range serverPoolTests {
t.Run(test.name, func(t *testing.T) {
servers := pool{servers: make(map[string]*server)}

var pub publisher
ctx, cancel := newCtx("server_pool_test", test.name)
var wg sync.WaitGroup
for _, cfg := range test.cfgs {
cfg := cfg
wg.Add(1)
go func() {
defer wg.Done()
_ = servers.serve(ctx, cfg, &pub) // Always returns a non-nil error.
}()
}
time.Sleep(time.Second)

for i, e := range test.events {
resp, err := http.Post(e.url, "application/json", strings.NewReader(e.event))
if err != nil {
t.Fatalf("failed to post event #%d: %v", i, err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected response status code: %s (%d)\nresp: %s",
resp.Status, resp.StatusCode, dump(resp.Body))
}
}
cancel()
wg.Wait()
var got []mapstr.M
for _, e := range pub.events {
got = append(got, e.Fields)
}
if !cmp.Equal(got, test.want) {
t.Errorf("unexpected result:\n--- got\n--- want\n%s", cmp.Diff(got, test.want))
}
})
}
}

func newCtx(log, id string) (_ v2.Context, cancel func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Logger: logp.NewLogger(log),
ID: id,
Cancelation: ctx,
}, cancel
}

func dump(r io.ReadCloser) string {
defer r.Close()
var buf bytes.Buffer
io.Copy(&buf, r)
return buf.String()
}

0 comments on commit a3deeb3

Please sign in to comment.