Skip to content

Commit

Permalink
Add API endpoint to refresh subscriptions (#804)
Browse files Browse the repository at this point in the history
* add API endpoint to refresh subscriptions

* better logging

* fix issues with removing dropped subs, and adding deleted subs

* CHANGELOG.md

* add helper method for closing subs

* add helper method for closing subs, lock linkSubs method

* fix locking issue
  • Loading branch information
Nathaniel Cook authored Aug 11, 2016
1 parent 6ce1a61 commit d892aeb
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 27 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ The corresponding alert states are:

- [#740](https://github.com/influxdata/kapacitor/pull/740): Support reset expressions to prevent an alert from being lowered in severity. Thanks @minhdanh!
- [#670](https://github.com/influxdata/kapacitor/issues/670): Add ability to supress OK recovery alert events.
- [#804](https://github.com/influxdata/kapacitor/pull/804): Add API endpoint for refreshing subscriptions.
Also fixes issue where subs were not relinked if the sub was deleted.
UDP listen ports are closed when a database is dropped.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *Server) appendInfluxDBService() error {
return errors.Wrap(err, "failed to get http port")
}
srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, s.config.HTTP.AuthEnabled, l)
srv.HTTPDService = s.HTTPDService
srv.PointsWriter = s.TaskMaster
srv.LogService = s.LogService
srv.AuthService = s.AuthService
Expand Down
136 changes: 109 additions & 27 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
Expand All @@ -34,19 +35,28 @@ const (

// Size in bytes of a token for subscription authentication
tokenSize = 64

// API endpoint paths
subscriptionsPath = "/subscriptions"
subscriptionsPathAnchored = "/subscriptions/"
)

// Handles requests to write or read from an InfluxDB cluster
type Service struct {
defaultInfluxDB string
clusters map[string]*influxdbCluster
routes []httpd.Route

PointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
LogService interface {
NewLogger(string, int) *log.Logger
}
HTTPDService interface {
AddRoutes([]httpd.Route) error
DelRoutes([]httpd.Route)
}
ClientCreator interface {
Create(influxdb.HTTPConfig) (influxdb.Client, error)
}
Expand Down Expand Up @@ -105,7 +115,9 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string
}
}
runningSubs := make(map[subEntry]bool, len(c.Subscriptions))
services := make(map[subEntry]openCloser, len(c.Subscriptions))
clusters[c.Name] = &influxdbCluster{
name: c.Name,
configs: urls,
configSubs: subs,
exConfigSubs: exSubs,
Expand All @@ -122,7 +134,9 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string
disableSubs: c.DisableSubscriptions,
protocol: c.SubscriptionProtocol,
runningSubs: runningSubs,
useTokens: useTokens,
services: services,
// Do not use tokens for non http protocols
useTokens: useTokens && (c.SubscriptionProtocol == "http" || c.SubscriptionProtocol == "https"),
}
if defaultInfluxDB == i {
defaultInfluxDBName = c.Name
Expand All @@ -146,10 +160,38 @@ func (s *Service) Open() error {
return err
}
}

// Define API routes
s.routes = []httpd.Route{
{
Name: "subscriptions",
Method: "POST",
Pattern: subscriptionsPath,
HandlerFunc: s.handleSubscriptions,
},
}

err := s.HTTPDService.AddRoutes(s.routes)
if err != nil {
return errors.Wrap(err, "adding API routes")
}
return nil
}

// Refresh the subscriptions linking for all clusters.
func (s *Service) handleSubscriptions(w http.ResponseWriter, r *http.Request) {
for _, cluster := range s.clusters {
err := cluster.linkSubscriptions()
if err != nil {
httpd.HttpError(w, fmt.Sprintf("failed to link subscriptions: %s", err.Error()), true, http.StatusInternalServerError)
return
}
}
w.WriteHeader(http.StatusNoContent)
}

