Skip to content

Commit

Permalink
Add /status endpoint to show queue status
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Aug 22, 2017
1 parent 59a06f8 commit 380ecfa
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 11 deletions.
3 changes: 3 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Backend interface {

// Health returns a string describing any health problems the backend has, or empty string if all is well
Health() string

// Status returns a string describing the current status, this can detail queue sizes or other attributes
Status() string
}

// NewBackend creates the type of backend passed in
Expand Down
61 changes: 57 additions & 4 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,11 @@ func (b *backend) WriteChannelLogs(logs []*courier.ChannelLog) error {
func (b *backend) Health() string {
// test redis
rc := b.redisPool.Get()
_, redisErr := rc.Do("PING")
defer rc.Close()
_, redisErr := rc.Do("PING")

// test our db
_, dbErr := b.db.Exec("SELECT 1")

health := bytes.Buffer{}

if redisErr != nil {
Expand All @@ -186,6 +185,62 @@ func (b *backend) Health() string {
return health.String()
}

// Status returns information on our queue sizes, number of workers etc..
func (b *backend) Status() string {
rc := b.redisPool.Get()
defer rc.Close()

// get all our active queues
values, err := redis.Values(rc.Do("zrevrangebyscore", fmt.Sprintf("%s:active", msgQueueName), "+inf", "-inf", "withscores"))
if err != nil {
return fmt.Sprintf("unable to read active queue: %v", err)
}

status := bytes.Buffer{}
status.WriteString("-------------------------------------------------------------------------\n")
status.WriteString(" Size | Workers | TPS | Type | Channel \n")
status.WriteString("-------------------------------------------------------------------------\n")
var queue string
var workers float64
var uuid string
var tps string
var channelType = ""

for len(values) > 0 {
values, err = redis.Scan(values, &queue, &workers)
if err != nil {
return fmt.Sprintf("error reading active queues: %v", err)
}

// our queue name is in the format msgs:uuid|tps, break it apart
queue = strings.TrimPrefix(queue, "msgs:")
parts := strings.Split(queue, "|")
if len(parts) != 2 {
return fmt.Sprintf("error parsing queue name '%s'", queue)
}
uuid = parts[0]
tps = parts[1]

// try to look up our channel
channelUUID, _ := courier.NewChannelUUID(uuid)
channel, err := getChannel(b, courier.AnyChannelType, channelUUID)
if err != nil {
channelType = "!!"
} else {
channelType = channel.ChannelType().String()
}

// get # of items in the queue
size, err := redis.Int64(rc.Do("zcard", queue))
if err != nil {
return fmt.Sprintf("error reading queue size: %v", err)
}
status.WriteString(fmt.Sprintf("% 9d % 7d % 3s % 4s %s\n", size, int(workers), tps, channelType, uuid))
}

return status.String()
}

// Start starts our RapidPro backend, this tests our various connections and starts our spool flushers
func (b *backend) Start() error {
log := logrus.WithFields(logrus.Fields{
Expand Down Expand Up @@ -338,5 +393,3 @@ type backend struct {
stopChan chan bool
waitGroup *sync.WaitGroup
}


28 changes: 26 additions & 2 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (ts *MsgTestSuite) TestContactURNPriority() {
ts.Equal(twChannel.ID(), urns[1].ChannelID)
}

func (ts *MsgTestSuite) TestStatus() {
func (ts *MsgTestSuite) TestMsgStatus() {
channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")
now := time.Now().In(time.UTC)
time.Sleep(2 * time.Millisecond)
Expand Down Expand Up @@ -311,6 +311,30 @@ func (ts *MsgTestSuite) TestHealth() {
ts.Equal(ts.b.Health(), "")
}

func (ts *MsgTestSuite) TestStatus() {
// our health should just contain the header
ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status())

// add a message to our queue
r := ts.b.redisPool.Get()
defer r.Close()

dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NoError(err)
ts.NotNil(dbMsg)

// serialize our message
msgJSON, err := json.Marshal(dbMsg)
ts.NoError(err)

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.DefaultPriority)
ts.NoError(err)

// status should now contain that channel
ts.True(strings.Contains(ts.b.Status(), "dbc126ed-66bc-4e28-b67b-81dc3327c95d"), ts.b.Status())
}

func (ts *MsgTestSuite) TestOutgoingQueue() {
// add one of our outgoing messages to the queue
r := ts.b.redisPool.Get()
Expand All @@ -325,7 +349,7 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
msgJSON, err := json.Marshal(dbMsg)
ts.NoError(err)

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 1, string(msgJSON), queue.DefaultPriority)
err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.DefaultPriority)
ts.NoError(err)

// pop a message off our queue
Expand Down
3 changes: 3 additions & 0 deletions config/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type Courier struct {
LibratoUsername string `default:""`
LibratoToken string `default:""`

StatusUsername string `default:""`
StatusPassword string `default:""`

RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message"`
RapidproToken string `default:"missing_rapidpro_token"`

Expand Down
5 changes: 1 addition & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ func (f *Foreman) Assign() {
return

// otherwise, grab the next msg and assign it to a sender
default:
// get the next sender that is ready
sender := <-f.availableSenders

case sender := <-f.availableSenders:
// see if we have a message to work on
msg, err := backend.PopNextOutgoingMsg()

Expand Down
26 changes: 25 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ func (s *server) Start() error {
// start our spool flushers
startSpoolFlushers(s)

// wire up our index page
// wire up our main pages
s.router.Get("/", s.handleIndex)
s.router.Get("/status", s.handleStatus)

// initialize our handlers
s.initializeChannelHandlers()
Expand Down Expand Up @@ -392,6 +393,29 @@ func (s *server) handleIndex(w http.ResponseWriter, r *http.Request) {
w.Write(buf.Bytes())
}

func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
if s.config.StatusUsername != "" {
user, pass, ok := r.BasicAuth()
if !ok || user != s.config.StatusUsername || pass != s.config.StatusPassword {
w.Header().Set("WWW-Authenticate", `Basic realm="Authenticate"`)
w.WriteHeader(401)
w.Write([]byte("Unauthorised.\n"))
return
}
}

var buf bytes.Buffer
buf.WriteString("<title>courier</title><body><pre>\n")
buf.WriteString(splash)
buf.WriteString(s.config.Version)

buf.WriteString("\n\n")
buf.WriteString(s.backend.Status())
buf.WriteString("\n\n")
buf.WriteString("</pre></body>")
w.Write(buf.Bytes())
}

var splash = `
____________ _____
___ ____/_________ ___________(_)____________
Expand Down
5 changes: 5 additions & 0 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (mb *MockBackend) Health() string {
return ""
}

// Status returns a string describing the status of the service, queue size etc..
func (mb *MockBackend) Status() string {
return ""
}

func buildMockBackend(config *config.Courier) Backend {
return NewMockBackend()
}
Expand Down

0 comments on commit 380ecfa

Please sign in to comment.