diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1d8cca8ca2d3..bfbe4e4a5fcc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -162,6 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620] - Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] +- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 2e69f1f33bfe..56c1f7f26dbf 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -23,6 +23,12 @@ is sent with the request. This input can for example be used to receive incoming webhooks from a third-party application or service. +Multiple endpoints may be assigned to a single address and port, and the HTTP +Endpoint input will resolve requests based on the URL pattern configuration. +If multiple endpoints are configured on a single address they must all have the +same TLS configuration, either all disabled or all enabled with identical +configurations. + These are the possible response codes from the server. [options="header"] @@ -63,6 +69,27 @@ Custom response example: prefix: "json" ---- +Multiple endpoints example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + url: "/open/" + tags: [open] +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + url: "/admin/" + basic_auth: true + username: adminuser + password: somepassword + tags: [admin] +---- + Disable Content-Type checks ["source","yaml",subs="attributes"] ---- diff --git a/x-pack/filebeat/input/http_endpoint/handler_test.go b/x-pack/filebeat/input/http_endpoint/handler_test.go index 5ac1e6fc122d..071c32e98287 100644 --- a/x-pack/filebeat/input/http_endpoint/handler_test.go +++ b/x-pack/filebeat/input/http_endpoint/handler_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -146,11 +147,14 @@ func Test_httpReadJSON(t *testing.T) { } type publisher struct { + mu sync.Mutex events []beat.Event } -func (p *publisher) Publish(event beat.Event) { - p.events = append(p.events, event) +func (p *publisher) Publish(e beat.Event) { + p.mu.Lock() + p.events = append(p.events, e) + p.mu.Unlock() } func Test_apiResponse(t *testing.T) { diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 38733a311e4e..294bf5b092e9 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -5,16 +5,20 @@ package http_endpoint import ( + "context" "crypto/tls" "fmt" "net" "net/http" + "reflect" + "sync" v2 "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/go-concert/ctxtool" ) @@ -81,30 +85,139 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error { } func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error { - log := ctx.Logger.With("address", e.addr) + err := servers.serve(ctx, e, publisher) + if err != nil && err != http.ErrServerClosed { + return fmt.Errorf("unable to start server due to error: %w", err) + } + return nil +} - mux := http.NewServeMux() - mux.Handle(e.config.URL, newHandler(e.config, publisher, log)) - server := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux} - _, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { server.Close() }) - defer cancel() +// servers is the package-level server pool. +var servers = pool{servers: make(map[string]*server)} + +// pool is a concurrence-safe pool of http servers. +type pool struct { + mu sync.Mutex + // servers is the server pool keyed on their address/port. + servers map[string]*server +} + +// serve runs an http server configured with the provided end-point and +// publishing to pub. The server will run until either the context is +// cancelled or the context of another end-point sharing the same address +// has had its context cancelled. If an end-point is re-registered with +// the same address and mux pattern, serve will return an error. +func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error { + log := ctx.Logger.With("address", e.addr) + pattern := e.config.URL var err error - if server.TLSConfig != nil { - log.Infof("Starting HTTPS server on %s", server.Addr) - // certificate is already loaded. That's why the parameters are empty - err = server.ListenAndServeTLS("", "") + p.mu.Lock() + s, ok := p.servers[e.addr] + if ok { + err = checkTLSConsistency(e.addr, s.tls, e.config.TLS) + if err != nil { + return err + } + + if old, ok := s.idOf[pattern]; ok { + err = fmt.Errorf("pattern already exists for %s: %s old=%s new=%s", + e.addr, pattern, old, ctx.ID) + s.setErr(err) + s.cancel() + p.mu.Unlock() + return err + } + log.Infof("Adding %s end point to server on %s", pattern, e.addr) + s.mux.Handle(pattern, newHandler(e.config, pub, log)) + s.idOf[pattern] = ctx.ID + p.mu.Unlock() + <-s.ctx.Done() + return s.getErr() + } + + mux := http.NewServeMux() + mux.Handle(pattern, newHandler(e.config, pub, log)) + srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux} + s = &server{ + idOf: map[string]string{pattern: ctx.ID}, + tls: e.config.TLS, + mux: mux, + srv: srv, + } + s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() }) + p.servers[e.addr] = s + p.mu.Unlock() + + if e.tlsConfig != nil { + log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern) + // The certificate is already loaded so we do not need + // to pass the cert file and key file parameters. + err = s.srv.ListenAndServeTLS("", "") } else { - log.Infof("Starting HTTP server on %s", server.Addr) - err = server.ListenAndServe() + log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern) + err = s.srv.ListenAndServe() } + s.setErr(err) + s.cancel() + return err +} - if err != nil && err != http.ErrServerClosed { - return fmt.Errorf("unable to start server due to error: %w", err) +func checkTLSConsistency(addr string, old, new *tlscommon.ServerConfig) error { + if (old == nil) != (new == nil) { + return fmt.Errorf("inconsistent TLS usage on %s: mixed TLS and unencrypted", addr) + } + if !reflect.DeepEqual(old, new) { + return fmt.Errorf("inconsistent TLS configuration on %s: configuration options do not agree: old=%s new=%s", + addr, renderTLSConfig(old), renderTLSConfig(new)) } return nil } +func renderTLSConfig(tls *tlscommon.ServerConfig) string { + c, err := conf.NewConfigFrom(tls) + if err != nil { + return fmt.Sprintf("!%v", err) + } + var m mapstr.M + err = c.Unpack(&m) + if err != nil { + return fmt.Sprintf("!%v", err) + } + return m.String() +} + +// server is a collection of http end-points sharing the same underlying +// http.Server. +type server struct { + // idOf is a map of mux pattern + // to input IDs for the server. + idOf map[string]string + + tls *tlscommon.ServerConfig + + mux *http.ServeMux + srv *http.Server + + ctx context.Context + cancel func() + + mu sync.Mutex + err error +} + +func (s *server) setErr(err error) { + s.mu.Lock() + s.err = err + s.mu.Unlock() +} + +func (s *server) getErr() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.err +} + func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler { validator := &apiValidator{ basicAuth: c.BasicAuth, diff --git a/x-pack/filebeat/input/http_endpoint/input_test.go b/x-pack/filebeat/input/http_endpoint/input_test.go new file mode 100644 index 000000000000..87ac4e2f48ea --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/input_test.go @@ -0,0 +1,289 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +var serverPoolTests = []struct { + name string + cfgs []*httpEndpoint + events []target + want []mapstr.M + wantErr error +}{ + { + name: "single", + cfgs: []*httpEndpoint{{ + addr: "127.0.0.1:9001", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/", + Prefix: "json", + ContentType: "application/json", + }, + }}, + events: []target{ + {url: "http://127.0.0.1:9001/", event: `{"a":1}`}, + {url: "http://127.0.0.1:9001/", event: `{"b":2}`}, + {url: "http://127.0.0.1:9001/", event: `{"c":3}`}, + }, + want: []mapstr.M{ + {"json": mapstr.M{"a": int64(1)}}, + {"json": mapstr.M{"b": int64(2)}}, + {"json": mapstr.M{"c": int64(3)}}, + }, + }, + { + name: "distinct_ports", + cfgs: []*httpEndpoint{ + { + addr: "127.0.0.1:9001", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/a/", + Prefix: "json", + ContentType: "application/json", + }, + }, + { + addr: "127.0.0.1:9002", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9002", + URL: "/b/", + Prefix: "json", + ContentType: "application/json", + }, + }, + }, + events: []target{ + {url: "http://127.0.0.1:9001/a/", event: `{"a":1}`}, + {url: "http://127.0.0.1:9002/b/", event: `{"b":2}`}, + {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, + }, + want: []mapstr.M{ + {"json": mapstr.M{"a": int64(1)}}, + {"json": mapstr.M{"b": int64(2)}}, + {"json": mapstr.M{"c": int64(3)}}, + }, + }, + { + name: "shared_ports", + cfgs: []*httpEndpoint{ + { + addr: "127.0.0.1:9001", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/a/", + Prefix: "json", + ContentType: "application/json", + }, + }, + { + addr: "127.0.0.1:9001", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/b/", + Prefix: "json", + ContentType: "application/json", + }, + }, + }, + events: []target{ + {url: "http://127.0.0.1:9001/a/", event: `{"a":1}`}, + {url: "http://127.0.0.1:9001/b/", event: `{"b":2}`}, + {url: "http://127.0.0.1:9001/a/", event: `{"c":3}`}, + }, + want: []mapstr.M{ + {"json": mapstr.M{"a": int64(1)}}, + {"json": mapstr.M{"b": int64(2)}}, + {"json": mapstr.M{"c": int64(3)}}, + }, + }, + { + name: "inconsistent_tls_mixed_traffic", + cfgs: []*httpEndpoint{ + { + addr: "127.0.0.1:9001", + config: config{ + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/a/", + Prefix: "json", + ContentType: "application/json", + }, + }, + { + addr: "127.0.0.1:9001", + config: config{ + TLS: &tlscommon.ServerConfig{}, + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/b/", + Prefix: "json", + ContentType: "application/json", + }, + }, + }, + wantErr: errors.New("inconsistent TLS usage on 127.0.0.1:9001: mixed TLS and unencrypted"), + }, + { + name: "inconsistent_tls_config", + cfgs: []*httpEndpoint{ + { + addr: "127.0.0.1:9001", + config: config{ + TLS: &tlscommon.ServerConfig{ + VerificationMode: tlscommon.VerifyStrict, + }, + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/a/", + Prefix: "json", + ContentType: "application/json", + }, + }, + { + addr: "127.0.0.1:9001", + config: config{ + TLS: &tlscommon.ServerConfig{ + VerificationMode: tlscommon.VerifyNone, + }, + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "9001", + URL: "/b/", + Prefix: "json", + ContentType: "application/json", + }, + }, + }, + wantErr: errors.New(`inconsistent TLS configuration on 127.0.0.1:9001: ` + + `configuration options do not agree: ` + + `old={"ca_sha256":[],"certificate":"","certificate_authorities":[],"cipher_suites":[],"client_authentication":0,"curve_types":[],"key":"","key_passphrase":"","supported_protocols":[],"verification_mode":1} ` + + `new={"ca_sha256":[],"certificate":"","certificate_authorities":[],"cipher_suites":[],"client_authentication":0,"curve_types":[],"key":"","key_passphrase":"","supported_protocols":[],"verification_mode":3}`), + }, +} + +type target struct { + url string + event string +} + +func TestServerPool(t *testing.T) { + for _, test := range serverPoolTests { + t.Run(test.name, func(t *testing.T) { + servers := pool{servers: make(map[string]*server)} + + var ( + pub publisher + fails = make(chan error, 1) + ) + ctx, cancel := newCtx("server_pool_test", test.name) + var wg sync.WaitGroup + for _, cfg := range test.cfgs { + cfg := cfg + wg.Add(1) + go func() { + defer wg.Done() + err := servers.serve(ctx, cfg, &pub) + if err != http.ErrServerClosed { + select { + case fails <- err: + default: + } + } + }() + } + time.Sleep(time.Second) + + select { + case err := <-fails: + if test.wantErr == nil { + t.Errorf("unexpected error calling serve: %#q", err) + } else if test.wantErr.Error() != err.Error() { + t.Errorf("unexpected error calling serve: got=%#q, want=%#q", err, test.wantErr) + } + default: + if test.wantErr != nil { + t.Errorf("expected error calling serve") + } + } + for i, e := range test.events { + resp, err := http.Post(e.url, "application/json", strings.NewReader(e.event)) + if err != nil { + t.Fatalf("failed to post event #%d: %v", i, err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected response status code: %s (%d)\nresp: %s", + resp.Status, resp.StatusCode, dump(resp.Body)) + } + } + cancel() + wg.Wait() + var got []mapstr.M + for _, e := range pub.events { + got = append(got, e.Fields) + } + if !cmp.Equal(got, test.want) { + t.Errorf("unexpected result:\n--- got\n--- want\n%s", cmp.Diff(got, test.want)) + } + }) + } +} + +func newCtx(log, id string) (_ v2.Context, cancel func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger(log), + ID: id, + Cancelation: ctx, + }, cancel +} + +func dump(r io.ReadCloser) string { + defer r.Close() + var buf bytes.Buffer + io.Copy(&buf, r) + return buf.String() +}