Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sending support to courier, plug in sentry, log request and respo… #11

Merged
merged 11 commits into from
Jul 5, 2017
9 changes: 6 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
language: go

addons:
apt:
packages:
- redis-server
postgresql: '9.3'

go:
- 1.8

services:
- redis-server

addons:
postgresql: '9.3'

before_script:
- psql -U postgres -c "CREATE USER courier WITH PASSWORD 'courier';"
- psql -U postgres -c "ALTER ROLE courier WITH SUPERUSER;"
Expand Down
4 changes: 4 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type Backend interface {
GetChannel(ChannelType, ChannelUUID) (Channel, error)
WriteMsg(*Msg) error
WriteMsgStatus(*MsgStatusUpdate) error
WriteChannelLogs([]*ChannelLog) error

PopNextOutgoingMsg() (*Msg, error)
MarkOutgoingMsgComplete(*Msg)

Health() string
}
Expand Down
73 changes: 73 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rapidpro

import (
"bytes"
"encoding/json"
"fmt"
"net/url"
"path"
Expand All @@ -17,10 +18,14 @@ import (
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
)

// the name for our message queue
const msgQueueName = "msgs"

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand All @@ -30,6 +35,46 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (
return getChannel(b, ct, uuid)
}

// PopNextOutgoingMsg pops the next message that needs to be sent
func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
// pop the next message off our queue
rc := b.redisPool.Get()
defer rc.Close()

token, msgJSON, err := queue.PopFromQueue(rc, msgQueueName)
for token == queue.Retry {
token, msgJSON, err = queue.PopFromQueue(rc, msgQueueName)
}

var msg *courier.Msg
if msgJSON != "" {
dbMsg := &DBMsg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
return nil, err
}

// load our channel
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID)
if err != nil {
return nil, err
}

// then create our outgoing msg
msg = courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text)
msg.WorkerToken = token
}

