Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/http_endpoint: allow http_endpoint instances to share ports #33377

Merged
merged 4 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]

*Auditbeat*

Expand Down
8 changes: 6 additions & 2 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -146,11 +147,14 @@ func Test_httpReadJSON(t *testing.T) {
}

type publisher struct {
mu sync.Mutex
events []beat.Event
}

func (p *publisher) Publish(event beat.Event) {
p.events = append(p.events, event)
func (p *publisher) Publish(e beat.Event) {
p.mu.Lock()
p.events = append(p.events, e)
p.mu.Unlock()
}

func Test_apiResponse(t *testing.T) {
Expand Down
141 changes: 127 additions & 14 deletions x-pack/filebeat/input/http_endpoint/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
package http_endpoint

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"reflect"
"sync"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/feature"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -81,30 +85,139 @@ func (e *httpEndpoint) Test(_ v2.TestContext) error {
}

func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error {
log := ctx.Logger.With("address", e.addr)
err := servers.serve(ctx, e, publisher)
if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("unable to start server due to error: %w", err)
}
return nil
}

mux := http.NewServeMux()
mux.Handle(e.config.URL, newHandler(e.config, publisher, log))
server := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux}
_, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { server.Close() })
defer cancel()
// servers is the package-level server pool.
var servers = pool{servers: make(map[string]*server)}

// pool is a concurrence-safe pool of http servers.
type pool struct {
mu sync.Mutex
// servers is the server pool keyed on their address/port.
servers map[string]*server
}

// serve runs an http server configured with the provided end-point and
// publishing to pub. The server will run until either the context is
// cancelled or the context of another end-point sharing the same address
// has had its context cancelled. If an end-point is re-registered with
// the same address and mux pattern, serve will return an error.
func (p *pool) serve(ctx v2.Context, e *httpEndpoint, pub stateless.Publisher) error {
log := ctx.Logger.With("address", e.addr)
pattern := e.config.URL

var err error
if server.TLSConfig != nil {
log.Infof("Starting HTTPS server on %s", server.Addr)
// certificate is already loaded. That's why the parameters are empty
err = server.ListenAndServeTLS("", "")
p.mu.Lock()
s, ok := p.servers[e.addr]
if ok {
err = checkTLSConsistency(e.addr, s.tls, e.config.TLS)
if err != nil {
return err
}

if old, ok := s.idOf[pattern]; ok {
err = fmt.Errorf("pattern already exists for %s: %s old=%s new=%s",
e.addr, pattern, old, ctx.ID)
s.setErr(err)
s.cancel()
andrewkroh marked this conversation as resolved.
Show resolved Hide resolved
p.mu.Unlock()
return err
}
log.Infof("Adding %s end point to server on %s", pattern, e.addr)
s.mux.Handle(pattern, newHandler(e.config, pub, log))
s.idOf[pattern] = ctx.ID
p.mu.Unlock()
<-s.ctx.Done()
return s.getErr()
}

mux := http.NewServeMux()
mux.Handle(pattern, newHandler(e.config, pub, log))
srv := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux}
s = &server{
idOf: map[string]string{pattern: ctx.ID},
tls: e.config.TLS,
mux: mux,
srv: srv,
}
s.ctx, s.cancel = ctxtool.WithFunc(ctx.Cancelation, func() { srv.Close() })
p.servers[e.addr] = s
p.mu.Unlock()

if e.tlsConfig != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the case where one input instance is running with TLS and another is not, I think we should detect and prevent this.

On a similar note, should we prevent two inputs that are configured with different/inconsistent TLS options too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add this check, and yes, I think inconsistent configs should be disallowed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hang on. This may not be the right thing to do; we already prevent two bindings of a pattern to an address (IP:port) so it is not possible to get into the situation where we have a conflicting TLS config. The next level of binding, where we are looking at an address as IP-only doesn't really make sense to check this for since there may be cases where one port expects TLS and another doesn't — I would imagine that this is rare, but the case where they have different TLS configurations would be less so.

Copy link
Member

@andrewkroh andrewkroh Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example config that demonstrates the problem I am thinking of.

---

filebeat.inputs:

- type: http_endpoint
  id: http_endpoint-no-ssl
  listen_address: 127.0.0.1
  listen_port: 9001
  url: '/webhook-alpha/'
  preserve_original_event: true

# This one will start listening without TLS, but I would expect an error
# since we cannot have both an HTTP and HTTPS listener on 127.0.0.1:9001.
- type: http_endpoint
  id: http_endpoint-with-ssl
  listen_address: 127.0.0.1
  listen_port: 9001
  url: '/webhook-beta/'
  preserve_original_event: true
  ssl:
    verification_mode: none
    certificate: |
      -----BEGIN CERTIFICATE-----
      MIIBzjCCATGgAwIBAgIBATAKBggqhkjOPQQDBDASMRAwDgYDVQQKEwdBY21lIENv
      MB4XDTIyMTAyNTEyNDQ0OFoXDTIzMDQyMzEyNDQ0OFowEjEQMA4GA1UEChMHQWNt
      ZSBDbzCBmzAQBgcqhkjOPQIBBgUrgQQAIwOBhgAEANdwExxtIgYhw8kN485JxdAX
      Y/XzbXA5w1G1ubO3sOpXIcQnuOZC/ea0xnbq9JLOxs9u1glr/K9awsXGlg5JQ3K6
      Aa6dBg5IQIGc6UBbetCaOx3QQkrrKJFrlF+6oiwn3Cs+V/qZMwsNgDdEeq2gFINj
      3ede5TUE+jmsqy+qcNTZ9ik6ozUwMzAOBgNVHQ8BAf8EBAMCBaAwEwYDVR0lBAww
      CgYIKwYBBQUHAwEwDAYDVR0TAQH/BAIwADAKBggqhkjOPQQDBAOBigAwgYYCQQH2
      bwUzZWpwyvXoBGwWfGeKcmNSRFK+5Ur9k2C+PW+C+tVw0oqJvuEcJys78cB9TVwM
      OLBBY4Lip6CDGZ9Zu8MvAkETMcxFC2g3YIiHMFztfkMmU5XnVjjwjelmxNxREKUB
      pyD8tx7ptfI6hI8EUrqD1mQwnLLFU7Rq7IAbNuFXksYeWw==
      -----END CERTIFICATE-----

    # Not a secret.
    key: |
      -----BEGIN EC PRIVATE KEY-----
      MIHcAgEBBEIBtHeKAVWSypQo+PUDhUSMmh41Es6uCd+0OQhr3K9S3ntq7OjuMces
      CN2dG3WJBDo3TEfVJOhrNKE31zoM7+PE+kygBwYFK4EEACOhgYkDgYYABADXcBMc
      bSIGIcPJDePOScXQF2P1821wOcNRtbmzt7DqVyHEJ7jmQv3mtMZ26vSSzsbPbtYJ
      a/yvWsLFxpYOSUNyugGunQYOSECBnOlAW3rQmjsd0EJK6yiRa5RfuqIsJ9wrPlf6
      mTMLDYA3RHqtoBSDY93nXuU1BPo5rKsvqnDU2fYpOg==
      -----END EC PRIVATE KEY-----

output.console.enable: true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

log.Infof("Starting HTTPS server on %s with %s end point", srv.Addr, pattern)
// The certificate is already loaded so we do not need
// to pass the cert file and key file parameters.
err = s.srv.ListenAndServeTLS("", "")
} else {
log.Infof("Starting HTTP server on %s", server.Addr)
err = server.ListenAndServe()
log.Infof("Starting HTTP server on %s with %s end point", srv.Addr, pattern)
err = s.srv.ListenAndServe()
}
s.setErr(err)
s.cancel()
return err
}

