Skip to content

Commit

Permalink
feat(x509_cert): add proxy support (#9319)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrantz01 authored Jun 21, 2022
1 parent d8f2b38 commit 65a6085
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 8 deletions.
140 changes: 140 additions & 0 deletions plugins/common/proxy/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package proxy

import (
"bufio"
"context"
"fmt"
"net"
"net/http"
"net/url"

netProxy "golang.org/x/net/proxy"
)

// httpConnectProxy proxies (only?) TCP over a HTTP tunnel using the CONNECT method
type httpConnectProxy struct {
forward netProxy.Dialer
url *url.URL
}

func (c *httpConnectProxy) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
// Prevent using UDP
if network == "udp" {
return nil, fmt.Errorf("cannot proxy %q traffic over HTTP CONNECT", network)
}

var proxyConn net.Conn
var err error
if dialer, ok := c.forward.(netProxy.ContextDialer); ok {
proxyConn, err = dialer.DialContext(ctx, "tcp", c.url.Host)
} else {
shim := contextDialerShim{c.forward}
proxyConn, err = shim.DialContext(ctx, "tcp", c.url.Host)
}
if err != nil {
return nil, err
}

// Add and strip http:// to extract authority portion of the URL
// since CONNECT doesn't use a full URL. The request header would
// look something like: "CONNECT www.influxdata.com:443 HTTP/1.1"
requestURL, err := url.Parse("http://" + addr)
if err != nil {
if err := proxyConn.Close(); err != nil {
return nil, err
}
return nil, err
}
requestURL.Scheme = ""

// Build HTTP CONNECT request
req, err := http.NewRequest(http.MethodConnect, requestURL.String(), nil)
if err != nil {
if err := proxyConn.Close(); err != nil {
return nil, err
}
return nil, err
}
req.Close = false
if password, hasAuth := c.url.User.Password(); hasAuth {
req.SetBasicAuth(c.url.User.Username(), password)
}

err = req.Write(proxyConn)
if err != nil {
if err := proxyConn.Close(); err != nil {
return nil, err
}
return nil, err
}

resp, err := http.ReadResponse(bufio.NewReader(proxyConn), req)
if err != nil {
if err := proxyConn.Close(); err != nil {
return nil, err
}
return nil, err
}
if err := resp.Body.Close(); err != nil {
return nil, err
}

if resp.StatusCode != 200 {
if err := proxyConn.Close(); err != nil {
return nil, err
}
return nil, fmt.Errorf("failed to connect to proxy: %q", resp.Status)
}

return proxyConn, nil
}

func (c *httpConnectProxy) Dial(network, addr string) (net.Conn, error) {
return c.DialContext(context.Background(), network, addr)
}

func newHTTPConnectProxy(proxyURL *url.URL, forward netProxy.Dialer) (netProxy.Dialer, error) {
return &httpConnectProxy{forward, proxyURL}, nil
}

func init() {
// Register new proxy types
netProxy.RegisterDialerType("http", newHTTPConnectProxy)
netProxy.RegisterDialerType("https", newHTTPConnectProxy)
}

// contextDialerShim allows cancellation of the dial from a context even if the underlying
// dialer does not implement `proxy.ContextDialer`. Arguably, this shouldn't actually get run,
// unless a new proxy type is added that doesn't implement `proxy.ContextDialer`, as all the
// standard library dialers implement `proxy.ContextDialer`.
type contextDialerShim struct {
dialer netProxy.Dialer
}

func (cd *contextDialerShim) Dial(network, addr string) (net.Conn, error) {
return cd.dialer.Dial(network, addr)
}

func (cd *contextDialerShim) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
var (
conn net.Conn
done = make(chan struct{}, 1)
err error
)

go func() {
conn, err = cd.dialer.Dial(network, addr)
close(done)
if conn != nil && ctx.Err() != nil {
_ = conn.Close()
}
}()

select {
case <-ctx.Done():
err = ctx.Err()
case <-done:
}

return conn, err
}
37 changes: 37 additions & 0 deletions plugins/common/proxy/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package proxy

import (
"context"
"net"
"time"

netProxy "golang.org/x/net/proxy"
)

type ProxiedDialer struct {
dialer netProxy.Dialer
}

func (pd *ProxiedDialer) Dial(network, addr string) (net.Conn, error) {
return pd.dialer.Dial(network, addr)
}

func (pd *ProxiedDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
if contextDialer, ok := pd.dialer.(netProxy.ContextDialer); ok {
return contextDialer.DialContext(ctx, network, addr)
}

contextDialer := contextDialerShim{pd.dialer}
return contextDialer.DialContext(ctx, network, addr)
}

func (pd *ProxiedDialer) DialTimeout(network, addr string, timeout time.Duration) (net.Conn, error) {
ctx := context.Background()
if timeout.Seconds() != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

return pd.DialContext(ctx, network, addr)
}
39 changes: 36 additions & 3 deletions plugins/common/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,54 @@ import (
"fmt"
"net/http"
"net/url"

"golang.org/x/net/proxy"
)

type HTTPProxy struct {
HTTPProxyURL string `toml:"http_proxy_url"`
UseSystemProxy bool `toml:"use_system_proxy"`
HTTPProxyURL string `toml:"http_proxy_url"`
}

type proxyFunc func(req *http.Request) (*url.URL, error)