return msg, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(msg *courier.Msg) {
rc := b.redisPool.Get()
defer rc.Close()
queue.MarkComplete(rc, msgQueueName, msg.WorkerToken)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m *courier.Msg) error {
return writeMsg(b, m)
Expand All @@ -40,6 +85,17 @@ func (b *backend) WriteMsgStatus(status *courier.MsgStatusUpdate) error {
return writeMsgStatus(b, status)
}

// WriteChannelLogs persists the passed in logs to our database, for rapidpro we swallow all errors, logging isn't critical
func (b *backend) WriteChannelLogs(logs []*courier.ChannelLog) error {
for _, l := range logs {
err := writeChannelLog(b, l)
if err != nil {
logrus.WithError(err).Error("error writing channel log")
}
}
return nil
}

// Health returns the health of this backend as a string, returning "" if all is well
func (b *backend) Health() string {
// test redis
Expand Down Expand Up @@ -124,6 +180,9 @@ func (b *backend) Start() error {
log.Info("redis ok")
}

// initialize our pop script
b.popScript = redis.NewScript(3, luaPopScript)

// create our s3 client
s3Session, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(b.config.AWSAccessKeyID, b.config.AWSSecretAccessKey, ""),
Expand Down Expand Up @@ -204,8 +263,22 @@ type backend struct {
s3Client *s3.S3
awsCreds *credentials.Credentials

popScript *redis.Script

notifier *notifier

stopChan chan bool
waitGroup *sync.WaitGroup
}

var luaPopScript = `
local val = redis.call('zrange', ARGV[2], 0, 0);
if not next(val) then
redis.call('zrem', ARGV[1], ARGV[3]);
return nil;
else
redis.call('zincrby', ARGV[1], 1, ARGV[3]);
redis.call('zremrangebyrank', ARGV[2], 0, 0);
return val[1];
end
`
21 changes: 18 additions & 3 deletions backends/rapidpro/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ func getChannel(b *backend, channelType courier.ChannelType, channelUUID courier
const lookupChannelFromUUIDSQL = `
SELECT org_id, id, uuid, channel_type, address, country, config
FROM channels_channel
WHERE channel_type = $1 AND uuid = $2 AND is_active = true`
WHERE uuid = $1 AND is_active = true`

// ChannelForUUID attempts to look up the channel with the passed in UUID, returning it
func loadChannelFromDB(b *backend, channel *DBChannel, channelType courier.ChannelType, uuid courier.ChannelUUID) error {
// select just the fields we need
err := b.db.Get(channel, lookupChannelFromUUIDSQL, channelType, uuid)
err := b.db.Get(channel, lookupChannelFromUUIDSQL, uuid)

// we didn't find a match
if err == sql.ErrNoRows {
Expand All @@ -72,6 +72,11 @@ func loadChannelFromDB(b *backend, channel *DBChannel, channelType courier.Chann
return err
}

// is it the right type?
if channelType != courier.AnyChannelType && channelType != channel.ChannelType() {
return courier.ErrChannelWrongType
}

// found it, return it
return nil
}
Expand All @@ -85,7 +90,7 @@ func getLocalChannel(channelType courier.ChannelType, uuid courier.ChannelUUID)

if found {
// if it was found but the type is wrong, that's an error
if channel.ChannelType() != channelType {
if channelType != courier.AnyChannelType && channel.ChannelType() != channelType {
return &DBChannel{ChannelType_: channelType, UUID_: uuid}, courier.ErrChannelWrongType
}

Expand Down Expand Up @@ -169,3 +174,13 @@ func (c *DBChannel) ConfigForKey(key string, defaultValue interface{}) interface
}
return value
}

// StringConfigForKey returns the config value for the passed in key, or defaultValue if it isn't found
func (c *DBChannel) StringConfigForKey(key string, defaultValue string) string {
val := c.ConfigForKey(key, defaultValue)
str, isStr := val.(string)
if !isStr {
return defaultValue
}
return str
}
33 changes: 33 additions & 0 deletions backends/rapidpro/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package rapidpro

import (
"fmt"

"time"

"github.com/nyaruka/courier"
)

const insertLogSQL = `
INSERT INTO channels_channellog("channel_id", "msg_id", "description", "is_error", "url", "request", "response", "response_status", "created_on", "request_time")
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`

// WriteChannelLog writes the passed in channel log to the database, we do not queue on errors but instead just throw away the log
func writeChannelLog(b *backend, log *courier.ChannelLog) error {
// cast our channel to our own channel type
dbChan, isChan := log.Channel.(*DBChannel)
if !isChan {
return fmt.Errorf("unable to write non-rapidpro channel logs")
}

description := "Success"
if log.Error != "" {
description = fmt.Sprintf("Error: %s", log.Error)
}

_, err := b.db.Exec(insertLogSQL, dbChan.ID(), log.MsgID, description, log.Error != "", log.URL,
log.Request, log.Response, log.StatusCode, log.CreatedOn, log.Elapsed/time.Millisecond)

return err
}
10 changes: 8 additions & 2 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rapidpro
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -108,7 +109,7 @@ func newDBMsgFromMsg(m *courier.Msg) *DBMsg {
CreatedOn: now,
ModifiedOn: now,
QueuedOn: now,
SentOn: m.ReceivedOn,
SentOn: *m.ReceivedOn,
}
}

Expand Down Expand Up @@ -195,7 +196,12 @@ func downloadMediaToS3(b *backend, msgUUID courier.MsgUUID, mediaURL string) (st
if err != nil {
return "", err
}
resp, body, err := utils.MakeHTTPRequest(req)
resp, err := utils.GetHTTPClient().Do(req)
if err != nil {
return "", err
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return "", err
}
Expand Down
19 changes: 16 additions & 3 deletions backends/rapidpro/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func notifyRapidPro(config *config.Courier, msgID courier.MsgID) error {

req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("AUTHORIZATION", fmt.Sprintf("Token %s", config.RapidproToken))
_, _, err = utils.MakeHTTPRequest(req)
_, err = utils.MakeHTTPRequest(req)

return err
}
Expand All @@ -53,6 +53,8 @@ func (n *notifier) start(backend *backend) {
log := logrus.WithField("comp", "notifier")
log.WithField("state", "started").Info("notifier started")

lastError := false

for {
select {
case msgID := <-n.msgIDChan:
Expand All @@ -61,8 +63,13 @@ func (n *notifier) start(backend *backend) {

// we failed, append it to our retries
if err != nil {
log.WithError(err).Error("error notifying rapidpro")
if !lastError {
log.WithError(err).Error("error notifying rapidpro")
}
n.retries = append(n.retries, msgID)
lastError = true
} else {
lastError = false
}

// otherwise, all is well, move onto the next
Expand All @@ -81,9 +88,15 @@ func (n *notifier) start(backend *backend) {

err := notifyRapidPro(n.config, msgID)
if err != nil {
log.WithError(err).Error("error notifying rapidpro")
if !lastError {
log.WithError(err).Error("error notifying rapidpro")
}
n.retries = append(n.retries, msgID)
lastError = true
} else {
lastError = false
}

retried++
}
}
Expand Down
22 changes: 22 additions & 0 deletions backends/rapidpro/schema.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
DROP TABLE IF EXISTS orgs_org CASCADE;
CREATE TABLE orgs_org (
id serial primary key,
name character varying(255) NOT NULL,
language character varying(64)
);

DROP TABLE IF EXISTS channels_channel CASCADE;
CREATE TABLE channels_channel (
id serial primary key,
is_active boolean NOT NULL,
Expand All @@ -18,6 +20,7 @@ CREATE TABLE channels_channel (
org_id integer references orgs_org(id) on delete cascade
);

DROP TABLE IF EXISTS contacts_contact CASCADE;
CREATE TABLE contacts_contact (
id serial primary key,
is_active boolean NOT NULL,
Expand All @@ -34,6 +37,7 @@ CREATE TABLE contacts_contact (
org_id integer references orgs_org(id) on delete cascade
);

DROP TABLE IF EXISTS contacts_contacturn CASCADE;
CREATE TABLE contacts_contacturn (
id serial primary key,
urn character varying(255) NOT NULL,
Expand All @@ -46,6 +50,7 @@ CREATE TABLE contacts_contacturn (
auth text
);

DROP TABLE IF EXISTS msgs_msg CASCADE;
CREATE TABLE msgs_msg (
id serial primary key,
text text NOT NULL,
Expand All @@ -69,4 +74,21 @@ CREATE TABLE msgs_msg (
contact_urn_id integer references contacts_contacturn(id) on delete cascade,
org_id integer references orgs_org(id) on delete cascade,
topup_id integer
);

DROP TABLE IF EXISTS channels_channellog CASCADE;
CREATE TABLE channels_channellog (
id serial primary key,
description character varying(255) NOT NULL,
is_error boolean NOT NULL,
url text,
method character varying(16),
request text,
response text,
response_status integer,
created_on timestamp with time zone NOT NULL,
request_time integer,
channel_id integer NOT NULL,
msg_id integer references msgs_msg(id) on delete cascade,
session_id integer references channels_channel(id) on delete cascade
);
Loading