if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("unable to start server due to error: %w", err)
func checkTLSConsistency(addr string, old, new *tlscommon.ServerConfig) error {
if (old == nil) != (new == nil) {
return fmt.Errorf("inconsistent TLS usage on %s: mixed TLS and unencrypted", addr)
}
if !reflect.DeepEqual(old, new) {
return fmt.Errorf("inconsistent TLS configuration on %s: configuration options do not agree: old=%s new=%s",
addr, renderTLSConfig(old), renderTLSConfig(new))
}
return nil
}

func renderTLSConfig(tls *tlscommon.ServerConfig) string {
c, err := conf.NewConfigFrom(tls)
if err != nil {
return fmt.Sprintf("!%v", err)
}
var m mapstr.M
err = c.Unpack(&m)
if err != nil {
return fmt.Sprintf("!%v", err)
}
return m.String()
}

// server is a collection of http end-points sharing the same underlying
// http.Server.
type server struct {
// idOf is a map of mux pattern
// to input IDs for the server.
idOf map[string]string

tls *tlscommon.ServerConfig

mux *http.ServeMux
srv *http.Server

ctx context.Context
cancel func()

mu sync.Mutex
err error
}

func (s *server) setErr(err error) {
s.mu.Lock()
s.err = err
s.mu.Unlock()
}

func (s *server) getErr() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}

func newHandler(c config, pub stateless.Publisher, log *logp.Logger) http.Handler {
validator := &apiValidator{
basicAuth: c.BasicAuth,
Expand Down
Loading