Skip to content

Commit

Permalink
nsqadmin: add /config API
Browse files Browse the repository at this point in the history
Provides the ability to modify the configuration nsqadmin for setting
`nsqlookupd_http_addresses` to support dynamic configuration.

Closes nsqio#769
  • Loading branch information
kenjones-cisco committed Jul 23, 2016
1 parent bb92864 commit a4015c2
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 60 deletions.
162 changes: 112 additions & 50 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"encoding/json"
"fmt"
"html/template"
"io"
"io/ioutil"
"mime"
"net/http"
"net/http/httputil"
"net/url"
"path"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -50,20 +53,20 @@ type httpServer struct {
}

func NewHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqadmin.opts.Logger)
log := http_api.Log(ctx.nsqadmin.getOpts().Logger)

client := http_api.NewClient(ctx.nsqadmin.httpClientTLSConfig)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.opts.Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.opts.Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.opts.Logger)
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqadmin.getOpts().Logger)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqadmin.getOpts().Logger)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqadmin.getOpts().Logger)
s := &httpServer{
ctx: ctx,
router: router,
client: client,
ci: clusterinfo.New(ctx.nsqadmin.opts.Logger, client),
ci: clusterinfo.New(ctx.nsqadmin.getOpts().Logger, client),
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
Expand All @@ -79,7 +82,7 @@ func NewHTTPServer(ctx *Context) *httpServer {

router.Handle("GET", "/static/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
router.Handle("GET", "/fonts/:asset", http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
if s.ctx.nsqadmin.opts.ProxyGraphite {
if s.ctx.nsqadmin.getOpts().ProxyGraphite {
proxy := NewSingleHostReverseProxy(ctx.nsqadmin.graphiteURL, 20*time.Second)
router.Handler("GET", "/render", proxy)
}
Expand All @@ -98,6 +101,8 @@ func NewHTTPServer(ctx *Context) *httpServer {
router.Handle("DELETE", "/api/topics/:topic/:channel", http_api.Decorate(s.deleteChannelHandler, log, http_api.V1))
router.Handle("GET", "/api/counter", http_api.Decorate(s.counterHandler, log, http_api.V1))
router.Handle("GET", "/api/graphite", http_api.Decorate(s.graphiteHandler, log, http_api.V1))
router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))

return s
}
Expand Down Expand Up @@ -128,15 +133,15 @@ func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps h
NSQLookupd []string
}{
Version: version.Binary,
ProxyGraphite: s.ctx.nsqadmin.opts.ProxyGraphite,
GraphEnabled: s.ctx.nsqadmin.opts.GraphiteURL != "",
GraphiteURL: s.ctx.nsqadmin.opts.GraphiteURL,
StatsdInterval: int(s.ctx.nsqadmin.opts.StatsdInterval / time.Second),
UseStatsdPrefixes: s.ctx.nsqadmin.opts.UseStatsdPrefixes,
StatsdCounterFormat: s.ctx.nsqadmin.opts.StatsdCounterFormat,
StatsdGaugeFormat: s.ctx.nsqadmin.opts.StatsdGaugeFormat,
StatsdPrefix: s.ctx.nsqadmin.opts.StatsdPrefix,
NSQLookupd: s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
ProxyGraphite: s.ctx.nsqadmin.getOpts().ProxyGraphite,
GraphEnabled: s.ctx.nsqadmin.getOpts().GraphiteURL != "",
GraphiteURL: s.ctx.nsqadmin.getOpts().GraphiteURL,
StatsdInterval: int(s.ctx.nsqadmin.getOpts().StatsdInterval / time.Second),
UseStatsdPrefixes: s.ctx.nsqadmin.getOpts().UseStatsdPrefixes,
StatsdCounterFormat: s.ctx.nsqadmin.getOpts().StatsdCounterFormat,
StatsdGaugeFormat: s.ctx.nsqadmin.getOpts().StatsdGaugeFormat,
StatsdPrefix: s.ctx.nsqadmin.getOpts().StatsdPrefix,
NSQLookupd: s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
})

return nil, nil
Expand Down Expand Up @@ -182,10 +187,10 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
}

