Skip to content

Commit

Permalink
add subscription token based auth
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jul 27, 2016
1 parent aa28035 commit c10a082
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 7 deletions.
3 changes: 3 additions & 0 deletions auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
type Interface interface {
Authenticate(username, password string) (User, error)
User(username string) (User, error)
SubscriptionUser(token string) (User, error)
GrantSubscriptionAccess(token, db, rp string) error
RevokeSubscriptionAccess(token string) error
}

// ErrAuthenticate is returned when authentication fails.
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
s.appendDeadmanService()
s.appendSMTPService()
s.InitHTTPDService()
s.appendStorageService()
s.appendAuthService()
if err := s.appendInfluxDBService(); err != nil {
return nil, errors.Wrap(err, "influxdb service")
}
s.appendStorageService()
s.appendTaskStoreService()
s.appendReplayService()
s.appendAuthService()

// Append Alert integration services
s.appendOpsGenieService()
Expand Down Expand Up @@ -215,9 +215,10 @@ func (s *Server) appendInfluxDBService() error {
if err != nil {
return errors.Wrap(err, "failed to get http port")
}
srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, l)
srv := influxdb.NewService(c, s.config.defaultInfluxDB, httpPort, s.config.Hostname, s.config.HTTP.AuthEnabled, l)
srv.PointsWriter = s.TaskMaster
srv.LogService = s.LogService
srv.AuthService = s.AuthService

s.InfluxDBService = srv
s.TaskMaster.InfluxDBService = srv
Expand Down
14 changes: 14 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type AuthenticationMethod int
const (
UserAuthentication AuthenticationMethod = iota
BearerAuthentication
SubscriptionAuthentication
)

type AuthorizationHandler func(http.ResponseWriter, *http.Request, auth.User)
Expand Down Expand Up @@ -612,6 +613,11 @@ func authenticate(inner AuthorizationHandler, h *Handler, requireAuthentication
HttpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
case SubscriptionAuthentication:
if user, err = h.AuthService.SubscriptionUser(creds.Token); err != nil {
HttpError(w, err.Error(), false, http.StatusUnauthorized)
return
}
default:
HttpError(w, "unsupported authentication", false, http.StatusUnauthorized)
}
Expand Down Expand Up @@ -684,6 +690,7 @@ type credentials struct {
// As params: http://127.0.0.1/query?u=username&p=password
// As basic auth: http://username:password@127.0.0.1
// As Bearer token in Authorization header: Bearer <JWT_TOKEN_BLOB>
// As simple acccess token in InfluxDB-Access-Token: <TOKEN>
func parseCredentials(r *http.Request) (credentials, error) {
q := r.URL.Query()

Expand All @@ -706,6 +713,13 @@ func parseCredentials(r *http.Request) (credentials, error) {
Password: p,
}, nil
}
} else if s := r.Header.Get("InfluxDB-Access-Token"); s != "" {
// Check for the HTTP InfluxDB-Access-Token header.
return credentials{
Method: SubscriptionAuthentication,
Token: s,
}, nil

}

// Check for username and password in URL params.
Expand Down
52 changes: 48 additions & 4 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package influxdb

import (
"bytes"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"log"
"net"
Expand All @@ -21,12 +23,16 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/services/udp"
"github.com/pkg/errors"
)

const (
// Legacy name given to all subscriptions.
legacySubName = "kapacitor"
subNamePrefix = "kapacitor-"

// Size in bytes of a token for subscription authentication
tokenSize = 64
)

// Handles requests to write or read from an InfluxDB cluster
Expand All @@ -40,10 +46,15 @@ type Service struct {
LogService interface {
NewLogger(string, int) *log.Logger
}

AuthService interface {
GrantSubscriptionAccess(token, db, rp string) error
RevokeSubscriptionAccess(token string) error
}
logger *log.Logger
}

