Skip to content

Commit

Permalink
Add failsafe against duplicate sending
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 12, 2017
1 parent 783ac87 commit 0f3a96d
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 10 deletions.
6 changes: 5 additions & 1 deletion backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ type Backend interface {
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg() (Msg, error)

// WasMsgSent returns whether the backend thinks the passed in message was already sent. This can be used in cases where
// a backend wants to implement a failsafe against double sending messages (say if they were double queued)
WasMsgSent(msg Msg) (bool, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel
MarkOutgoingMsgComplete(Msg)
MarkOutgoingMsgComplete(Msg, MsgStatus)

// Health returns a string describing any health problems the backend has, or empty string if all is well
Health() string
Expand Down
32 changes: 30 additions & 2 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
// the name for our message queue
const msgQueueName = "msgs"

// the name of our set for tracking sends
const sentSetName = "msgs_sent_%s"

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand All @@ -46,7 +49,7 @@ func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text
// have we seen this msg in the past period?
prevUUID := checkMsgSeen(b, msg)
if prevUUID != courier.NilMsgUUID {
// if so, use its UUID and mark that we've been written
// if so, use its UUID and that we've been written
msg.UUID_ = prevUUID
msg.AlreadyWritten_ = true
}
Expand Down Expand Up @@ -89,13 +92,38 @@ func (b *backend) PopNextOutgoingMsg() (courier.Msg, error) {
return nil, nil
}

// WasMsgSent returns whether the passed in message has already been sent
func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) {
rc := b.redisPool.Get()
defer rc.Close()

dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
found, err := redis.Bool(rc.Do("SISMEMBER", dateKey, msg.ID().Int64))
if err != nil {
return false, err
}
if found {
return true, nil
}

dateKey = fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).In(time.UTC).Format("2006_01_02"))
found, err = redis.Bool(rc.Do("SISMEMBER", dateKey, msg.ID().Int64))
return found, err
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg) {
func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgStatus) {
rc := b.redisPool.Get()
defer rc.Close()

dbMsg := msg.(*DBMsg)
queue.MarkComplete(rc, msgQueueName, dbMsg.WorkerToken_)

// mark as sent in redis as well if this was actually wired or sent
if status.Status() == courier.MsgSent || status.Status() == courier.MsgWired {
dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
rc.Do("SADD", dateKey, msg.ID().Int64)
}
}

// WriteMsg writes the passed in message to our store
Expand Down
17 changes: 16 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (ts *MsgTestSuite) TestContactURN() {
func (ts *MsgTestSuite) TestStatus() {
channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")
now := time.Now().In(time.UTC)
time.Sleep(2 * time.Millisecond)

// update by id
status := ts.b.NewMsgStatusForID(channel, courier.NewMsgID(10001), courier.MsgSent)
Expand All @@ -169,6 +170,7 @@ func (ts *MsgTestSuite) TestStatus() {

// error our msg
now = time.Now().In(time.UTC)
time.Sleep(2 * time.Millisecond)
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgErrored)
err = ts.b.WriteMsgStatus(status)
ts.NoError(err)
Expand Down Expand Up @@ -232,12 +234,24 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(msg)
ts.b.MarkOutgoingMsgComplete(msg, ts.b.NewMsgStatusForID(msg.Channel(), msg.ID(), courier.MsgWired))

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(msg)
ts.NoError(err)
ts.True(sent)

// pop another message off, shouldn't get anything
msg, err = ts.b.PopNextOutgoingMsg()
ts.Nil(msg)
ts.Nil(err)

// checking another message should show unsent
msg, err = readMsgFromDB(ts.b, courier.NewMsgID(10001))
ts.NoError(err)
sent, err = ts.b.WasMsgSent(msg)
ts.NoError(err)
ts.False(sent)
}

func (ts *MsgTestSuite) TestChannel() {
Expand Down Expand Up @@ -290,6 +304,7 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.NoError(err)

// make sure our values are set appropriately
ts.Equal(msg.ID(), m.ID())
ts.Equal(knChannel.ID_, m.ChannelID_)
ts.Equal(knChannel.OrgID_, m.OrgID_)
ts.Equal(contactURN.ContactID, m.ContactID_)
Expand Down
4 changes: 3 additions & 1 deletion backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ WHERE id = $1
`

func readMsgFromDB(b *backend, id courier.MsgID) (*DBMsg, error) {
m := &DBMsg{}
m := &DBMsg{
ID_: id,
}
err := b.db.Get(m, selectMsgSQL, id)
return m, err
}
Expand Down
25 changes: 21 additions & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (w *Sender) Send() {
server := w.foreman.server
backend := server.Backend()

var status MsgStatus

for true {
// list ourselves as available for work
w.foreman.availableSenders <- w
Expand All @@ -151,11 +153,26 @@ func (w *Sender) Send() {
return
}

status, err := server.SendMsg(msg)
// was this msg already sent? (from a double queue?)
sent, err := backend.WasMsgSent(msg)

// failing on a lookup isn't a halting problem but we should log it
if err != nil {
log.WithField("msgID", msg.ID().Int64).WithError(err).Info("msg errored")
log.WithField("msgID", msg.ID().Int64).WithError(err).Warning("error looking up msg was sent")
}

if sent {
// if this message was already sent, create a wired status for it
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired)
log.WithField("msgID", msg.ID().Int64).Warning("duplicate send, marking as wired")
} else {
log.WithField("msgID", msg.ID().Int64).Info("msg sent")
// send our message
status, err = server.SendMsg(msg)
if err != nil {
log.WithField("msgID", msg.ID().Int64).WithError(err).Info("msg errored")
} else {
log.WithField("msgID", msg.ID().Int64).Info("msg sent")
}
}

// record our status
Expand All @@ -165,6 +182,6 @@ func (w *Sender) Send() {
}

// mark our send task as complete
backend.MarkOutgoingMsgComplete(msg)
backend.MarkOutgoingMsgComplete(msg, status)
}
}
7 changes: 6 additions & 1 deletion test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ func (mb *MockBackend) PopNextOutgoingMsg() (Msg, error) {
return nil, nil
}

// WasMsgSent returns whether the passed in msg was already sent
func (mb *MockBackend) WasMsgSent(msg Msg) (bool, error) {
return false, nil
}

// MarkOutgoingMsgComplete marks the passed msg as having been dealt with
func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg) {
func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg, s MsgStatus) {
}

// WriteChannelLogs writes the passed in channel logs to the DB
Expand Down

0 comments on commit 0f3a96d

Please sign in to comment.