Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change Msg to be an interface implemented by Backends, remove passthr… #17

Merged
merged 2 commits into from
Jul 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,39 @@ type BackendConstructorFunc func(*config.Courier) Backend

// Backend represents the part of Courier that deals with looking up and writing channels and results
type Backend interface {
// Start starts the backend and opens any db connections it needs
Start() error

// Stop stops the backend closing any db connections it has open
Stop() error

// GetChannel returns the channel with the passed in type and UUID
GetChannel(ChannelType, ChannelUUID) (Channel, error)
WriteMsg(*Msg) error

// NewIncomingMsg creates a new message from the given params
NewIncomingMsg(channel Channel, urn URN, text string) Msg

// NewOutgoingMsg creates a new outgoing message from the given params
NewOutgoingMsg(channel Channel, urn URN, text string) Msg

// WriteMsg writes the passed in message to our backend
WriteMsg(Msg) error

// WriteMsgStatus writes the passed in status update to our backend
WriteMsgStatus(*MsgStatusUpdate) error

// WriteChannelLogs writes the passed in channel logs to our backend
WriteChannelLogs([]*ChannelLog) error

PopNextOutgoingMsg() (*Msg, error)
MarkOutgoingMsgComplete(*Msg)
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg() (Msg, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel
MarkOutgoingMsgComplete(Msg)

// Health returns a string describing any health problems the backend has, or empty string if all is well
Health() string
}

Expand Down
46 changes: 32 additions & 14 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,27 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (
return getChannel(b, ct, uuid)
}

// NewIncomingMsg creates a new message from the given params
func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg {
msg := newMsg(MsgIncoming, channel, urn, text)

// have we seen this msg in the past period?
prevUUID := checkMsgSeen(b, msg)
if prevUUID != courier.NilMsgUUID {
// if so, use its UUID and mark that we've been written
msg.UUID_ = prevUUID
msg.AlreadyWritten_ = true
}
return msg
}

// NewOutgoingMsg creates a new outgoing message from the given params
func (b *backend) NewOutgoingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg {
return newMsg(MsgOutgoing, channel, urn, text)
}

// PopNextOutgoingMsg pops the next message that needs to be sent
func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
func (b *backend) PopNextOutgoingMsg() (courier.Msg, error) {
// pop the next message off our queue
rc := b.redisPool.Get()
defer rc.Close()
Expand All @@ -47,37 +66,36 @@ func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
}

if msgJSON != "" {
dbMsg := DBMsg{}
err = json.Unmarshal([]byte(msgJSON), &dbMsg)
dbMsg := &DBMsg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
return nil, err
}

// create courier msg from our db msg
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID)
// populate the channel on our db msg
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID_)
if err != nil {
return nil, err
}

// TODO: what other attributes are needed here?
msg := courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text).WithID(dbMsg.ID).WithExternalID(dbMsg.ExternalID)
msg.WorkerToken = token

return msg, nil
dbMsg.Channel_ = channel
dbMsg.WorkerToken_ = token
return dbMsg, nil
}

return nil, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(msg *courier.Msg) {
func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg) {
rc := b.redisPool.Get()
defer rc.Close()
queue.MarkComplete(rc, msgQueueName, msg.WorkerToken)

dbMsg := msg.(*DBMsg)
queue.MarkComplete(rc, msgQueueName, dbMsg.WorkerToken_)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m *courier.Msg) error {
func (b *backend) WriteMsg(m courier.Msg) error {
return writeMsg(b, m)
}

Expand Down
76 changes: 45 additions & 31 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (ts *MsgTestSuite) SetupSuite() {
panic(fmt.Errorf("Unable to read testdata.sql: %s", err))
}
ts.b.db.MustExec(string(sql))

// clear redis
r := ts.b.redisPool.Get()
defer r.Close()
r.Do("FLUSHDB")
}