func (p *HTTPProxy) Proxy() (proxyFunc, error) {
if len(p.HTTPProxyURL) > 0 {
if p.UseSystemProxy {
return http.ProxyFromEnvironment, nil
} else if len(p.HTTPProxyURL) > 0 {
address, err := url.Parse(p.HTTPProxyURL)
if err != nil {
return nil, fmt.Errorf("error parsing proxy url %q: %w", p.HTTPProxyURL, err)
}
return http.ProxyURL(address), nil
}
return http.ProxyFromEnvironment, nil

return nil, nil
}

type TCPProxy struct {
UseProxy bool `toml:"use_proxy"`
ProxyURL string `toml:"proxy_url"`
}

func (p *TCPProxy) Proxy() (*ProxiedDialer, error) {
var dialer proxy.Dialer
if p.UseProxy {
if len(p.ProxyURL) > 0 {
parsed, err := url.Parse(p.ProxyURL)
if err != nil {
return nil, fmt.Errorf("error parsing proxy url %q: %w", p.ProxyURL, err)
}

if dialer, err = proxy.FromURL(parsed, proxy.Direct); err != nil {
return nil, err
}
} else {
dialer = proxy.FromEnvironment()
}
} else {
dialer = proxy.Direct
}

return &ProxiedDialer{dialer}, nil
}
3 changes: 2 additions & 1 deletion plugins/inputs/cloudwatch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""

## Set http_proxy (telegraf uses the system wide proxy settings if it's is not set)
## Set http_proxy
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"

# The minimum period for Cloudwatch metrics is 1 minute (60s). However not all
Expand Down
10 changes: 8 additions & 2 deletions plugins/inputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudwatch
import (
"context"
"net/http"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -360,13 +361,18 @@ func TestUpdateWindow(t *testing.T) {

func TestProxyFunction(t *testing.T) {
c := &CloudWatch{
HTTPProxy: proxy.HTTPProxy{HTTPProxyURL: "http://www.penguins.com"},
HTTPProxy: proxy.HTTPProxy{
HTTPProxyURL: "http://www.penguins.com",
},
}

proxyFunction, err := c.HTTPProxy.Proxy()
require.NoError(t, err)

proxyResult, err := proxyFunction(&http.Request{})
u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/")
require.NoError(t, err)

proxyResult, err := proxyFunction(&http.Request{URL: u})
require.NoError(t, err)
require.Equal(t, "www.penguins.com", proxyResult.Host)
}
Expand Down
4 changes: 4 additions & 0 deletions plugins/inputs/x509_cert/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ When using a UDP address as a certificate source, the server must support
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
# tls_server_name = "myhost.example.org"

## Set the proxy URL
# use_proxy = true
# proxy_url = "http://localhost:8888"
```

## Metrics
Expand Down
9 changes: 8 additions & 1 deletion plugins/inputs/x509_cert/x509_cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/common/proxy"
_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand All @@ -38,6 +39,7 @@ type X509Cert struct {
ExcludeRootCerts bool `toml:"exclude_root_certs"`
tlsCfg *tls.Config
_tls.ClientConfig
proxy.TCPProxy
locations []*url.URL
globpaths []*globpath.GlobPath
Log telegraf.Logger
Expand Down Expand Up @@ -126,7 +128,12 @@ func (c *X509Cert) getCert(u *url.URL, timeout time.Duration) ([]*x509.Certifica
protocol = "tcp"
fallthrough
case "tcp", "tcp4", "tcp6":
ipConn, err := net.DialTimeout(protocol, u.Host, timeout)
dialer, err := c.Proxy()
if err != nil {
return nil, err
}

ipConn, err := dialer.DialTimeout(protocol, u.Host, timeout)
if err != nil {
return nil, err
}
Expand Down
23 changes: 23 additions & 0 deletions plugins/inputs/x509_cert/x509_cert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"math/big"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
Expand Down Expand Up @@ -320,13 +322,33 @@ func TestGatherUDPCertIntegration(t *testing.T) {
require.True(t, acc.HasMeasurement("x509_cert"))
}

func TestGatherTCPCert(t *testing.T) {
ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

m := &X509Cert{
Sources: []string{ts.URL},
Log: testutil.Logger{},
}
require.NoError(t, m.Init())

var acc testutil.Accumulator
require.NoError(t, m.Gather(&acc))

require.Len(t, acc.Errors, 0)
require.True(t, acc.HasMeasurement("x509_cert"))
}

func TestGatherCertIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

m := &X509Cert{
Sources: []string{"https://www.influxdata.com:443"},
Log: testutil.Logger{},
}
require.NoError(t, m.Init())

Expand All @@ -343,6 +365,7 @@ func TestGatherCertMustNotTimeoutIntegration(t *testing.T) {
duration := time.Duration(15) * time.Second
m := &X509Cert{
Sources: []string{"https://www.influxdata.com:443"},
Log: testutil.Logger{},
Timeout: config.Duration(duration),
}
require.NoError(t, m.Init())
Expand Down
3 changes: 2 additions & 1 deletion plugins/outputs/datadog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ This plugin writes to the [Datadog Metrics API][metrics] and requires an
## Write URL override; useful for debugging.
# url = "https://app.datadoghq.com/api/v1/series"

## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set)
## Set http_proxy
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"

## Override the default (none) compression used to send data.
Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/websocket/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ It can output data in any of the [supported output formats][formats].
# socks5_username = "alice"
# socks5_password = "pass123"

## Optional HTTP proxy to use
# use_system_proxy = false
# http_proxy_url = "http://localhost:8888"

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
Expand Down

0 comments on commit 65a6085

Please sign in to comment.