Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x](backport #26474) [Elastic Agent] Use http2 to connect to Fleet Server. #26546

Merged
merged 1 commit into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions libbeat/common/transport/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,63 @@ func TestTLSDialer(
}), nil
}

type DialerH2 interface {
Dial(network, address string, cfg *tls.Config) (net.Conn, error)
}

type DialerFuncH2 func(network, address string, cfg *tls.Config) (net.Conn, error)

func (d DialerFuncH2) Dial(network, address string, cfg *tls.Config) (net.Conn, error) {
return d(network, address, cfg)
}

func TLSDialerH2(forward Dialer, config *tlscommon.TLSConfig, timeout time.Duration) (DialerH2, error) {
return TestTLSDialerH2(testing.NullDriver, forward, config, timeout)
}

func TestTLSDialerH2(
d testing.Driver,
forward Dialer,
config *tlscommon.TLSConfig,
timeout time.Duration,
) (DialerH2, error) {
var lastTLSConfig *tls.Config
var lastNetwork string
var lastAddress string
var m sync.Mutex

return DialerFuncH2(func(network, address string, cfg *tls.Config) (net.Conn, error) {
switch network {
case "tcp", "tcp4", "tcp6":
default:
return nil, fmt.Errorf("unsupported network type %v", network)
}

host, _, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}

var tlsConfig *tls.Config
m.Lock()
if network == lastNetwork && address == lastAddress {
tlsConfig = lastTLSConfig
}
if tlsConfig == nil {
tlsConfig = config.BuildModuleClientConfig(host)
lastNetwork = network
lastAddress = address
lastTLSConfig = tlsConfig
}
m.Unlock()

// NextProtos must be set from the passed h2 connection or it will fail
tlsConfig.NextProtos = cfg.NextProtos

return tlsDialWith(d, forward, network, address, timeout, tlsConfig, config)
}), nil
}

func tlsDialWith(
d testing.Driver,
dialer Dialer,
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,4 @@
- Use `filestream` input for internal log collection. {pull}25660[25660]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
- Set `agent.id` to the Fleet Agent ID in events published from inputs backed by Beats. {issue}21121[21121] {pull}26394[26394]
- Communicate with Fleet Server over HTTP2. {pull}26474[26474]
32 changes: 19 additions & 13 deletions x-pack/elastic-agent/pkg/remote/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/net/http2"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
Expand Down Expand Up @@ -113,8 +114,16 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
hosts := cfg.GetHosts()
clients := make([]*requestClient, len(hosts))
for i, host := range cfg.GetHosts() {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
connStr, err := common.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
addr, err := url.Parse(connStr)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}

transport, err := makeTransport(addr.Scheme, cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}
Expand All @@ -136,12 +145,8 @@ func NewWithConfig(log *logger.Logger, cfg Config, wrapper wrapperFunc) (*Client
Timeout: cfg.Timeout,
}

url, err := common.MakeURL(string(cfg.Protocol), p, host, 0)
if err != nil {
return nil, errors.Wrap(err, "invalid fleet-server endpoint")
}
clients[i] = &requestClient{
request: prefixRequestFactory(url),
request: prefixRequestFactory(connStr),
client: httpClient,
}
}
Expand Down Expand Up @@ -272,17 +277,18 @@ func prefixRequestFactory(URL string) requestFunc {
}

// makeTransport create a transport object based on the TLS configuration.
func makeTransport(timeout time.Duration, tls *tlscommon.Config) (*http.Transport, error) {
func makeTransport(scheme string, timeout time.Duration, tls *tlscommon.Config) (http.RoundTripper, error) {
dialer := transport.NetDialer(timeout)
if scheme == "http" {
return &http.Transport{Dial: dialer.Dial}, nil
}
tlsConfig, err := tlscommon.LoadTLSConfig(tls)
if err != nil {
return nil, errors.Wrap(err, "invalid TLS configuration")
}
dialer := transport.NetDialer(timeout)
tlsDialer, err := transport.TLSDialer(dialer, tlsConfig, timeout)
tlsDialer, err := transport.TLSDialerH2(dialer, tlsConfig, timeout)
if err != nil {
return nil, errors.Wrap(err, "fail to create TLS dialer")
}

// TODO: Dial is deprecated we need to move to DialContext.
return &http.Transport{Dial: dialer.Dial, DialTLS: tlsDialer.Dial}, nil
return &http2.Transport{DialTLS: tlsDialer.Dial}, nil
}