func (ts *MsgTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -145,17 +150,17 @@ func (ts *MsgTestSuite) TestStatus() {
ts.NoError(err)
m, err := readMsgFromDB(ts.b, courier.NewMsgID(10001))
ts.NoError(err)
ts.Equal(m.Status, courier.MsgSent)
ts.True(m.ModifiedOn.After(now))
ts.Equal(m.Status_, courier.MsgSent)
ts.True(m.ModifiedOn_.After(now))

// update by external id
status = courier.NewStatusUpdateForExternalID(channel, "ext1", courier.MsgFailed)
err = ts.b.WriteMsgStatus(status)
ts.NoError(err)
m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
ts.Equal(m.Status, courier.MsgFailed)
ts.True(m.ModifiedOn.After(now))
ts.Equal(m.Status_, courier.MsgFailed)
ts.True(m.ModifiedOn_.After(now))

// no such external id
status = courier.NewStatusUpdateForExternalID(channel, "ext2", courier.MsgSent)
Expand All @@ -174,7 +179,7 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
defer r.Close()

dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
dbMsg.ChannelUUID, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NoError(err)
ts.NotNil(dbMsg)

Expand All @@ -191,10 +196,10 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
ts.NotNil(msg)

// make sure it is the message we just added
ts.Equal(dbMsg.ID, msg.ID)
ts.Equal(dbMsg.ID(), msg.ID())

// and that it has the appropriate text
ts.Equal(msg.Text, "test message")
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(msg)
Expand Down Expand Up @@ -237,7 +242,7 @@ func (ts *MsgTestSuite) TestWriteMsg() {

// create a new courier msg
urn := courier.NewTelURNForChannel("12065551212", knChannel)
msg := courier.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact")
msg := ts.b.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact").(*DBMsg)

// try to write it to our db
err := ts.b.WriteMsg(msg)
Expand All @@ -247,38 +252,47 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.NotZero(msg.ID)

// load it back from the id
m, err := readMsgFromDB(ts.b, msg.ID)
m, err := readMsgFromDB(ts.b, msg.ID())
ts.NoError(err)

// load our URN
contactURN, err := contactURNForURN(ts.b.db, m.OrgID, m.ChannelID, m.ContactID, urn)
contactURN, err := contactURNForURN(ts.b.db, m.OrgID_, m.ChannelID_, m.ContactID_, urn)
ts.NoError(err)

// make sure our values are set appropriately
ts.Equal(knChannel.ID_, m.ChannelID)
ts.Equal(knChannel.OrgID_, m.OrgID)
ts.Equal(contactURN.ContactID, m.ContactID)
ts.Equal(contactURN.ID, m.ContactURNID)
ts.Equal(MsgIncoming, m.Direction)
ts.Equal(courier.MsgPending, m.Status)
ts.Equal(DefaultPriority, m.Priority)
ts.Equal("ext123", m.ExternalID)
ts.Equal("test123", m.Text)
ts.Equal([]string(nil), m.Attachments)
ts.Equal(1, m.MessageCount)
ts.Equal(0, m.ErrorCount)
ts.Equal(now, m.SentOn.In(time.UTC))
ts.NotNil(m.NextAttempt)
ts.NotNil(m.CreatedOn)
ts.NotNil(m.ModifiedOn)
ts.NotNil(m.QueuedOn)

contact, err := contactForURN(ts.b.db, m.OrgID, m.ChannelID, urn, "")
ts.Equal(knChannel.ID_, m.ChannelID_)
ts.Equal(knChannel.OrgID_, m.OrgID_)
ts.Equal(contactURN.ContactID, m.ContactID_)
ts.Equal(contactURN.ID, m.ContactURNID_)
ts.Equal(MsgIncoming, m.Direction_)
ts.Equal(courier.MsgPending, m.Status_)
ts.Equal(DefaultPriority, m.Priority_)
ts.Equal("ext123", m.ExternalID_)
ts.Equal("test123", m.Text_)
ts.Equal([]string(nil), m.Attachments_)
ts.Equal(1, m.MessageCount_)
ts.Equal(0, m.ErrorCount_)
ts.Equal(now, m.SentOn_.In(time.UTC))
ts.NotNil(m.NextAttempt_)
ts.NotNil(m.CreatedOn_)
ts.NotNil(m.ModifiedOn_)
ts.NotNil(m.QueuedOn_)

contact, err := contactForURN(ts.b.db, m.OrgID_, m.ChannelID_, urn, "")
ts.Equal("test contact", contact.Name)
ts.Equal(m.OrgID, contact.OrgID)
ts.Equal(m.ContactID, contact.ID)
ts.Equal(m.OrgID_, contact.OrgID)
ts.Equal(m.ContactID_, contact.ID)
ts.NotNil(contact.UUID)
ts.NotNil(contact.ID)

// creating the incoming msg again should give us the same UUID and have the msg set as not to write
msg2 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.Equal(msg2.UUID(), msg.UUID())

// waiting 5 seconds should let us write it successfully
time.Sleep(5 * time.Second)
msg3 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.NotEqual(msg3.UUID(), msg.UUID())
}

func TestMsgSuite(t *testing.T) {
Expand Down
Loading