func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, l *log.Logger) *Service {
func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string, useTokens bool, l *log.Logger) *Service {
clusterID := kapacitor.ClusterIDVar.StringValue()
subName := subNamePrefix + clusterID
clusters := make(map[string]*influxdb, len(configs))
Expand Down Expand Up @@ -100,6 +111,7 @@ func NewService(configs []Config, defaultInfluxDB, httpPort int, hostname string
disableSubs: c.DisableSubscriptions,
protocol: c.SubscriptionProtocol,
runningSubs: runningSubs,
useTokens: useTokens,
}
if defaultInfluxDB == i {
defaultInfluxDBName = c.Name
Expand All @@ -116,6 +128,7 @@ func (s *Service) Open() error {
for _, cluster := range s.clusters {
cluster.PointsWriter = s.PointsWriter
cluster.LogService = s.LogService
cluster.AuthService = s.AuthService
err := cluster.Open()
if err != nil {
return err
Expand Down Expand Up @@ -164,6 +177,7 @@ type influxdb struct {
subscriptionSyncInterval time.Duration
disableSubs bool
runningSubs map[subEntry]bool
useTokens bool

clusterID string
subName string
Expand All @@ -175,6 +189,10 @@ type influxdb struct {
LogService interface {
NewLogger(string, int) *log.Logger
}
AuthService interface {
GrantSubscriptionAccess(token, db, rp string) error
RevokeSubscriptionAccess(token string) error
}

services []interface {
Open() error
Expand Down Expand Up @@ -388,7 +406,12 @@ func (s *influxdb) linkSubscriptions() error {
// Check if the hostname, port or protocol have changed
if host != s.hostname ||
u.Scheme != s.protocol ||
((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) {
((u.Scheme == "http" || u.Scheme == "https") && int(pn) != s.httpPort) ||
(s.useTokens && u.User == nil) {
// Remove access for changing subscriptions.
if u.User != nil {
s.AuthService.RevokeSubscriptionAccess(u.User.Username())
}
// Something changed, drop the sub and let it get recreated
s.dropSub(cli, se.name, se.cluster, se.rp)
} else {
Expand Down Expand Up @@ -434,7 +457,20 @@ func (s *influxdb) linkSubscriptions() error {
var destination string
switch s.protocol {
case "http", "https":
destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort)
if s.useTokens {
// Generate token
token, err := s.generateRandomToken()
if err != nil {
return errors.Wrap(err, "generating token")
}
err = s.AuthService.GrantSubscriptionAccess(token, se.cluster, se.rp)
if err != nil {
return err
}
destination = fmt.Sprintf("%s://%s@%s:%d", s.protocol, token, s.hostname, s.httpPort)
} else {
destination = fmt.Sprintf("%s://%s:%d", s.protocol, s.hostname, s.httpPort)
}
case "udp":
addr, err := s.startUDPListener(se.cluster, se.rp, "0")
if err != nil {
Expand All @@ -457,6 +493,14 @@ func (s *influxdb) linkSubscriptions() error {
return nil
}

func (s *influxdb) generateRandomToken() (string, error) {
tokenBytes := make([]byte, tokenSize)
if _, err := io.ReadFull(rand.Reader, tokenBytes); err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(tokenBytes), nil
}

func (s *influxdb) createSub(cli client.Client, name, cluster, rp, mode string, destinations []string) (err error) {
var buf bytes.Buffer
for i, dst := range destinations {
Expand Down
14 changes: 14 additions & 0 deletions services/noauth/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,17 @@ func (s *Service) User(username string) (auth.User, error) {
s.logger.Println("W! using noauth auth backend. Faked authentication for user", username)
return auth.NewUser(username, nil, true, nil), nil
}

// Return a user will all privileges.
func (s *Service) SubscriptionUser(token string) (auth.User, error) {
s.logger.Println("W! using noauth auth backend. Faked authentication for subscription user token")
return auth.NewUser("subscription-user", nil, true, nil), nil
}

func (s *Service) GrantSubscriptionAccess(token, db, rp string) error {
return nil
}

func (s *Service) RevokeSubscriptionAccess(token string) error {
return nil
}

0 comments on commit c10a082

Please sign in to comment.