Skip to content

Commit

Permalink
Merge pull request #13 from nyaruka/multiple-messages
Browse files Browse the repository at this point in the history
Add support for multiple messages per contact
  • Loading branch information
nicpottier authored Jul 10, 2017
2 parents 7bc7930 + 9466b19 commit 59d21a9
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 51 deletions.
15 changes: 8 additions & 7 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,27 @@ func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
token, msgJSON, err = queue.PopFromQueue(rc, msgQueueName)
}

var msg *courier.Msg
if msgJSON != "" {
dbMsg := &DBMsg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
dbMsg := DBMsg{}
err = json.Unmarshal([]byte(msgJSON), &dbMsg)
if err != nil {
return nil, err
}

// load our channel
// create courier msg from our db msg
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID)
if err != nil {
return nil, err
}

// then create our outgoing msg
msg = courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text)
// TODO: what other attributes are needed here?
msg := courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text).WithID(dbMsg.ID).WithExternalID(dbMsg.ExternalID)
msg.WorkerToken = token

return msg, nil
}

return msg, nil
return nil, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
Expand Down
6 changes: 3 additions & 3 deletions cmd/courier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func main() {
logrus.SetLevel(level)

// if we have a DSN entry, try to initialize it
if config.DSN != "" {
hook, err := logrus_sentry.NewSentryHook(config.DSN, []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel})
if config.SentryDSN != "" {
hook, err := logrus_sentry.NewSentryHook(config.SentryDSN, []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel})
hook.Timeout = 0
hook.StacktraceConfiguration.Enable = true
hook.StacktraceConfiguration.Skip = 4
hook.StacktraceConfiguration.Context = 5
if err != nil {
log.Fatalf("Invalid sentry DSN: '%s': %s", config.DSN, err)
log.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err)
}
logrus.StandardLogger().Hooks.Add(hook)
}
Expand Down
2 changes: 1 addition & 1 deletion config/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type Courier struct {
Backend string `default:"rapidpro"`

DSN string `default:""`
SentryDSN string `default:""`

BaseURL string `default:"https://localhost:8080"`
Port int `default:"8080"`
Expand Down
10 changes: 9 additions & 1 deletion msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewMsgUUID() MsgUUID {
// NewIncomingMsg creates a new message from the given params
func NewIncomingMsg(channel Channel, urn URN, text string) *Msg {
m := &Msg{}
m.ID = NilMsgID
m.UUID = NewMsgUUID()
m.Channel = channel
m.Text = text
Expand All @@ -80,7 +81,8 @@ func NewIncomingMsg(channel Channel, urn URN, text string) *Msg {
// NewOutgoingMsg creates a new message from the given params
func NewOutgoingMsg(channel Channel, urn URN, text string) *Msg {
m := &Msg{}
m.UUID = NewMsgUUID()
m.ID = NilMsgID
m.UUID = NilMsgUUID
m.Channel = channel
m.Text = text
m.URN = urn
Expand Down Expand Up @@ -119,6 +121,12 @@ func (m *Msg) WithReceivedOn(date time.Time) *Msg { m.ReceivedOn = &date; return
// WithExternalID can be used to set the external id on a msg in a chained call
func (m *Msg) WithExternalID(id string) *Msg { m.ExternalID = id; return m }

// WithID can be used to set the id on a msg in a chained call
func (m *Msg) WithID(id MsgID) *Msg { m.ID = id; return m }

// WithUUID can be used to set the id on a msg in a chained call
func (m *Msg) WithUUID(uuid MsgUUID) *Msg { m.UUID = uuid; return m }

// AddAttachment can be used to append to the media urls for a message
func (m *Msg) AddAttachment(url string) *Msg { m.Attachments = append(m.Attachments, url); return m }

Expand Down
127 changes: 95 additions & 32 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,37 @@ type Priority int64
type WorkerToken string

const (
// LowPriority is our lowest priority, penalty of one day
LowPriority = Priority(time.Hour / 1000 * 24)
// DefaultPriority is the normal priority for msgs, messages are sent first in first out
DefaultPriority = 1

// DefaultPriority just queues according to the current time
DefaultPriority = Priority(0)

// HighPriority subtracts one day
HighPriority = Priority(time.Hour / 1000 * -24)

// HigherPriority subtracts two days
HigherPriority = Priority(time.Hour / 1000 * -48)
// BulkPriority is our priority for bulk messages (sent in batches) These will only be
// processed after all default priority messages are deault with
BulkPriority = 0
)

const (
// EmptyQueue means there are no items to retrive, caller should sleep and try again later
EmptyQueue = WorkerToken("empty")

// Retry means the caller should immediately call again to get the next value
Retry = WorkerToken("retry")
)

var luaPush = redis.NewScript(6, `-- KEYS: [Epoch, QueueType, QueueName, TPS, Score, Value]
var luaPush = redis.NewScript(6, `-- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value]
-- first push onto our specific queue
-- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps"
local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4]
redis.call("zadd", queueKey, KEYS[5], KEYS[6])
-- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk)
local priorityQueueKey = queueKey .. "/" .. KEYS[5]
redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6])
local tps = tonumber(KEYS[4])
-- if we have a TPS, check whether we are currently throttled
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. KEYS[1]
local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1])
curr = tonumber(redis.call("get", tpsKey))
end
Expand All @@ -62,13 +63,12 @@ var luaPush = redis.NewScript(6, `-- KEYS: [Epoch, QueueType, QueueName, TPS, Sc
// specified transactions per second are popped off at a time. A tps value of 0 means there is no
// limit to the rate that messages can be consumed
func PushOntoQueue(conn redis.Conn, qType string, queue string, tps int, value string, priority Priority) error {
epoch := time.Now().Unix()
score := time.Now().UnixNano()/1000 + int64(priority)
_, err := redis.Int(luaPush.Do(conn, epoch, qType, queue, tps, strconv.FormatInt(score, 10), value))
epochMS := strconv.FormatFloat(float64(time.Now().UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)
_, err := redis.Int(luaPush.Do(conn, epochMS, qType, queue, tps, priority, value))
return err
}

var luaPop = redis.NewScript(2, `-- KEYS: [Epoch QueueType]
var luaPop = redis.NewScript(2, `-- KEYS: [EpochMS QueueType]
-- get the first key off our active list
local result = redis.call("zrange", KEYS[2] .. ":active", 0, 0, "WITHSCORES")
local queue = result[1]
Expand All @@ -82,39 +82,90 @@ var luaPop = redis.NewScript(2, `-- KEYS: [Epoch QueueType]
-- figure out our max transaction per second
local delim = string.find(queue, "|")
local tps = 0
local tpsKey = ""
if delim then
tps = tonumber(string.sub(queue, delim+1))
end
-- if we have a tps, then check whether we exceed it
if tps > 0 then
local tpsKey = queue .. ":tps:" .. KEYS[1]
tpsKey = queue .. ":tps:" .. math.floor(KEYS[1])
local curr = tonumber(redis.call("get", tpsKey))
-- we are under our max tps, increase our # of transactions on this second
if not curr or curr < tps then
redis.call("incr", tpsKey)
redis.call("expire", tpsKey, 10)
-- we are above our tps, move to our throttled queue
else
-- we are at or above our tps, move to our throttled queue
if curr and curr >= tps then
redis.call("zincrby", KEYS[2] .. ":throttled", workers, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
end
end
-- pop our next value out
result = redis.call("zrange", queue, 0, 0)
-- pop our next value out, first from our default queue
local priorityQueue = queue .. "/1"
local result = redis.call("zrangebyscore", priorityQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)
-- keep track as to whether this result is in the future (and therefore ineligible)
local isFutureResult = result[1] and result[2] > KEYS[1]
-- if we didn't find one, try again from our default
if not result[1] or isFutureResult then
priorityQueue = queue .. "/0"
result = redis.call("zrangebyscore", priorityQueue, 0, "+inf", "WITHSCORES", "LIMIT", 0, 1)
-- set whether we are a future result
if result[1] and result[2] > KEYS[1] then
isFutureResult = true
else
isFutureResult = false
end
end
-- if we found one
if result[1] then
if result[1] and not isFutureResult then
-- then remove it from the queue
redis.call('zremrangebyrank', queue, 0, 0)
redis.call('zremrangebyrank', priorityQueue, 0, 0)
-- increment our tps for this second if we have a limit
if tps > 0 then
redis.call("incr", tpsKey)
redis.call("expire", tpsKey, 10)
end
-- and add a worker to this queue
redis.call("zincrby", KEYS[2] .. ":active", 1, queue)
return {queue, result[1]}
-- is this a compound message? (a JSON array, if so, we return the first element but schedule the others
-- for 5 seconds from now
local popValue = result[1]
redis.call("set", "debug", popValue)
if string.sub(popValue, 1, 1) == "[" then
-- parse it as JSON to get the first element out
local valueList = cjson.decode(popValue)
popValue = cjson.encode(valueList[1])
table.remove(valueList, 1)
-- encode it back if there is anything left
if table.getn(valueList) > 0 then
local remaining = ""
if table.getn(valueList) == 1 then
remaining = cjson.encode(valueList[1])
else
remaining = cjson.encode(valueList)
end
-- schedule it in the future 5 seconds on our main queue
redis.call("zadd", queue .. "/0", tonumber(KEYS[1]) + 5, remaining)
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
end
end
return {queue, popValue}
-- otherwise, the queue only contains future results, remove from active and add to future, have the caller retry
elseif isFutureResult then
redis.call("zincrby", KEYS[2] .. ":future", 0, queue)
redis.call("zrem", KEYS[2] .. ":active", queue)
return {"retry", ""}
-- otherwise, the queue is empty, remove it from active
else
Expand All @@ -128,8 +179,8 @@ var luaPop = redis.NewScript(2, `-- KEYS: [Epoch QueueType]
// worker token of EmptyQueue will be returned if there are no more items to retrive.
// Otherwise the WorkerToken should be saved in order to mark the task as complete later.
func PopFromQueue(conn redis.Conn, qType string) (WorkerToken, string, error) {
epoch := time.Now().Unix()
values, err := redis.Strings(luaPop.Do(conn, strconv.FormatInt(epoch, 10), qType))
epochMS := strconv.FormatFloat(float64(time.Now().UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)
values, err := redis.Strings(luaPop.Do(conn, epochMS, qType))
if err != nil {
return "", "", err
}
Expand Down Expand Up @@ -171,6 +222,18 @@ var luaDethrottle = redis.NewScript(1, `-- KEYS: [QueueType]
end
redis.call("del", KEYS[1] .. ":throttled")
end
-- get all the keys in the future
local future = redis.call("zrange", KEYS[1] .. ":future", 0, -1, "WITHSCORES")
-- add them to our active list
if next(future) then
local activeKey = KEYS[1] .. ":active"
for i=1,#future,2 do
redis.call("zincrby", activeKey, future[i+1], future[i])
end
redis.call("del", KEYS[1] .. ":future")
end
`)

// StartDethrottler starts a goroutine responsible for dethrottling any queues that were
Expand Down
60 changes: 54 additions & 6 deletions queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@ func getPool() *redis.Pool {
func TestLua(t *testing.T) {
assert := assert.New(t)

// start our dethrottler
pool := getPool()
conn := pool.Get()
defer conn.Close()
quitter := make(chan bool)
wg := &sync.WaitGroup{}
StartDethrottler(pool, quitter, wg, "msgs")
defer close(quitter)

rate := 10
for i := 0; i < 30; i++ {
err := PushOntoQueue(conn, "msgs", "chan1", rate, fmt.Sprintf("msg:%d", i), DefaultPriority)
for i := 0; i < 15; i++ {
err := PushOntoQueue(conn, "msgs", "chan1", rate, fmt.Sprintf("msg:%d", i), BulkPriority)
assert.NoError(err)
}

Expand Down Expand Up @@ -79,7 +84,7 @@ func TestLua(t *testing.T) {
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}
err = PushOntoQueue(conn, "msgs", "chan1", rate, "msg:30", HighPriority)
err = PushOntoQueue(conn, "msgs", "chan1", rate, "msg:30", BulkPriority)
assert.NoError(err)

count, err = redis.Int(conn.Do("zcard", "msgs:throttled"))
Expand All @@ -96,8 +101,51 @@ func TestLua(t *testing.T) {
assert.NoError(err)

queue, value, err = PopFromQueue(conn, "msgs")
if value != "msg:30" && queue != "msgs:chan1|10" {
t.Fatalf("Should have received chan1 and msg30, got: %s and %s", queue, value)
if value != "msg:31" || queue != "msgs:chan1|10" {
t.Fatalf("Should have received chan1 and msg:31, got: %s and %s", queue, value)
}

// clear out our queue of remaining items
for i := 0; i < 5; i++ {
_, _, err := PopFromQueue(conn, "msgs")
assert.NoError(err)
}

// push on a compound message
err = PushOntoQueue(conn, "msgs", "chan1", rate, `[{"id":"msg:32"}, {"id":"msg:33"}]`, DefaultPriority)

queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != `{"id":"msg:32"}` || queue != "msgs:chan1|10" {
t.Fatalf("Should have received chan1 and msg:32, got: %s and %s", queue, value)
}

// next value should be 30 even though it is bulk
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "msg:30" || queue != "msgs:chan1|10" {
t.Fatalf("Should have received chan1 and msg:30, got: %s and %s", queue, value)
}

// popping again should give us nothing since it is too soon to send 33
queue, value, err = PopFromQueue(conn, "msgs")
assert.NoError(err)
if value != "" && queue != EmptyQueue {
t.Fatal("Should be throttled")
}

// but if we sleep 6 seconds should get it
time.Sleep(time.Second * 6)

queue, value, err = PopFromQueue(conn, "msgs")
if value != `{"id":"msg:33"}` || queue != "msgs:chan1|10" {
t.Fatalf("Should have received chan1 and msg:33, got: %s and %s", queue, value)
}

// nothing should be left
queue, value, err = PopFromQueue(conn, "msgs")
if value != "" && queue != EmptyQueue {
t.Fatal("Should be empty")
}
}

Expand Down Expand Up @@ -174,7 +222,7 @@ func BenchmarkQueue(b *testing.B) {

queue, value, err := PopFromQueue(conn, "msgs")
assert.NoError(err)
assert.Equal("msgs:chan1|0", queue, "Mismatched queue")
assert.Equal(WorkerToken("msgs:chan1|0"), queue, "Mismatched queue")
assert.Equal(insertValue, value, "Mismatched value")

err = MarkComplete(conn, "msgs", queue)
Expand Down
2 changes: 1 addition & 1 deletion test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (mb *MockBackend) GetLastQueueMsg() (*Msg, error) {
return mb.queueMsgs[len(mb.queueMsgs)-1], nil
}

// PopNextOutgoingMsg returns the next message that should be sent, or nil if there are none to send
// PopNextOutgoingMsgs returns the next message that should be sent, or nil if there are none to send
func (mb *MockBackend) PopNextOutgoingMsg() (*Msg, error) {
return nil, nil
}
Expand Down

0 comments on commit 59d21a9

Please sign in to comment.