Skip to content

Commit

Permalink
Merge pull request #7392 from mglazer/feature/https-subscriber
Browse files Browse the repository at this point in the history
Fix Kapacitor Issue #942: HTTPS subscriptions don't work
  • Loading branch information
Nathaniel Cook authored Oct 4, 2016
2 parents 8e35dd3 + 1feca06 commit 91645c0
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

### Bugfixes

- [#7392](https://github.com/influxdata/influxdb/pull/7392): Enable https subscriptions to work with custom CA certificates.
- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key.
- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement.
- [#7177](https://github.com/influxdata/influxdb/issues/7177): Fix base64 encoding issue with /debug/vars stats.
Expand Down
29 changes: 27 additions & 2 deletions services/subscriber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package subscriber

import (
"errors"
"fmt"
"os"
"path/filepath"
"time"

"github.com/influxdata/influxdb/toml"
Expand All @@ -17,19 +20,41 @@ type Config struct {
Enabled bool `toml:"enabled"`

HTTPTimeout toml.Duration `toml:"http-timeout"`

// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false
InsecureSkipVerify bool `toml:"insecure-skip-verify"`

// configure the path to the PEM encoded CA certs file. If the
// empty string, the default system certs will be used
CaCerts string `toml:"ca-certs"`
}

// NewConfig returns a new instance of a subscriber config.
func NewConfig() Config {
return Config{
Enabled: true,
HTTPTimeout: toml.Duration(DefaultHTTPTimeout),
Enabled: true,
HTTPTimeout: toml.Duration(DefaultHTTPTimeout),
InsecureSkipVerify: false,
CaCerts: "",
}
}

func (c Config) Validate() error {
if c.HTTPTimeout <= 0 {
return errors.New("http-timeout must be greater than 0")
}
if c.CaCerts != "" && !fileExists(c.CaCerts) {
abspath, err := filepath.Abs(c.CaCerts)
if err != nil {
return fmt.Errorf("ca-certs file %s does not exist. Wrapped Error: %v", c.CaCerts, err)
}
return fmt.Errorf("ca-certs file %s does not exist", abspath)
}
return nil
}

func fileExists(fileName string) bool {
info, err := os.Stat(fileName)
return err == nil && !info.IsDir()
}
86 changes: 85 additions & 1 deletion services/subscriber/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package subscriber_test

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/BurntSushi/toml"
Expand All @@ -18,6 +22,86 @@ enabled = false

// Validate configuration.
if c.Enabled != false {
t.Fatalf("unexpected enabled state: %v", c.Enabled)
t.Errorf("unexpected enabled state: %v", c.Enabled)
}
if c.InsecureSkipVerify == true {
t.Errorf("InsecureSkipVerify: expected %v. got %v", false, c.InsecureSkipVerify)
}
}

func TestConfig_ParseTLSConfig(t *testing.T) {
abspath, err := filepath.Abs("/path/to/ca-certs.pem")
if err != nil {
t.Fatalf("Could not construct absolute path. %v", err)
}

// Parse configuration.
var c subscriber.Config
if _, err := toml.Decode(fmt.Sprintf(`
http-timeout = "60s"
enabled = true
ca-certs = '%s'
insecure-skip-verify = true
`, abspath), &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.Enabled != true {
t.Errorf("unexpected enabled state: %v", c.Enabled)
}
if c.CaCerts != abspath {
t.Errorf("CaCerts: expected %s. got %s", abspath, c.CaCerts)
}
if c.InsecureSkipVerify != true {
t.Errorf("InsecureSkipVerify: expected %v. got %v", true, c.InsecureSkipVerify)
}
err = c.Validate()
if err == nil {
t.Errorf("Expected Validation to fail (%s doesn't exist)", abspath)
}

if err.Error() != fmt.Sprintf("ca-certs file %s does not exist", abspath) {
t.Errorf("Expected descriptive validation error. Instead got %v", err)
}
}

func TestConfig_ParseTLSConfigValidCerts(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "ca-certs.crt")
if err != nil {
t.Fatalf("could not create temp file. error was: %v", err)
}
defer os.Remove(tmpfile.Name())

if _, err := tmpfile.Write([]byte("=== BEGIN CERTIFICATE ===\n=== END CERTIFICATE ===")); err != nil {
t.Fatalf("could not write temp file. error was: %v", err)
}
if err := tmpfile.Close(); err != nil {
t.Fatalf("could not close temp file. error was %v", err)
}

// Parse configuration.
var c subscriber.Config
if _, err := toml.Decode(fmt.Sprintf(`
http-timeout = "60s"
enabled = true
ca-certs = '%s'
insecure-skip-verify = false
`, tmpfile.Name()), &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.Enabled != true {
t.Errorf("unexpected enabled state: %v", c.Enabled)
}
if c.CaCerts != tmpfile.Name() {
t.Errorf("CaCerts: expected %v. got %v", tmpfile.Name(), c.CaCerts)
}
if c.InsecureSkipVerify != false {
t.Errorf("InsecureSkipVerify: expected %v. got %v", false, c.InsecureSkipVerify)
}
if err := c.Validate(); err != nil {
t.Errorf("Expected Validation to succeed. Instead was: %v", err)
}
}
37 changes: 35 additions & 2 deletions services/subscriber/http.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package subscriber

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"

"github.com/influxdata/influxdb/client/v2"
Expand All @@ -14,10 +17,20 @@ type HTTP struct {

// NewHTTP returns a new HTTP points writer with default options.
func NewHTTP(addr string, timeout time.Duration) (*HTTP, error) {
return NewHTTPS(addr, timeout, false, "")
}

// NewHTTPS returns a new HTTPS points writer with default options and HTTPS configured
func NewHTTPS(addr string, timeout time.Duration, unsafeSsl bool, caCerts string) (*HTTP, error) {
tlsConfig, err := createTlsConfig(caCerts)

conf := client.HTTPConfig{
Addr: addr,
Timeout: timeout,
Addr: addr,
Timeout: timeout,
InsecureSkipVerify: unsafeSsl,
TLSConfig: tlsConfig,
}

c, err := client.NewHTTPClient(conf)
if err != nil {
return nil, err
Expand All @@ -37,3 +50,23 @@ func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) {
err = h.c.Write(bp)
return
}

func createTlsConfig(caCerts string) (*tls.Config, error) {
if caCerts == "" {
return nil, nil
}
return loadCaCerts(caCerts)
}

func loadCaCerts(caCerts string) (*tls.Config, error) {
caCert, err := ioutil.ReadFile(caCerts)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

return &tls.Config{
RootCAs: caCertPool,
}, nil
}
7 changes: 6 additions & 1 deletion services/subscriber/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,13 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) {
switch u.Scheme {
case "udp":
return NewUDP(u.Host), nil
case "http", "https":
case "http":
return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout))
case "https":
if s.conf.InsecureSkipVerify {
s.Logger.Println("WARNING: 'insecure-skip-verify' is true. This will skip all certificate verifications.")
}
return NewHTTPS(u.String(), time.Duration(s.conf.HTTPTimeout), s.conf.InsecureSkipVerify, s.conf.CaCerts)
default:
return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme)
}
Expand Down

0 comments on commit 91645c0

Please sign in to comment.