var topics []string
if len(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses) != 0 {
topics, err = s.ci.GetLookupdTopics(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 {
topics, err = s.ci.GetLookupdTopics(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
} else {
topics, err = s.ci.GetNSQDTopics(s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
topics, err = s.ci.GetNSQDTopics(s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
}
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
Expand All @@ -200,15 +205,15 @@ func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps
inactive, _ := reqParams.Get("inactive")
if inactive == "true" {
topicChannelMap := make(map[string][]string)
if len(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses) == 0 {
if len(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 {
goto respond
}
for _, topicName := range topics {
producers, _ := s.ci.GetLookupdTopicProducers(
topicName, s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if len(producers) == 0 {
topicChannels, _ := s.ci.GetLookupdTopicChannels(
topicName, s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
topicName, s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
topicChannelMap[topicName] = topicChannels
}
}
Expand All @@ -231,8 +236,8 @@ func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps h
topicName := ps.ByName("topic")

producers, err := s.ci.GetTopicProducers(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -271,8 +276,8 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
channelName := ps.ByName("channel")

producers, err := s.ci.GetTopicProducers(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -302,7 +307,7 @@ func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps
func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
var messages []string

producers, err := s.ci.GetProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses, s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand All @@ -324,7 +329,7 @@ func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps ht

node := ps.ByName("node")

producers, err := s.ci.GetProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses, s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -388,7 +393,7 @@ func (s *httpServer) tombstoneNodeForTopicHandler(w http.ResponseWriter, req *ht
}

err = s.ci.TombstoneNodeForTopic(body.Topic, node,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -427,7 +432,7 @@ func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.
}

err = s.ci.CreateTopicChannel(body.Topic, body.Channel,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand All @@ -454,8 +459,8 @@ func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request
topicName := ps.ByName("topic")

err := s.ci.DeleteTopic(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand All @@ -480,8 +485,8 @@ func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Reque
channelName := ps.ByName("channel")

err := s.ci.DeleteChannel(topicName, channelName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -525,42 +530,42 @@ func (s *httpServer) topicChannelAction(req *http.Request, topicName string, cha
case "pause":
if channelName != "" {
err = s.ci.PauseChannel(topicName, channelName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("pause_channel", topicName, channelName, "", req)
} else {
err = s.ci.PauseTopic(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("pause_topic", topicName, "", "", req)
}
case "unpause":
if channelName != "" {
err = s.ci.UnPauseChannel(topicName, channelName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("unpause_channel", topicName, channelName, "", req)
} else {
err = s.ci.UnPauseTopic(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("unpause_topic", topicName, "", "", req)
}
case "empty":
if channelName != "" {
err = s.ci.EmptyChannel(topicName, channelName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("empty_channel", topicName, channelName, "", req)
} else {
err = s.ci.EmptyTopic(topicName,
s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)

s.notifyAdminAction("empty_topic", topicName, "", "", req)
}
Expand Down Expand Up @@ -594,7 +599,7 @@ func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps
var messages []string
stats := make(map[string]*counterStats)

producers, err := s.ci.GetProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses, s.ctx.nsqadmin.opts.NSQDHTTPAddresses)
producers, err := s.ci.GetProducers(s.ctx.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.ctx.nsqadmin.getOpts().NSQDHTTPAddresses)
if err != nil {
pe, ok := err.(clusterinfo.PartialErr)
if !ok {
Expand Down Expand Up @@ -654,12 +659,12 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p
}

params := url.Values{}
params.Set("from", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.opts.StatsdInterval*2/time.Second))
params.Set("until", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.opts.StatsdInterval/time.Second))
params.Set("from", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdInterval*2/time.Second))
params.Set("until", fmt.Sprintf("-%dsec", s.ctx.nsqadmin.getOpts().StatsdInterval/time.Second))
params.Set("format", "json")
params.Set("target", target)
query := fmt.Sprintf("/render?%s", params.Encode())
url := s.ctx.nsqadmin.opts.GraphiteURL + query
url := s.ctx.nsqadmin.getOpts().GraphiteURL + query

s.ctx.nsqadmin.logf("GRAPHITE: %s", url)

Expand All @@ -678,10 +683,67 @@ func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, p
if rate < 0 {
rateStr = "N/A"
} else {
rateDivisor := s.ctx.nsqadmin.opts.StatsdInterval / time.Second
rateDivisor := s.ctx.nsqadmin.getOpts().StatsdInterval / time.Second
rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor))
}
return struct {
Rate string `json:"rate"`
}{rateStr}, nil
}

func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
opt := ps.ByName("opt")

if req.Method == "PUT" {
// add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF)
readMax := int64(1024 * 1024 + 1)
body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
if int64(len(body)) == readMax || len(body) == 0 {
return nil, http_api.Err{413, "INVALID_VALUE"}
}

opts := *s.ctx.nsqadmin.getOpts()
switch opt {
case "nsqlookupd_http_addresses":
err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses)
if err != nil {
return nil, http_api.Err{400, "INVALID_VALUE"}
}
default:
return nil, http_api.Err{400, "INVALID_OPTION"}
}
s.ctx.nsqadmin.swapOpts(&opts)
}

v, ok := getOptByCfgName(s.ctx.nsqadmin.getOpts(), opt)
if !ok {
return nil, http_api.Err{400, "INVALID_OPTION"}
}

return v, nil
}

func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
val := reflect.ValueOf(opts).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
flagName := field.Tag.Get("flag")
cfgName := field.Tag.Get("cfg")
if flagName == "" {
continue
}
if cfgName == "" {
cfgName = strings.Replace(flagName, "-", "_", -1)
}
if name != cfgName {
continue
}
return val.FieldByName(field.Name).Interface(), true
}
return nil, false
}
Loading

0 comments on commit a4015c2

Please sign in to comment.