func (s *Service) Close() error {
s.HTTPDService.DelRoutes(s.routes)
var lastErr error
for _, cluster := range s.clusters {
err := cluster.Close()
Expand All @@ -174,6 +216,7 @@ func (s *Service) NewNamedClient(name string) (influxdb.Client, error) {
}

type influxdbCluster struct {
name string
configs []influxdb.HTTPConfig
i int
configSubs map[subEntry]bool
Expand Down Expand Up @@ -210,14 +253,16 @@ type influxdbCluster struct {
RevokeSubscriptionAccess(token string) error
}

services []interface {
Open() error
Close() error
}
services map[subEntry]openCloser

mu sync.Mutex
}

type openCloser interface {
Open() error
Close() error
}

type subEntry struct {
cluster string
rp string
Expand All @@ -231,9 +276,7 @@ type subInfo struct {

func (s *influxdbCluster) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.disableSubs {
err := s.linkSubscriptions()
if s.subscriptionSyncInterval != 0 {
s.subSyncTicker = time.NewTicker(s.subscriptionSyncInterval)
go func() {
Expand All @@ -242,9 +285,10 @@ func (s *influxdbCluster) Open() error {
}
}()
}
return err
}
return nil
// Release lock so we can call linkSubscriptions.
s.mu.Unlock()
return s.linkSubscriptions()
}

func (s *influxdbCluster) Close() error {
Expand Down Expand Up @@ -291,7 +335,12 @@ func (s *influxdbCluster) NewClient() (c influxdb.Client, err error) {
}

func (s *influxdbCluster) linkSubscriptions() error {
s.logger.Println("D! linking subscriptions")
if s.disableSubs {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
s.logger.Println("D! linking subscriptions for cluster", s.name)
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = s.startupTimeout
ticker := backoff.NewTicker(b)
Expand Down Expand Up @@ -431,8 +480,9 @@ func (s *influxdbCluster) linkSubscriptions() error {
// Check if the hostname, port or protocol have changed
if host != s.hostname ||
u.Scheme != s.protocol ||
((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) ||
(s.useTokens && (u.User == nil || u.User.Username() != httpd.SubscriptionUser)) {
((u.Scheme == "http" || u.Scheme == "https") &&
(int(pn) != s.httpPort ||
(s.useTokens && (u.User == nil || u.User.Username() != httpd.SubscriptionUser)))) {
// Remove access for changing subscriptions.
if u.User != nil {
if p, ok := u.User.Password(); ok {
Expand All @@ -441,6 +491,7 @@ func (s *influxdbCluster) linkSubscriptions() error {
}
// Something changed, drop the sub and let it get recreated
s.dropSub(cli, se.name, se.cluster, se.rp)
s.closeSub(se)
} else {
existingSubs[se] = si
// Do not revoke tokens that are still in use
Expand All @@ -456,7 +507,6 @@ func (s *influxdbCluster) linkSubscriptions() error {
}

// Compare to configured list
startedSubs := make(map[subEntry]bool)
all := len(s.configSubs) == 0
for se, si := range existingSubs {
if (s.configSubs[se] || all) && !s.exConfigSubs[se] && !s.runningSubs[se] {
Expand All @@ -471,22 +521,23 @@ func (s *influxdbCluster) linkSubscriptions() error {
if host == s.hostname {
numSubscriptions++
if u.Scheme == "udp" {
_, err := s.startUDPListener(se.cluster, se.rp, port)
_, err := s.startUDPListener(se, port)
if err != nil {
s.logger.Println("E! failed to start UDP listener:", err)
}
}
startedSubs[se] = true
s.runningSubs[se] = true
break
}
}
}
}
// create and start any new subscriptions
// stop any removed subscriptions
for _, se := range allSubs {
_, exists := existingSubs[se]
// If we have been configured to subscribe and the subscription is not started yet.
if (s.configSubs[se] || all) && !startedSubs[se] && !s.exConfigSubs[se] && !s.runningSubs[se] {
if (s.configSubs[se] || all) && !s.exConfigSubs[se] && !(s.runningSubs[se] && exists) {
var destination string
switch s.protocol {
case "http", "https":
Expand Down Expand Up @@ -514,7 +565,7 @@ func (s *influxdbCluster) linkSubscriptions() error {
destination = u.String()
}
case "udp":
addr, err := s.startUDPListener(se.cluster, se.rp, "0")
addr, err := s.startUDPListener(se, "0")
if err != nil {
s.logger.Println("E! failed to start UDP listener:", err)
}
Expand All @@ -523,11 +574,31 @@ func (s *influxdbCluster) linkSubscriptions() error {

numSubscriptions++

err = s.createSub(cli, se.name, se.cluster, se.rp, "ANY", []string{destination})
mode := "ANY"
destinations := []string{destination}
err = s.createSub(cli, se.name, se.cluster, se.rp, mode, destinations)
if err != nil {
return err
}
// Mark as running
s.runningSubs[se] = true
// Add info to exiting set
existingSubs[se] = subInfo{
Mode: mode,
Destinations: destinations,
}
}
}
// Close any subs for dbs that have been dropped
for se, running := range s.runningSubs {
if !running {
continue
}
if _, exists := existingSubs[se]; !exists {
err := s.closeSub(se)
if err != nil {
s.logger.Printf("E! failed to close service for %v: %s", se, err)
}
}
}

Expand All @@ -542,6 +613,17 @@ func (s *influxdbCluster) linkSubscriptions() error {
return nil
}

// Close the service and stop tracking it.
func (s *influxdbCluster) closeSub(se subEntry) (err error) {
if service, ok := s.services[se]; ok {
s.logger.Println("D! closing service for", se)
err = service.Close()
}
delete(s.runningSubs, se)
delete(s.services, se)
return
}

func (s *influxdbCluster) generateRandomToken() (string, error) {
tokenBytes := make([]byte, tokenSize)
if _, err := io.ReadFull(rand.Reader, tokenBytes); err != nil {
Expand All @@ -550,7 +632,7 @@ func (s *influxdbCluster) generateRandomToken() (string, error) {
return base64.RawURLEncoding.EncodeToString(tokenBytes), nil
}

func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode string, destinations []string) (err error) {
func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode string, destinations []string) error {
var buf bytes.Buffer
for i, dst := range destinations {
if i != 0 {
Expand All @@ -560,7 +642,7 @@ func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode
buf.Write([]byte(dst))
buf.Write([]byte("'"))
}
_, err = s.execQuery(
_, err := s.execQuery(
cli,
&influxql.CreateSubscriptionStatement{
Name: name,
Expand All @@ -570,7 +652,7 @@ func (s *influxdbCluster) createSub(cli influxdb.Client, name, cluster, rp, mode
Mode: strings.ToUpper(mode),
},
)
return
return errors.Wrapf(err, "creating sub %s for db %q and rp %q", name, cluster, rp)

}
func (s *influxdbCluster) dropSub(cli influxdb.Client, name, cluster, rp string) (err error) {
Expand All @@ -585,24 +667,24 @@ func (s *influxdbCluster) dropSub(cli influxdb.Client, name, cluster, rp string)
return
}

func (s *influxdbCluster) startUDPListener(cluster, rp, port string) (*net.UDPAddr, error) {
func (s *influxdbCluster) startUDPListener(se subEntry, port string) (*net.UDPAddr, error) {
c := udp.Config{}
c.Enabled = true
c.BindAddress = fmt.Sprintf("%s:%s", s.udpBind, port)
c.Database = cluster
c.RetentionPolicy = rp
c.Database = se.cluster
c.RetentionPolicy = se.rp
c.Buffer = s.udpBuffer
c.ReadBuffer = s.udpReadBuffer

l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", cluster, rp), log.LstdFlags)
l := s.LogService.NewLogger(fmt.Sprintf("[udp:%s.%s] ", se.cluster, se.rp), log.LstdFlags)
service := udp.NewService(c, l)
service.PointsWriter = s.PointsWriter
err := service.Open()
if err != nil {
return nil, err
}
s.services = append(s.services, service)
s.logger.Println("I! started UDP listener for", cluster, rp)
s.services[se] = service
s.logger.Println("I! started UDP listener for", se.cluster, se.rp)
return service.Addr(), nil
}

Expand Down

0 comments on commit d892aeb

Please sign in to comment.