From 9029782a32b6736ac1c96bb7a46eca490d3b51ec Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 18 May 2016 14:22:34 -0600 Subject: [PATCH] initial work to support HTTP based subscriptions --- services/influxdb/config.go | 14 +++++++++++-- services/influxdb/service.go | 38 ++++++++++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/services/influxdb/config.go b/services/influxdb/config.go index b22d57cdc..0f5cdb43d 100644 --- a/services/influxdb/config.go +++ b/services/influxdb/config.go @@ -2,6 +2,7 @@ package influxdb import ( "errors" + "fmt" "net/url" "time" @@ -13,6 +14,8 @@ import ( const ( // Maximum time to try and connect to InfluxDB during startup. DefaultStartUpTimeout = time.Minute * 5 + + DefaultSubscriptionProtocol = "http" ) type Config struct { @@ -33,6 +36,7 @@ type Config struct { Timeout toml.Duration `toml:"timeout"` DisableSubscriptions bool `toml:"disable-subscriptions"` + SubscriptionProtocol string `toml:"subscription-protocol"` Subscriptions map[string][]string `toml:"subscriptions"` ExcludedSubscriptions map[string][]string `toml:"excluded-subscriptions"` UDPBuffer int `toml:"udp-buffer"` @@ -52,8 +56,9 @@ func NewConfig() Config { ExcludedSubscriptions: map[string][]string{ stats.DefaultDatabse: []string{stats.DefaultRetentionPolicy}, }, - UDPBuffer: udp.DefaultBuffer, - StartUpTimeout: toml.Duration(DefaultStartUpTimeout), + UDPBuffer: udp.DefaultBuffer, + StartUpTimeout: toml.Duration(DefaultStartUpTimeout), + SubscriptionProtocol: DefaultSubscriptionProtocol, } } @@ -74,5 +79,10 @@ func (c Config) Validate() error { return err } } + switch c.SubscriptionProtocol { + case "http", "https", "udp": + default: + return fmt.Errorf("invalid subscription protocol, must be one of 'udp', 'http' or 'https', got %s", c.SubscriptionProtocol) + } return nil } diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 7a6627b7f..5cc6b5330 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -91,6 +91,7 @@ func NewService(configs []Config, defaultInfluxDB int, hostname string, l *log.L clusterID: clusterID, subName: subName, disableSubs: c.DisableSubscriptions, + protocol: c.SubscriptionProtocol, } if defaultInfluxDB == i { defaultInfluxDBName = c.Name @@ -146,6 +147,7 @@ type influxdb struct { exConfigSubs map[subEntry]bool hostname string logger *log.Logger + protocol string udpBuffer int udpReadBuffer int startupTimeout time.Duration @@ -329,7 +331,22 @@ func (s *influxdb) linkSubscriptions() error { } existingSubs[se] = si } else if se.name == s.subName { - existingSubs[se] = si + // Check if the protocol has changed + if len(si.Destinations) == 0 { + s.logger.Println("E! found subscription without any destinations:", se) + continue + } + u, err := url.Parse(si.Destinations[0]) + if err != nil { + s.logger.Println("E! found subscription with invalid destinations:", si) + continue + } + if u.Scheme != s.protocol { + // Drop the sub and let it get recreated + s.dropSub(cli, se.name, se.cluster, se.rp) + } else { + existingSubs[se] = si + } } } } @@ -364,9 +381,16 @@ func (s *influxdb) linkSubscriptions() error { for _, se := range allSubs { // If we have been configured to subscribe and the subscription is not started yet. if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] { - u, err := url.Parse("udp://:0") + var urlStr string + switch s.protocol { + case "http", "https": + urlStr = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, 9092) + case "udp": + urlStr = "udp://:0" + } + u, err := url.Parse(urlStr) if err != nil { - return fmt.Errorf("could not create valid destination url, is hostname correct? err: %s", err) + return fmt.Errorf("could not create valid destination url, err: %s", err) } numSubscriptions++ @@ -376,7 +400,13 @@ func (s *influxdb) linkSubscriptions() error { } // Get port from addr - destination := fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port) + var destination string + switch s.protocol { + case "http", "https": + destination = urlStr + case "udp": + destination = fmt.Sprintf("udp://%s:%d", s.hostname, addr.Port) + } err = s.createSub(cli, se.name, se.cluster, se.rp, "ANY", []string{destination}) if err != nil {