diff --git a/backend.go b/backend.go index 2fd2595fe..921fe6f88 100644 --- a/backend.go +++ b/backend.go @@ -60,6 +60,9 @@ type Backend interface { // Health returns a string describing any health problems the backend has, or empty string if all is well Health() string + + // Status returns a string describing the current status, this can detail queue sizes or other attributes + Status() string } // NewBackend creates the type of backend passed in diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index c0b9bef83..0651acbd5 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -168,12 +168,11 @@ func (b *backend) WriteChannelLogs(logs []*courier.ChannelLog) error { func (b *backend) Health() string { // test redis rc := b.redisPool.Get() - _, redisErr := rc.Do("PING") defer rc.Close() + _, redisErr := rc.Do("PING") // test our db _, dbErr := b.db.Exec("SELECT 1") - health := bytes.Buffer{} if redisErr != nil { @@ -186,6 +185,62 @@ func (b *backend) Health() string { return health.String() } +// Status returns information on our queue sizes, number of workers etc.. +func (b *backend) Status() string { + rc := b.redisPool.Get() + defer rc.Close() + + // get all our active queues + values, err := redis.Values(rc.Do("zrevrangebyscore", fmt.Sprintf("%s:active", msgQueueName), "+inf", "-inf", "withscores")) + if err != nil { + return fmt.Sprintf("unable to read active queue: %v", err) + } + + status := bytes.Buffer{} + status.WriteString("-------------------------------------------------------------------------\n") + status.WriteString(" Size | Workers | TPS | Type | Channel \n") + status.WriteString("-------------------------------------------------------------------------\n") + var queue string + var workers float64 + var uuid string + var tps string + var channelType = "" + + for len(values) > 0 { + values, err = redis.Scan(values, &queue, &workers) + if err != nil { + return fmt.Sprintf("error reading active queues: %v", err) + } + + // our queue name is in the format msgs:uuid|tps, break it apart + queue = strings.TrimPrefix(queue, "msgs:") + parts := strings.Split(queue, "|") + if len(parts) != 2 { + return fmt.Sprintf("error parsing queue name '%s'", queue) + } + uuid = parts[0] + tps = parts[1] + + // try to look up our channel + channelUUID, _ := courier.NewChannelUUID(uuid) + channel, err := getChannel(b, courier.AnyChannelType, channelUUID) + if err != nil { + channelType = "!!" + } else { + channelType = channel.ChannelType().String() + } + + // get # of items in the queue + size, err := redis.Int64(rc.Do("zcard", queue)) + if err != nil { + return fmt.Sprintf("error reading queue size: %v", err) + } + status.WriteString(fmt.Sprintf("% 9d % 7d % 3s % 4s %s\n", size, int(workers), tps, channelType, uuid)) + } + + return status.String() +} + // Start starts our RapidPro backend, this tests our various connections and starts our spool flushers func (b *backend) Start() error { log := logrus.WithFields(logrus.Fields{ @@ -338,5 +393,3 @@ type backend struct { stopChan chan bool waitGroup *sync.WaitGroup } - - diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 3115130e9..3ababb514 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -246,7 +246,7 @@ func (ts *MsgTestSuite) TestContactURNPriority() { ts.Equal(twChannel.ID(), urns[1].ChannelID) } -func (ts *MsgTestSuite) TestStatus() { +func (ts *MsgTestSuite) TestMsgStatus() { channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d") now := time.Now().In(time.UTC) time.Sleep(2 * time.Millisecond) @@ -311,6 +311,30 @@ func (ts *MsgTestSuite) TestHealth() { ts.Equal(ts.b.Health(), "") } +func (ts *MsgTestSuite) TestStatus() { + // our health should just contain the header + ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status()) + + // add a message to our queue + r := ts.b.redisPool.Get() + defer r.Close() + + dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000)) + dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d") + ts.NoError(err) + ts.NotNil(dbMsg) + + // serialize our message + msgJSON, err := json.Marshal(dbMsg) + ts.NoError(err) + + err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.DefaultPriority) + ts.NoError(err) + + // status should now contain that channel + ts.True(strings.Contains(ts.b.Status(), "dbc126ed-66bc-4e28-b67b-81dc3327c95d"), ts.b.Status()) +} + func (ts *MsgTestSuite) TestOutgoingQueue() { // add one of our outgoing messages to the queue r := ts.b.redisPool.Get() @@ -325,7 +349,7 @@ func (ts *MsgTestSuite) TestOutgoingQueue() { msgJSON, err := json.Marshal(dbMsg) ts.NoError(err) - err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 1, string(msgJSON), queue.DefaultPriority) + err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.DefaultPriority) ts.NoError(err) // pop a message off our queue diff --git a/config/courier.go b/config/courier.go index 11010e4ff..8789ccf7e 100644 --- a/config/courier.go +++ b/config/courier.go @@ -28,6 +28,9 @@ type Courier struct { LibratoUsername string `default:""` LibratoToken string `default:""` + StatusUsername string `default:""` + StatusPassword string `default:""` + RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message"` RapidproToken string `default:"missing_rapidpro_token"` diff --git a/sender.go b/sender.go index 063e0c79c..c0e2ce8fc 100644 --- a/sender.go +++ b/sender.go @@ -72,10 +72,7 @@ func (f *Foreman) Assign() { return // otherwise, grab the next msg and assign it to a sender - default: - // get the next sender that is ready - sender := <-f.availableSenders - + case sender := <-f.availableSenders: // see if we have a message to work on msg, err := backend.PopNextOutgoingMsg() diff --git a/server.go b/server.go index f2de283ab..e6a11a113 100644 --- a/server.go +++ b/server.go @@ -106,8 +106,9 @@ func (s *server) Start() error { // start our spool flushers startSpoolFlushers(s) - // wire up our index page + // wire up our main pages s.router.Get("/", s.handleIndex) + s.router.Get("/status", s.handleStatus) // initialize our handlers s.initializeChannelHandlers() @@ -392,6 +393,29 @@ func (s *server) handleIndex(w http.ResponseWriter, r *http.Request) { w.Write(buf.Bytes()) } +func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) { + if s.config.StatusUsername != "" { + user, pass, ok := r.BasicAuth() + if !ok || user != s.config.StatusUsername || pass != s.config.StatusPassword { + w.Header().Set("WWW-Authenticate", `Basic realm="Authenticate"`) + w.WriteHeader(401) + w.Write([]byte("Unauthorised.\n")) + return + } + } + + var buf bytes.Buffer + buf.WriteString("
\n") + buf.WriteString(splash) + buf.WriteString(s.config.Version) + + buf.WriteString("\n\n") + buf.WriteString(s.backend.Status()) + buf.WriteString("\n\n") + buf.WriteString("") + w.Write(buf.Bytes()) +} + var splash = ` ____________ _____ ___ ____/_________ ___________(_)____________ diff --git a/test.go b/test.go index 4a3c65e12..a35c6eb25 100644 --- a/test.go +++ b/test.go @@ -189,6 +189,11 @@ func (mb *MockBackend) Health() string { return "" } +// Status returns a string describing the status of the service, queue size etc.. +func (mb *MockBackend) Status() string { + return "" +} + func buildMockBackend(config *config.Courier) Backend { return NewMockBackend() }