From d48a80d51e6e234619f9c1264adae913b80eac07 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Mon, 10 Jul 2017 15:17:07 -0500 Subject: [PATCH 1/2] change Msg to be an interface implemented by Backends, remove passthroughs on Server to go straight to Backend --- backend.go | 28 ++- backends/rapidpro/backend.go | 37 ++-- backends/rapidpro/backend_test.go | 67 +++---- backends/rapidpro/msg.go | 171 ++++++++++-------- handler.go | 4 +- handlers/africastalking/africastalking.go | 28 +-- .../africastalking/africastalking_test.go | 2 +- handlers/base.go | 7 + handlers/blackmyna/blackmyna.go | 28 +-- handlers/blackmyna/blackmyna_test.go | 2 +- handlers/dummy/dummy.go | 2 +- handlers/external/external.go | 38 ++-- handlers/external/external_test.go | 2 +- handlers/kannel/kannel.go | 40 ++-- handlers/kannel/kannel_test.go | 4 +- handlers/telegram/telegram.go | 34 ++-- handlers/test.go | 20 +- handlers/twilio/twilio.go | 38 ++-- handlers/twilio/twilio_test.go | 2 +- msg.go | 92 +++------- queue/queue_test.go | 16 +- responses.go | 4 +- sender.go | 15 +- server.go | 29 +-- test.go | 84 ++++++++- 25 files changed, 438 insertions(+), 356 deletions(-) diff --git a/backend.go b/backend.go index 96a17481f..196840804 100644 --- a/backend.go +++ b/backend.go @@ -12,17 +12,39 @@ type BackendConstructorFunc func(*config.Courier) Backend // Backend represents the part of Courier that deals with looking up and writing channels and results type Backend interface { + // Start starts the backend and opens any db connections it needs Start() error + + // Stop stops the backend closing any db connections it has open Stop() error + // GetChannel returns the channel with the passed in type and UUID GetChannel(ChannelType, ChannelUUID) (Channel, error) - WriteMsg(*Msg) error + + // NewIncomingMsg creates a new message from the given params + NewIncomingMsg(channel Channel, urn URN, text string) Msg + + // NewOutgoingMsg creates a new outgoing message from the given params + NewOutgoingMsg(channel Channel, urn URN, text string) Msg + + // WriteMsg writes the passed in message to our backend + WriteMsg(Msg) error + + // WriteMsgStatus writes the passed in status update to our backend WriteMsgStatus(*MsgStatusUpdate) error + + // WriteChannelLogs writes the passed in channel logs to our backend WriteChannelLogs([]*ChannelLog) error - PopNextOutgoingMsg() (*Msg, error) - MarkOutgoingMsgComplete(*Msg) + // PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the + // returned message when they have dealt with the message (regardless of whether it was sent or not) + PopNextOutgoingMsg() (Msg, error) + + // MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case + // of errors during sending as it will manage the number of active workers per channel + MarkOutgoingMsgComplete(Msg) + // Health returns a string describing any health problems the backend has, or empty string if all is well Health() string } diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 7b22db0bb..4b3e39ef8 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -35,8 +35,18 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) ( return getChannel(b, ct, uuid) } +// NewIncomingMsg creates a new message from the given params +func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg { + return newMsg(MsgIncoming, channel, urn, text) +} + +// NewOutgoingMsg creates a new outgoing message from the given params +func (b *backend) NewOutgoingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg { + return newMsg(MsgOutgoing, channel, urn, text) +} + // PopNextOutgoingMsg pops the next message that needs to be sent -func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) { +func (b *backend) PopNextOutgoingMsg() (courier.Msg, error) { // pop the next message off our queue rc := b.redisPool.Get() defer rc.Close() @@ -47,37 +57,36 @@ func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) { } if msgJSON != "" { - dbMsg := DBMsg{} - err = json.Unmarshal([]byte(msgJSON), &dbMsg) + dbMsg := &DBMsg{} + err = json.Unmarshal([]byte(msgJSON), dbMsg) if err != nil { return nil, err } - // create courier msg from our db msg - channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID) + // populate the channel on our db msg + channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID_) if err != nil { return nil, err } - - // TODO: what other attributes are needed here? - msg := courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text).WithID(dbMsg.ID).WithExternalID(dbMsg.ExternalID) - msg.WorkerToken = token - - return msg, nil + dbMsg.Channel_ = channel + dbMsg.WorkerToken_ = token + return dbMsg, nil } return nil, nil } // MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel -func (b *backend) MarkOutgoingMsgComplete(msg *courier.Msg) { +func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg) { rc := b.redisPool.Get() defer rc.Close() - queue.MarkComplete(rc, msgQueueName, msg.WorkerToken) + + dbMsg := msg.(*DBMsg) + queue.MarkComplete(rc, msgQueueName, dbMsg.WorkerToken_) } // WriteMsg writes the passed in message to our store -func (b *backend) WriteMsg(m *courier.Msg) error { +func (b *backend) WriteMsg(m courier.Msg) error { return writeMsg(b, m) } diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 2c65716b2..8448ee5f1 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -45,6 +45,11 @@ func (ts *MsgTestSuite) SetupSuite() { panic(fmt.Errorf("Unable to read testdata.sql: %s", err)) } ts.b.db.MustExec(string(sql)) + + // clear redis + r := ts.b.redisPool.Get() + defer r.Close() + r.Do("FLUSHDB") } func (ts *MsgTestSuite) TearDownSuite() { @@ -145,8 +150,8 @@ func (ts *MsgTestSuite) TestStatus() { ts.NoError(err) m, err := readMsgFromDB(ts.b, courier.NewMsgID(10001)) ts.NoError(err) - ts.Equal(m.Status, courier.MsgSent) - ts.True(m.ModifiedOn.After(now)) + ts.Equal(m.Status_, courier.MsgSent) + ts.True(m.ModifiedOn_.After(now)) // update by external id status = courier.NewStatusUpdateForExternalID(channel, "ext1", courier.MsgFailed) @@ -154,8 +159,8 @@ func (ts *MsgTestSuite) TestStatus() { ts.NoError(err) m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000)) ts.NoError(err) - ts.Equal(m.Status, courier.MsgFailed) - ts.True(m.ModifiedOn.After(now)) + ts.Equal(m.Status_, courier.MsgFailed) + ts.True(m.ModifiedOn_.After(now)) // no such external id status = courier.NewStatusUpdateForExternalID(channel, "ext2", courier.MsgSent) @@ -174,7 +179,7 @@ func (ts *MsgTestSuite) TestOutgoingQueue() { defer r.Close() dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000)) - dbMsg.ChannelUUID, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d") + dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d") ts.NoError(err) ts.NotNil(dbMsg) @@ -191,10 +196,10 @@ func (ts *MsgTestSuite) TestOutgoingQueue() { ts.NotNil(msg) // make sure it is the message we just added - ts.Equal(dbMsg.ID, msg.ID) + ts.Equal(dbMsg.ID(), msg.ID()) // and that it has the appropriate text - ts.Equal(msg.Text, "test message") + ts.Equal(msg.Text(), "test message") // mark this message as dealt with ts.b.MarkOutgoingMsgComplete(msg) @@ -237,7 +242,7 @@ func (ts *MsgTestSuite) TestWriteMsg() { // create a new courier msg urn := courier.NewTelURNForChannel("12065551212", knChannel) - msg := courier.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact") + msg := newMsg(MsgIncoming, knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact") // try to write it to our db err := ts.b.WriteMsg(msg) @@ -247,36 +252,36 @@ func (ts *MsgTestSuite) TestWriteMsg() { ts.NotZero(msg.ID) // load it back from the id - m, err := readMsgFromDB(ts.b, msg.ID) + m, err := readMsgFromDB(ts.b, msg.ID()) ts.NoError(err) // load our URN - contactURN, err := contactURNForURN(ts.b.db, m.OrgID, m.ChannelID, m.ContactID, urn) + contactURN, err := contactURNForURN(ts.b.db, m.OrgID_, m.ChannelID_, m.ContactID_, urn) ts.NoError(err) // make sure our values are set appropriately - ts.Equal(knChannel.ID_, m.ChannelID) - ts.Equal(knChannel.OrgID_, m.OrgID) - ts.Equal(contactURN.ContactID, m.ContactID) - ts.Equal(contactURN.ID, m.ContactURNID) - ts.Equal(MsgIncoming, m.Direction) - ts.Equal(courier.MsgPending, m.Status) - ts.Equal(DefaultPriority, m.Priority) - ts.Equal("ext123", m.ExternalID) - ts.Equal("test123", m.Text) - ts.Equal([]string(nil), m.Attachments) - ts.Equal(1, m.MessageCount) - ts.Equal(0, m.ErrorCount) - ts.Equal(now, m.SentOn.In(time.UTC)) - ts.NotNil(m.NextAttempt) - ts.NotNil(m.CreatedOn) - ts.NotNil(m.ModifiedOn) - ts.NotNil(m.QueuedOn) - - contact, err := contactForURN(ts.b.db, m.OrgID, m.ChannelID, urn, "") + ts.Equal(knChannel.ID_, m.ChannelID_) + ts.Equal(knChannel.OrgID_, m.OrgID_) + ts.Equal(contactURN.ContactID, m.ContactID_) + ts.Equal(contactURN.ID, m.ContactURNID_) + ts.Equal(MsgIncoming, m.Direction_) + ts.Equal(courier.MsgPending, m.Status_) + ts.Equal(DefaultPriority, m.Priority_) + ts.Equal("ext123", m.ExternalID_) + ts.Equal("test123", m.Text_) + ts.Equal([]string(nil), m.Attachments_) + ts.Equal(1, m.MessageCount_) + ts.Equal(0, m.ErrorCount_) + ts.Equal(now, m.SentOn_.In(time.UTC)) + ts.NotNil(m.NextAttempt_) + ts.NotNil(m.CreatedOn_) + ts.NotNil(m.ModifiedOn_) + ts.NotNil(m.QueuedOn_) + + contact, err := contactForURN(ts.b.db, m.OrgID_, m.ChannelID_, urn, "") ts.Equal("test contact", contact.Name) - ts.Equal(m.OrgID, contact.OrgID) - ts.Equal(m.ContactID, contact.ID) + ts.Equal(m.OrgID_, contact.OrgID) + ts.Equal(m.ContactID_, contact.ID) ts.NotNil(contact.UUID) ts.NotNil(contact.ID) } diff --git a/backends/rapidpro/msg.go b/backends/rapidpro/msg.go index 2bfbf56a6..db864c7e2 100644 --- a/backends/rapidpro/msg.go +++ b/backends/rapidpro/msg.go @@ -13,6 +13,7 @@ import ( "time" "github.com/nyaruka/courier" + "github.com/nyaruka/courier/queue" "github.com/nyaruka/courier/utils" filetype "gopkg.in/h2non/filetype.v1" ) @@ -47,18 +48,18 @@ const ( MsgArchived MsgVisibility = "A" ) -// WriteMsg creates a message given the passed in arguments, returning the uuid of the created message -func writeMsg(b *backend, msg *courier.Msg) error { - m := newDBMsgFromMsg(msg) +// WriteMsg creates a message given the passed in arguments +func writeMsg(b *backend, msg courier.Msg) error { + m := msg.(*DBMsg) // if we have media, go download it to S3 - for i, attachment := range msg.Attachments { + for i, attachment := range m.Attachments_ { if strings.HasPrefix(attachment, "http") { - url, err := downloadMediaToS3(b, msg.UUID, attachment) + url, err := downloadMediaToS3(b, m.UUID_, attachment) if err != nil { return err } - msg.Attachments[i] = url + m.Attachments_[i] = url } } @@ -69,47 +70,35 @@ func writeMsg(b *backend, msg *courier.Msg) error { if err != nil { return courier.WriteToSpool(b.config.SpoolDir, "msgs", m) } - - // set the id on the message returned (could be 0, that's ok) - msg.ID = m.ID - - // TODO: spool backdown for failure to add to redis return err } -func newDBMsgFromMsg(m *courier.Msg) *DBMsg { - attachments := make([]string, len(m.Attachments)) - for i := range m.Attachments { - attachments[i] = m.Attachments[i] - } - +// newMsg creates a new DBMsg object with the passed in parameters +func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, text string) courier.Msg { now := time.Now() - - rpChannel := m.Channel.(*DBChannel) + dbChannel := channel.(*DBChannel) return &DBMsg{ - OrgID: rpChannel.OrgID(), - UUID: m.UUID, - Direction: MsgIncoming, - Status: courier.MsgPending, - Visibility: MsgVisible, - Priority: DefaultPriority, - Text: m.Text, - Attachments: attachments, - ExternalID: m.ExternalID, - - ChannelID: rpChannel.ID(), - ChannelUUID: m.Channel.UUID(), - URN: m.URN, - ContactName: m.ContactName, - - MessageCount: 1, - - NextAttempt: now, - CreatedOn: now, - ModifiedOn: now, - QueuedOn: now, - SentOn: *m.ReceivedOn, + Channel_: channel, + + OrgID_: dbChannel.OrgID(), + UUID_: courier.NewMsgUUID(), + Direction_: direction, + Status_: courier.MsgPending, + Visibility_: MsgVisible, + Priority_: DefaultPriority, + Text_: text, + + ChannelID_: dbChannel.ID(), + ChannelUUID_: dbChannel.UUID(), + + URN_: urn, + MessageCount_: 1, + + NextAttempt_: now, + CreatedOn_: now, + ModifiedOn_: now, + QueuedOn_: now, } } @@ -123,7 +112,7 @@ RETURNING id func writeMsgToDB(b *backend, m *DBMsg) error { // grab the contact for this msg - contact, err := contactForURN(b.db, m.OrgID, m.ChannelID, m.URN, m.ContactName) + contact, err := contactForURN(b.db, m.OrgID_, m.ChannelID_, m.URN_, m.ContactName_) // our db is down, write to the spool, we will write/queue this later if err != nil { @@ -131,21 +120,21 @@ func writeMsgToDB(b *backend, m *DBMsg) error { } // set our contact and urn ids from our contact - m.ContactID = contact.ID - m.ContactURNID = contact.URNID + m.ContactID_ = contact.ID + m.ContactURNID_ = contact.URNID rows, err := b.db.NamedQuery(insertMsgSQL, m) if err != nil { return err } rows.Next() - err = rows.Scan(&m.ID) + err = rows.Scan(&m.ID_) if err != nil { return err } // queue this up to be handled by RapidPro - b.notifier.addMsg(m.ID) + b.notifier.addMsg(m.ID_) return err } @@ -239,31 +228,67 @@ func (b *backend) flushMsgFile(filename string, contents []byte) error { // DBMsg is our base struct to represent msgs both in our JSON and db representations type DBMsg struct { - OrgID OrgID `json:"org_id" db:"org_id"` - ID courier.MsgID `json:"id" db:"id"` - UUID courier.MsgUUID `json:"uuid"` - Direction MsgDirection `json:"direction" db:"direction"` - Status courier.MsgStatus `json:"status" db:"status"` - Visibility MsgVisibility `json:"visibility" db:"visibility"` - Priority MsgPriority `json:"priority" db:"priority"` - URN courier.URN `json:"urn"` - Text string `json:"text" db:"text"` - Attachments []string `json:"attachments"` - ExternalID string `json:"external_id" db:"external_id"` - - ChannelID ChannelID `json:"channel_id" db:"channel_id"` - ContactID ContactID `json:"contact_id" db:"contact_id"` - ContactURNID ContactURNID `json:"contact_urn_id" db:"contact_urn_id"` - - MessageCount int `json:"msg_count" db:"msg_count"` - ErrorCount int `json:"error_count" db:"error_count"` - - ChannelUUID courier.ChannelUUID `json:"channel_uuid"` - ContactName string `json:"contact_name"` - - NextAttempt time.Time `json:"next_attempt" db:"next_attempt"` - CreatedOn time.Time `json:"created_on" db:"created_on"` - ModifiedOn time.Time `json:"modified_on" db:"modified_on"` - QueuedOn time.Time `json:"queued_on" db:"queued_on"` - SentOn time.Time `json:"sent_on" db:"sent_on"` + OrgID_ OrgID `json:"org_id" db:"org_id"` + ID_ courier.MsgID `json:"id" db:"id"` + UUID_ courier.MsgUUID `json:"uuid"` + Direction_ MsgDirection `json:"direction" db:"direction"` + Status_ courier.MsgStatus `json:"status" db:"status"` + Visibility_ MsgVisibility `json:"visibility" db:"visibility"` + Priority_ MsgPriority `json:"priority" db:"priority"` + URN_ courier.URN `json:"urn"` + Text_ string `json:"text" db:"text"` + Attachments_ []string `json:"attachments"` + ExternalID_ string `json:"external_id" db:"external_id"` + + ChannelID_ ChannelID `json:"channel_id" db:"channel_id"` + ContactID_ ContactID `json:"contact_id" db:"contact_id"` + ContactURNID_ ContactURNID `json:"contact_urn_id" db:"contact_urn_id"` + + MessageCount_ int `json:"msg_count" db:"msg_count"` + ErrorCount_ int `json:"error_count" db:"error_count"` + + ChannelUUID_ courier.ChannelUUID `json:"channel_uuid"` + ContactName_ string `json:"contact_name"` + + NextAttempt_ time.Time `json:"next_attempt" db:"next_attempt"` + CreatedOn_ time.Time `json:"created_on" db:"created_on"` + ModifiedOn_ time.Time `json:"modified_on" db:"modified_on"` + QueuedOn_ time.Time `json:"queued_on" db:"queued_on"` + SentOn_ time.Time `json:"sent_on" db:"sent_on"` + + Channel_ courier.Channel + WorkerToken_ queue.WorkerToken +} + +func (m *DBMsg) Channel() courier.Channel { return m.Channel_ } +func (m *DBMsg) ID() courier.MsgID { return m.ID_ } +func (m *DBMsg) UUID() courier.MsgUUID { return m.UUID_ } +func (m *DBMsg) Text() string { return m.Text_ } +func (m *DBMsg) Attachments() []string { return m.Attachments_ } +func (m *DBMsg) ExternalID() string { return m.ExternalID_ } +func (m *DBMsg) URN() courier.URN { return m.URN_ } +func (m *DBMsg) ContactName() string { return m.ContactName_ } + +func (m *DBMsg) ReceivedOn() *time.Time { return &m.SentOn_ } +func (m *DBMsg) SentOn() *time.Time { return &m.SentOn_ } + +// WithContactName can be used to set the contact name on a msg +func (m *DBMsg) WithContactName(name string) courier.Msg { m.ContactName_ = name; return m } + +// WithReceivedOn can be used to set sent_on on a msg in a chained call +func (m *DBMsg) WithReceivedOn(date time.Time) courier.Msg { m.SentOn_ = date; return m } + +// WithExternalID can be used to set the external id on a msg in a chained call +func (m *DBMsg) WithExternalID(id string) courier.Msg { m.ExternalID_ = id; return m } + +// WithID can be used to set the id on a msg in a chained call +func (m *DBMsg) WithID(id courier.MsgID) courier.Msg { m.ID_ = id; return m } + +// WithUUID can be used to set the id on a msg in a chained call +func (m *DBMsg) WithUUID(uuid courier.MsgUUID) courier.Msg { m.UUID_ = uuid; return m } + +// WithAttachment can be used to append to the media urls for a message +func (m *DBMsg) WithAttachment(url string) courier.Msg { + m.Attachments_ = append(m.Attachments_, url) + return m } diff --git a/handler.go b/handler.go index f926d865c..478e09430 100644 --- a/handler.go +++ b/handler.go @@ -6,7 +6,7 @@ import ( // ChannelReceiveMsgFunc is the interface ChannelHandler functions must satisfy to handle incoming msgs // The Server will take care of looking up the channel by UUID before passing it to this function. -type ChannelReceiveMsgFunc func(Channel, http.ResponseWriter, *http.Request) ([]*Msg, error) +type ChannelReceiveMsgFunc func(Channel, http.ResponseWriter, *http.Request) ([]Msg, error) // ChannelUpdateStatusFunc is the interface ChannelHandler functions must satisfy to handle incoming // status requests. The Server will take care of looking up the channel by UUID before passing it to this function. @@ -24,7 +24,7 @@ type ChannelHandler interface { Initialize(Server) error ChannelType() ChannelType ChannelName() string - SendMsg(*Msg) (*MsgStatusUpdate, error) + SendMsg(Msg) (*MsgStatusUpdate, error) } // RegisterHandler adds a new handler for a channel type, this is called by individual handlers when they are initialized diff --git a/handlers/africastalking/africastalking.go b/handlers/africastalking/africastalking.go index 61d44b68c..4cd294f2d 100644 --- a/handlers/africastalking/africastalking.go +++ b/handlers/africastalking/africastalking.go @@ -63,7 +63,7 @@ func (h *handler) Initialize(s courier.Server) error { } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { // get our params atMsg := &messageRequest{} err := handlers.DecodeAndValidateForm(atMsg, r) @@ -82,15 +82,15 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, urn := courier.NewTelURNForChannel(atMsg.From, channel) // build our msg - msg := courier.NewIncomingMsg(channel, urn, atMsg.Text).WithExternalID(atMsg.ID).WithReceivedOn(date) + msg := h.Backend().NewIncomingMsg(channel, urn, atMsg.Text).WithExternalID(atMsg.ID).WithReceivedOn(date) // and finally queue our message - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) + return []courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) } // StatusMessage is our HTTP handler function for status updates @@ -109,7 +109,7 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, // write our status status := courier.NewStatusUpdateForExternalID(channel, atStatus.ID, msgStatus) - err = h.Server().WriteMsgStatus(status) + err = h.Backend().WriteMsgStatus(status) if err != nil { return nil, err } @@ -118,16 +118,16 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { - isSharedStr := msg.Channel.ConfigForKey(configIsShared, false) +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { + isSharedStr := msg.Channel().ConfigForKey(configIsShared, false) isShared, _ := isSharedStr.(bool) - username := msg.Channel.StringConfigForKey(courier.ConfigUsername, "") + username := msg.Channel().StringConfigForKey(courier.ConfigUsername, "") if username == "" { return nil, fmt.Errorf("no username set for AT channel") } - apiKey := msg.Channel.StringConfigForKey(courier.ConfigAPIKey, "") + apiKey := msg.Channel().StringConfigForKey(courier.ConfigAPIKey, "") if apiKey == "" { return nil, fmt.Errorf("no API key set for AT channel") } @@ -135,13 +135,13 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { // build our request form := url.Values{ "username": []string{username}, - "to": []string{msg.URN.Path()}, - "message": []string{msg.TextAndAttachments()}, + "to": []string{msg.URN().Path()}, + "message": []string{courier.GetTextAndAttachments(msg)}, } // if this isn't shared, include our from if !isShared { - form["from"] = []string{msg.Channel.Address()} + form["from"] = []string{msg.Channel().Address()} } req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode())) @@ -151,8 +151,8 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { rr, err := utils.MakeHTTPRequest(req) // record our status and log - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) - status.AddLog(courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr)) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) + status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr)) if err != nil { return status, err } diff --git a/handlers/africastalking/africastalking_test.go b/handlers/africastalking/africastalking_test.go index 0f7181f7d..875cdf07f 100644 --- a/handlers/africastalking/africastalking_test.go +++ b/handlers/africastalking/africastalking_test.go @@ -47,7 +47,7 @@ func BenchmarkHandler(b *testing.B) { } // setSendURL takes care of setting the sendURL to call -func setSendURL(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURL(server *httptest.Server, channel courier.Channel, msg courier.Msg) { sendURL = server.URL } diff --git a/handlers/base.go b/handlers/base.go index ffee8a0fd..0e57ecf63 100644 --- a/handlers/base.go +++ b/handlers/base.go @@ -24,6 +24,7 @@ type BaseHandler struct { channelType courier.ChannelType name string server courier.Server + backend courier.Backend } // NewBaseHandler returns a newly constructed BaseHandler with the passed in parameters @@ -34,6 +35,7 @@ func NewBaseHandler(channelType courier.ChannelType, name string) BaseHandler { // SetServer can be used to change the server on a BaseHandler func (h *BaseHandler) SetServer(server courier.Server) { h.server = server + h.backend = server.Backend() } // Server returns the server instance on the BaseHandler @@ -41,6 +43,11 @@ func (h *BaseHandler) Server() courier.Server { return h.server } +// Backend returns the backend instance on the BaseHandler +func (h *BaseHandler) Backend() courier.Backend { + return h.backend +} + // ChannelType returns the channel type that this handler deals with func (h *BaseHandler) ChannelType() courier.ChannelType { return h.channelType diff --git a/handlers/blackmyna/blackmyna.go b/handlers/blackmyna/blackmyna.go index fbf4d33b4..54b7e5676 100644 --- a/handlers/blackmyna/blackmyna.go +++ b/handlers/blackmyna/blackmyna.go @@ -40,7 +40,7 @@ func (h *handler) Initialize(s courier.Server) error { } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { // get our params bmMsg := &bmMessage{} err := handlers.DecodeAndValidateForm(bmMsg, r) @@ -52,15 +52,15 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, urn := courier.NewTelURNForChannel(bmMsg.From, channel) // build our msg - msg := courier.NewIncomingMsg(channel, urn, bmMsg.Text) + msg := h.Backend().NewIncomingMsg(channel, urn, bmMsg.Text) // and finally queue our message - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) + return []courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) } type bmMessage struct { @@ -92,7 +92,7 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, // write our status status := courier.NewStatusUpdateForExternalID(channel, bmStatus.ID, msgStatus) - err = h.Server().WriteMsgStatus(status) + err = h.Backend().WriteMsgStatus(status) if err != nil { return nil, err } @@ -101,27 +101,27 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { - username := msg.Channel.StringConfigForKey(courier.ConfigUsername, "") +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { + username := msg.Channel().StringConfigForKey(courier.ConfigUsername, "") if username == "" { return nil, fmt.Errorf("no username set for BM channel") } - password := msg.Channel.StringConfigForKey(courier.ConfigPassword, "") + password := msg.Channel().StringConfigForKey(courier.ConfigPassword, "") if password == "" { return nil, fmt.Errorf("no password set for BM channel") } - apiKey := msg.Channel.StringConfigForKey(courier.ConfigAPIKey, "") + apiKey := msg.Channel().StringConfigForKey(courier.ConfigAPIKey, "") if apiKey == "" { return nil, fmt.Errorf("no API key set for BM channel") } // build our request form := url.Values{ - "address": []string{msg.URN.Path()}, - "senderaddress": []string{msg.Channel.Address()}, - "message": []string{msg.TextAndAttachments()}, + "address": []string{msg.URN().Path()}, + "senderaddress": []string{msg.Channel().Address()}, + "message": []string{courier.GetTextAndAttachments(msg)}, } req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode())) @@ -130,8 +130,8 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { rr, err := utils.MakeHTTPRequest(req) // record our status and log - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) - status.AddLog(courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr)) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) + status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr)) if err != nil { return status, err } diff --git a/handlers/blackmyna/blackmyna_test.go b/handlers/blackmyna/blackmyna_test.go index adb88bc19..56c7c4395 100644 --- a/handlers/blackmyna/blackmyna_test.go +++ b/handlers/blackmyna/blackmyna_test.go @@ -45,7 +45,7 @@ func BenchmarkHandler(b *testing.B) { } // setSend takes care of setting the sendURL to call -func setSendURL(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURL(server *httptest.Server, channel courier.Channel, msg courier.Msg) { sendURL = server.URL } diff --git a/handlers/dummy/dummy.go b/handlers/dummy/dummy.go index ee4a69c07..2e79203b9 100644 --- a/handlers/dummy/dummy.go +++ b/handlers/dummy/dummy.go @@ -27,7 +27,7 @@ func (h *handler) Initialize(s courier.Server) error { } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { time.Sleep(time.Second * 1) return nil, nil } diff --git a/handlers/external/external.go b/handlers/external/external.go index 31f3611b6..feeaee684 100644 --- a/handlers/external/external.go +++ b/handlers/external/external.go @@ -58,7 +58,7 @@ func (h *handler) Initialize(s courier.Server) error { } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { externalMessage := &externalMessage{} handlers.DecodeAndValidateQueryParams(externalMessage, r) @@ -103,15 +103,15 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, } // build our msg - msg := courier.NewIncomingMsg(channel, urn, externalMessage.Text).WithReceivedOn(date) + msg := h.Backend().NewIncomingMsg(channel, urn, externalMessage.Text).WithReceivedOn(date) // and write it - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) + return []courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) } type externalMessage struct { @@ -153,7 +153,7 @@ func (h *handler) StatusMessage(statusString string, channel courier.Channel, w // write our status status := courier.NewStatusUpdateForID(channel, courier.NewMsgID(statusForm.ID), msgStatus) - err = h.Server().WriteMsgStatus(status) + err = h.Backend().WriteMsgStatus(status) if err != nil { return nil, err } @@ -172,25 +172,25 @@ var statusMappings = map[string]courier.MsgStatus{ } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { - sendURL := msg.Channel.StringConfigForKey(courier.ConfigSendURL, "") +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { + sendURL := msg.Channel().StringConfigForKey(courier.ConfigSendURL, "") if sendURL == "" { return nil, fmt.Errorf("no send url set for EX channel") } - sendMethod := msg.Channel.StringConfigForKey(courier.ConfigSendMethod, http.MethodPost) - sendBody := msg.Channel.StringConfigForKey(courier.ConfigSendBody, "") - contentType := msg.Channel.StringConfigForKey(courier.ConfigContentType, contentURLEncoded) + sendMethod := msg.Channel().StringConfigForKey(courier.ConfigSendMethod, http.MethodPost) + sendBody := msg.Channel().StringConfigForKey(courier.ConfigSendBody, "") + contentType := msg.Channel().StringConfigForKey(courier.ConfigContentType, contentURLEncoded) // build our request form := map[string]string{ - "id": strconv.FormatInt(msg.ID.Int64, 10), - "text": msg.TextAndAttachments(), - "to": msg.URN.Path(), - "to_no_plus": strings.TrimPrefix(msg.URN.Path(), "+"), - "from": msg.Channel.Address(), - "from_no_plus": strings.TrimPrefix(msg.Channel.Address(), "+"), - "channel": msg.Channel.UUID().String(), + "id": strconv.FormatInt(msg.ID().Int64, 10), + "text": courier.GetTextAndAttachments(msg), + "to": msg.URN().Path(), + "to_no_plus": strings.TrimPrefix(msg.URN().Path(), "+"), + "from": msg.Channel().Address(), + "from_no_plus": strings.TrimPrefix(msg.Channel().Address(), "+"), + "channel": msg.Channel().UUID().String(), } url := replaceVariables(sendURL, form, contentURLEncoded) @@ -207,8 +207,8 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { rr, err := utils.MakeHTTPRequest(req) // record our status and log - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) - status.AddLog(courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr)) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) + status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr)) if err != nil { return status, err } diff --git a/handlers/external/external_test.go b/handlers/external/external_test.go index bf7b60705..28feaec3a 100644 --- a/handlers/external/external_test.go +++ b/handlers/external/external_test.go @@ -61,7 +61,7 @@ func BenchmarkHandler(b *testing.B) { } // setSendURL takes care of setting the send_url to our test server host -func setSendURL(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURL(server *httptest.Server, channel courier.Channel, msg courier.Msg) { // this is actually a path, which we'll combine with the test server URL sendURL := channel.StringConfigForKey("send_path", "") sendURL, _ = utils.AddURLPath(server.URL, sendURL) diff --git a/handlers/kannel/kannel.go b/handlers/kannel/kannel.go index 302a6ed13..fbcc4a143 100644 --- a/handlers/kannel/kannel.go +++ b/handlers/kannel/kannel.go @@ -49,7 +49,7 @@ func (h *handler) Initialize(s courier.Server) error { } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { // get our params kannelMsg := &kannelMessage{} err := handlers.DecodeAndValidateQueryParams(kannelMsg, r) @@ -64,15 +64,15 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, urn := courier.NewTelURNForChannel(kannelMsg.Sender, channel) // build our msg - msg := courier.NewIncomingMsg(channel, urn, kannelMsg.Message).WithExternalID(fmt.Sprintf("%d", kannelMsg.ID)).WithReceivedOn(date) + msg := h.Backend().NewIncomingMsg(channel, urn, kannelMsg.Message).WithExternalID(fmt.Sprintf("%d", kannelMsg.ID)).WithReceivedOn(date) // and finally queue our message - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) + return []courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) } type kannelMessage struct { @@ -106,7 +106,7 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, // write our status status := courier.NewStatusUpdateForID(channel, kannelStatus.ID, msgStatus) - err = h.Server().WriteMsgStatus(status) + err = h.Backend().WriteMsgStatus(status) if err != nil { return nil, err } @@ -115,31 +115,31 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { - username := msg.Channel.StringConfigForKey(courier.ConfigUsername, "") +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { + username := msg.Channel().StringConfigForKey(courier.ConfigUsername, "") if username == "" { return nil, fmt.Errorf("no username set for KN channel") } - password := msg.Channel.StringConfigForKey(courier.ConfigPassword, "") + password := msg.Channel().StringConfigForKey(courier.ConfigPassword, "") if password == "" { return nil, fmt.Errorf("no password set for KN channel") } - sendURL := msg.Channel.StringConfigForKey(courier.ConfigSendURL, "") + sendURL := msg.Channel().StringConfigForKey(courier.ConfigSendURL, "") if sendURL == "" { return nil, fmt.Errorf("no send url set for KN channel") } - dlrURL := fmt.Sprintf("%s%s%s/?id=%d&status=%%d", h.Server().Config().BaseURL, "/c/kn/", msg.Channel.UUID(), msg.ID.Int64) + dlrURL := fmt.Sprintf("%s%s%s/?id=%d&status=%%d", h.Server().Config().BaseURL, "/c/kn/", msg.Channel().UUID(), msg.ID().Int64) // build our request form := url.Values{ "username": []string{username}, "password": []string{password}, - "from": []string{msg.Channel.Address()}, - "text": []string{msg.TextAndAttachments()}, - "to": []string{msg.URN.Path()}, + "from": []string{msg.Channel().Address()}, + "text": []string{courier.GetTextAndAttachments(msg)}, + "to": []string{msg.URN().Path()}, "dlr-url": []string{dlrURL}, "dlr-mask": []string{"31"}, } @@ -149,23 +149,23 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { // form["priority"] = []string{"1"} //} - useNationalStr := msg.Channel.ConfigForKey(configUseNational, false) + useNationalStr := msg.Channel().ConfigForKey(configUseNational, false) useNational, _ := useNationalStr.(bool) // if we are meant to use national formatting (no country code) pull that out if useNational { - parsed, err := phonenumbers.Parse(msg.URN.Path(), encodingDefault) + parsed, err := phonenumbers.Parse(msg.URN().Path(), encodingDefault) if err == nil { form["to"] = []string{strings.Replace(phonenumbers.Format(parsed, phonenumbers.NATIONAL), " ", "", -1)} } } // figure out what encoding to tell kannel to send as - encoding := msg.Channel.StringConfigForKey(configEncoding, encodingSmart) + encoding := msg.Channel().StringConfigForKey(configEncoding, encodingSmart) // if we are smart, first try to convert to GSM7 chars if encoding == encodingSmart { - replaced := gsm7.ReplaceNonGSM7Chars(msg.TextAndAttachments()) + replaced := gsm7.ReplaceNonGSM7Chars(courier.GetTextAndAttachments(msg)) if gsm7.IsGSM7(replaced) { form["text"] = []string{replaced} } else { @@ -188,7 +188,7 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { } // ignore SSL warnings if they ask - verifySSLStr := msg.Channel.ConfigForKey(configVerifySSL, true) + verifySSLStr := msg.Channel().ConfigForKey(configVerifySSL, true) verifySSL, _ := verifySSLStr.(bool) req, err := http.NewRequest(http.MethodGet, sendURL, nil) @@ -201,8 +201,8 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { } // record our status and log - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) - status.AddLog(courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr)) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) + status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr)) if err != nil { return status, errors.Errorf("received error sending message") } diff --git a/handlers/kannel/kannel_test.go b/handlers/kannel/kannel_test.go index 97f63d377..cca29e5b4 100644 --- a/handlers/kannel/kannel_test.go +++ b/handlers/kannel/kannel_test.go @@ -39,12 +39,12 @@ func BenchmarkHandler(b *testing.B) { } // setSendURL takes care of setting the send_url to our test server host -func setSendURL(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURL(server *httptest.Server, channel courier.Channel, msg courier.Msg) { channel.(*courier.MockChannel).SetConfig("send_url", server.URL) } // setSendURLWithQuery takes care of setting the send_url to our test server host -func setSendURLWithQuery(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURLWithQuery(server *httptest.Server, channel courier.Channel, msg courier.Msg) { channel.(*courier.MockChannel).SetConfig("send_url", server.URL+"?auth=foo") } diff --git a/handlers/telegram/telegram.go b/handlers/telegram/telegram.go index 2220c64ca..1cbaf822d 100644 --- a/handlers/telegram/telegram.go +++ b/handlers/telegram/telegram.go @@ -35,7 +35,7 @@ func (h *handler) Initialize(s courier.Server) error { } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { te := &telegramEnvelope{} err := handlers.DecodeAndValidateJSON(te, r) if err != nil { @@ -102,28 +102,28 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, } // build our msg - msg := courier.NewIncomingMsg(channel, urn, text).WithReceivedOn(date).WithExternalID(fmt.Sprintf("%d", te.Message.MessageID)).WithContactName(name) + msg := h.Backend().NewIncomingMsg(channel, urn, text).WithReceivedOn(date).WithExternalID(fmt.Sprintf("%d", te.Message.MessageID)).WithContactName(name) if mediaURL != "" { - msg.AddAttachment(mediaURL) + msg.WithAttachment(mediaURL) } // queue our message - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) + return []courier.Msg{msg}, courier.WriteReceiveSuccess(w, r, msg) } -func (h *handler) sendMsgPart(msg *courier.Msg, token string, path string, form url.Values) (string, *courier.ChannelLog, error) { +func (h *handler) sendMsgPart(msg courier.Msg, token string, path string, form url.Values) (string, *courier.ChannelLog, error) { sendURL := fmt.Sprintf("%s/bot%s/%s", telegramAPIURL, token, path) req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode())) rr, err := utils.MakeHTTPRequest(req) // build our channel log - log := courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr) + log := courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr) // was this request successful? ok, err := jsonparser.GetBoolean([]byte(rr.Body), "ok") @@ -141,8 +141,8 @@ func (h *handler) sendMsgPart(msg *courier.Msg, token string, path string, form } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { - confAuth := msg.Channel.ConfigForKey(courier.ConfigAuthToken, "") +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { + confAuth := msg.Channel().ConfigForKey(courier.ConfigAuthToken, "") authToken, isStr := confAuth.(string) if !isStr || authToken == "" { return nil, fmt.Errorf("invalid auth token config") @@ -150,21 +150,21 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { // we only caption if there is only a single attachment caption := "" - if len(msg.Attachments) == 1 { - caption = msg.Text + if len(msg.Attachments()) == 1 { + caption = msg.Text() } // the status that will be written for this message - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) // whether we encountered any errors sending any parts hasError := true // if we have text, send that if we aren't sending it as a caption - if msg.Text != "" && caption == "" { + if msg.Text() != "" && caption == "" { form := url.Values{ - "chat_id": []string{msg.URN.Path()}, - "text": []string{msg.Text}, + "chat_id": []string{msg.URN().Path()}, + "text": []string{msg.Text()}, } externalID, log, err := h.sendMsgPart(msg, authToken, "sendMessage", form) status.ExternalID = externalID @@ -173,7 +173,7 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { } // send each attachment - for _, attachment := range msg.Attachments { + for _, attachment := range msg.Attachments() { mediaType, mediaURL := courier.SplitAttachment(attachment) switch mediaType { case "image": @@ -207,7 +207,7 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { status.AddLog(log) default: - status.AddLog(courier.NewChannelLog(msg.Channel, msg.ID, "", courier.NilStatusCode, + status.AddLog(courier.NewChannelLog(msg.Channel(), msg.ID(), "", courier.NilStatusCode, fmt.Errorf("unknown media type: %s", mediaType), "", "", time.Duration(0), time.Now())) hasError = true } diff --git a/handlers/test.go b/handlers/test.go index 640811fbe..cf8028ca1 100644 --- a/handlers/test.go +++ b/handlers/test.go @@ -41,7 +41,7 @@ type ChannelHandleTestCase struct { } // SendPrepFunc allows test cases to modify the channel, msg or server before a message is sent -type SendPrepFunc func(*httptest.Server, courier.Channel, *courier.Msg) +type SendPrepFunc func(*httptest.Server, courier.Channel, courier.Msg) // ChannelSendTestCase defines the test values for a particular test case type ChannelSendTestCase struct { @@ -134,9 +134,9 @@ func RunChannelSendTestCases(t *testing.T, channel courier.Channel, handler cour t.Run(testCase.Label, func(t *testing.T) { require := require.New(t) - msg := courier.NewOutgoingMsg(channel, courier.URN(testCase.URN), testCase.Text) + msg := mb.NewOutgoingMsg(channel, courier.URN(testCase.URN), testCase.Text) for _, a := range testCase.Attachments { - msg.AddAttachment(a) + msg.WithAttachment(a) } var testRequest *http.Request @@ -231,25 +231,25 @@ func RunChannelTestCases(t *testing.T, channels []courier.Channel, handler couri require.Nil(err) if testCase.Name != nil { - require.Equal(*testCase.Name, msg.ContactName) + require.Equal(*testCase.Name, msg.ContactName()) } if testCase.Text != nil { - require.Equal(*testCase.Text, msg.Text) + require.Equal(*testCase.Text, msg.Text()) } if testCase.URN != nil { - require.Equal(*testCase.URN, string(msg.URN)) + require.Equal(*testCase.URN, string(msg.URN())) } if testCase.External != nil { - require.Equal(*testCase.External, msg.ExternalID) + require.Equal(*testCase.External, msg.ExternalID()) } if testCase.Attachment != nil { - require.Equal([]string{*testCase.Attachment}, msg.Attachments) + require.Equal([]string{*testCase.Attachment}, msg.Attachments()) } if len(testCase.Attachments) > 0 { - require.Equal(testCase.Attachments, msg.Attachments) + require.Equal(testCase.Attachments, msg.Attachments()) } if testCase.Date != nil { - require.Equal(*testCase.Date, *msg.ReceivedOn) + require.Equal(*testCase.Date, *msg.ReceivedOn()) } } else if err != courier.ErrMsgNotFound { t.Fatalf("unexpected msg inserted: %v", err) diff --git a/handlers/twilio/twilio.go b/handlers/twilio/twilio.go index bf7add06a..8fd14f5db 100644 --- a/handlers/twilio/twilio.go +++ b/handlers/twilio/twilio.go @@ -82,7 +82,7 @@ var twStatusMapping = map[string]courier.MsgStatus{ } // ReceiveMessage is our HTTP handler function for incoming messages -func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.Msg, error) { +func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.Msg, error) { err := h.validateSignature(channel, r) if err != nil { return nil, err @@ -104,21 +104,21 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter, } // build our msg - msg := courier.NewIncomingMsg(channel, urn, twMsg.Body).WithExternalID(twMsg.MessageSID) + msg := h.Backend().NewIncomingMsg(channel, urn, twMsg.Body).WithExternalID(twMsg.MessageSID) // process any attached media for i := 0; i < twMsg.NumMedia; i++ { mediaURL := r.PostForm.Get(fmt.Sprintf("MediaUrl%d", i)) - msg.AddAttachment(mediaURL) + msg.WithAttachment(mediaURL) } // and finally queue our message - err = h.Server().WriteMsg(msg) + err = h.Backend().WriteMsg(msg) if err != nil { return nil, err } - return []*courier.Msg{msg}, h.writeReceiveSuccess(w) + return []courier.Msg{msg}, h.writeReceiveSuccess(w) } // StatusMessage is our HTTP handler function for status updates @@ -143,7 +143,7 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, // write our status status := courier.NewStatusUpdateForExternalID(channel, twStatus.MessageSID, msgStatus) defer status.Release() - err = h.Server().WriteMsgStatus(status) + err = h.Backend().WriteMsgStatus(status) if err != nil { return nil, err } @@ -152,42 +152,42 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, } // SendMsg sends the passed in message, returning any error -func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { +func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) { // build our callback URL - callbackURL := fmt.Sprintf("%s/c/kn/%s/status/", h.Server().Config().BaseURL, msg.Channel.UUID()) + callbackURL := fmt.Sprintf("%s/c/kn/%s/status/", h.Server().Config().BaseURL, msg.Channel().UUID()) - accountSID := msg.Channel.StringConfigForKey(configAccountSID, "") + accountSID := msg.Channel().StringConfigForKey(configAccountSID, "") if accountSID == "" { return nil, fmt.Errorf("missing account sid for twilio channel") } - accountToken := msg.Channel.StringConfigForKey(courier.ConfigAuthToken, "") + accountToken := msg.Channel().StringConfigForKey(courier.ConfigAuthToken, "") if accountToken == "" { return nil, fmt.Errorf("missing account auth token for twilio channel") } // build our request form := url.Values{ - "To": []string{msg.URN.Path()}, - "Body": []string{msg.Text}, + "To": []string{msg.URN().Path()}, + "Body": []string{msg.Text()}, "StatusCallback": []string{callbackURL}, } // add any media URL - if len(msg.Attachments) > 0 { - _, mediaURL := courier.SplitAttachment(msg.Attachments[0]) + if len(msg.Attachments()) > 0 { + _, mediaURL := courier.SplitAttachment(msg.Attachments()[0]) form["MediaURL"] = []string{mediaURL} } // set our from, either as a messaging service or from our address - serviceSID := msg.Channel.StringConfigForKey(configMessagingServiceSID, "") + serviceSID := msg.Channel().StringConfigForKey(configMessagingServiceSID, "") if serviceSID != "" { form["MessagingServiceSID"] = []string{serviceSID} } else { - form["From"] = []string{msg.Channel.Address()} + form["From"] = []string{msg.Channel().Address()} } - baseSendURL := msg.Channel.StringConfigForKey(configSendURL, sendURL) + baseSendURL := msg.Channel().StringConfigForKey(configSendURL, sendURL) sendURL, err := utils.AddURLPath(baseSendURL, accountSID, "Messages.json") if err != nil { return nil, err @@ -199,8 +199,8 @@ func (h *handler) SendMsg(msg *courier.Msg) (*courier.MsgStatusUpdate, error) { rr, err := utils.MakeHTTPRequest(req) // record our status and log - status := courier.NewStatusUpdateForID(msg.Channel, msg.ID, courier.MsgErrored) - status.AddLog(courier.NewChannelLogFromRR(msg.Channel, msg.ID, rr)) + status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored) + status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr)) // fail if we received an error if err != nil { diff --git a/handlers/twilio/twilio_test.go b/handlers/twilio/twilio_test.go index 2f4d9a120..ea0c030cc 100644 --- a/handlers/twilio/twilio_test.go +++ b/handlers/twilio/twilio_test.go @@ -69,7 +69,7 @@ func BenchmarkHandler(b *testing.B) { } // setSendURL takes care of setting the send_url to our test server host -func setSendURL(server *httptest.Server, channel courier.Channel, msg *courier.Msg) { +func setSendURL(server *httptest.Server, channel courier.Channel, msg courier.Msg) { sendURL = server.URL } diff --git a/msg.go b/msg.go index b5c8da3cf..d1129aa29 100644 --- a/msg.go +++ b/msg.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/nyaruka/courier/queue" uuid "github.com/satori/go.uuid" ) @@ -63,77 +62,36 @@ func NewMsgUUID() MsgUUID { return MsgUUID{uuid.NewV4()} } -// NewIncomingMsg creates a new message from the given params -func NewIncomingMsg(channel Channel, urn URN, text string) *Msg { - m := &Msg{} - m.ID = NilMsgID - m.UUID = NewMsgUUID() - m.Channel = channel - m.Text = text - m.URN = urn - - now := time.Now() - m.ReceivedOn = &now - - return m -} - -// NewOutgoingMsg creates a new message from the given params -func NewOutgoingMsg(channel Channel, urn URN, text string) *Msg { - m := &Msg{} - m.ID = NilMsgID - m.UUID = NilMsgUUID - m.Channel = channel - m.Text = text - m.URN = urn - - return m -} - //----------------------------------------------------------------------------- -// Msg implementation +// Msg interface //----------------------------------------------------------------------------- -// Msg is our base struct to represent an incoming or outgoing message -type Msg struct { - Channel Channel - ID MsgID - UUID MsgUUID - Text string - Attachments []string - ExternalID string - URN URN - ContactName string - - WorkerToken queue.WorkerToken - - ReceivedOn *time.Time - SentOn *time.Time - WiredOn *time.Time +// Msg is our interface to represent an incoming or outgoing message +type Msg interface { + Channel() Channel + ID() MsgID + UUID() MsgUUID + Text() string + Attachments() []string + ExternalID() string + URN() URN + ContactName() string + + ReceivedOn() *time.Time + SentOn() *time.Time + + WithContactName(name string) Msg + WithReceivedOn(date time.Time) Msg + WithExternalID(id string) Msg + WithID(id MsgID) Msg + WithUUID(uuid MsgUUID) Msg + WithAttachment(url string) Msg } -// WithContactName can be used to set the contact name on a msg -func (m *Msg) WithContactName(name string) *Msg { m.ContactName = name; return m } - -// WithReceivedOn can be used to set sent_on on a msg in a chained call -func (m *Msg) WithReceivedOn(date time.Time) *Msg { m.ReceivedOn = &date; return m } - -// WithExternalID can be used to set the external id on a msg in a chained call -func (m *Msg) WithExternalID(id string) *Msg { m.ExternalID = id; return m } - -// WithID can be used to set the id on a msg in a chained call -func (m *Msg) WithID(id MsgID) *Msg { m.ID = id; return m } - -// WithUUID can be used to set the id on a msg in a chained call -func (m *Msg) WithUUID(uuid MsgUUID) *Msg { m.UUID = uuid; return m } - -// AddAttachment can be used to append to the media urls for a message -func (m *Msg) AddAttachment(url string) *Msg { m.Attachments = append(m.Attachments, url); return m } - -// TextAndAttachments returns both the text of our message as well as any attachments, newline delimited -func (m *Msg) TextAndAttachments() string { - buf := bytes.NewBuffer([]byte(m.Text)) - for _, a := range m.Attachments { +// GetTextAndAttachments returns both the text of our message as well as any attachments, newline delimited +func GetTextAndAttachments(m Msg) string { + buf := bytes.NewBuffer([]byte(m.Text())) + for _, a := range m.Attachments() { _, url := SplitAttachment(a) buf.WriteString("\n") buf.WriteString(url) diff --git a/queue/queue_test.go b/queue/queue_test.go index ebef5a2e7..c96504afb 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -50,7 +50,7 @@ func TestLua(t *testing.T) { defer close(quitter) rate := 10 - for i := 0; i < 15; i++ { + for i := 0; i < 20; i++ { err := PushOntoQueue(conn, "msgs", "chan1", rate, fmt.Sprintf("msg:%d", i), BulkPriority) assert.NoError(err) } @@ -120,11 +120,17 @@ func TestLua(t *testing.T) { t.Fatalf("Should have received chan1 and msg:32, got: %s and %s", queue, value) } - // next value should be 30 even though it is bulk + // sleep a few seconds + time.Sleep(2 * time.Second) + + // pop until we get to 30 queue, value, err = PopFromQueue(conn, "msgs") - assert.NoError(err) - if value != "msg:30" || queue != "msgs:chan1|10" { - t.Fatalf("Should have received chan1 and msg:30, got: %s and %s", queue, value) + for value != "msg:30" { + assert.NoError(err) + if queue == EmptyQueue { + t.Fatalf("Should not reach empty queue before msg:30, got: %s and %s", queue, value) + } + queue, value, err = PopFromQueue(conn, "msgs") } // popping again should give us nothing since it is too soon to send 33 diff --git a/responses.go b/responses.go index 37aa67d47..10b8d1c58 100644 --- a/responses.go +++ b/responses.go @@ -34,9 +34,9 @@ func WriteIgnored(w http.ResponseWriter, r *http.Request, message string) error } // WriteReceiveSuccess writes a JSON response for the passed in msg indicating we handled it -func WriteReceiveSuccess(w http.ResponseWriter, r *http.Request, msg *Msg) error { +func WriteReceiveSuccess(w http.ResponseWriter, r *http.Request, msg Msg) error { lg.Log(r.Context()).WithField("msg_uuid", msg.UUID).Info("message received") - return writeData(w, http.StatusOK, "Message Accepted", &receiveData{msg.UUID}) + return writeData(w, http.StatusOK, "Message Accepted", &receiveData{msg.UUID()}) } // WriteStatusSuccess writes a JSON response for the passed in status update indicating we handled it diff --git a/sender.go b/sender.go index ab364773b..774c24b0b 100644 --- a/sender.go +++ b/sender.go @@ -103,7 +103,7 @@ func (f *Foreman) Assign() { type Sender struct { id int foreman *Foreman - job chan *Msg + job chan Msg } // NewSender creates a new sender responsible for sending messages @@ -111,7 +111,7 @@ func NewSender(foreman *Foreman, id int) *Sender { sender := &Sender{ id: id, foreman: foreman, - job: make(chan *Msg, 1), + job: make(chan Msg, 1), } return sender } @@ -136,6 +136,7 @@ func (w *Sender) Send() { log.Debug("started") server := w.foreman.server + backend := server.Backend() for true { // list ourselves as available for work @@ -152,18 +153,18 @@ func (w *Sender) Send() { status, err := server.SendMsg(msg) if err != nil { - log.WithField("msgID", msg.ID.Int64).WithError(err).Info("msg errored") + log.WithField("msgID", msg.ID().Int64).WithError(err).Info("msg errored") } else { - log.WithField("msgID", msg.ID.Int64).Info("msg sent") + log.WithField("msgID", msg.ID().Int64).Info("msg sent") } // record our status - err = server.WriteMsgStatus(status) + err = backend.WriteMsgStatus(status) if err != nil { - log.WithField("msgID", msg.ID.Int64).WithError(err).Info("error writing msg status") + log.WithField("msgID", msg.ID().Int64).WithError(err).Info("error writing msg status") } // mark our send task as complete - server.Backend().MarkOutgoingMsgComplete(msg) + backend.MarkOutgoingMsgComplete(msg) } } diff --git a/server.go b/server.go index b7a6e388a..e18d9421c 100644 --- a/server.go +++ b/server.go @@ -20,7 +20,7 @@ import ( "github.com/sirupsen/logrus" ) -// Server is the main interface ChannelHandlers use to interact with the database and redis. It provides an +// Server is the main interface ChannelHandlers use to interact with backends. It provides an // abstraction that makes mocking easier for isolated unit tests type Server interface { Config() *config.Courier @@ -29,13 +29,10 @@ type Server interface { AddReceiveMsgRoute(handler ChannelHandler, method string, action string, handlerFunc ChannelReceiveMsgFunc) error AddUpdateStatusRoute(handler ChannelHandler, method string, action string, handlerFunc ChannelUpdateStatusFunc) error - GetChannel(ChannelType, ChannelUUID) (Channel, error) - WriteMsg(*Msg) error - WriteMsgStatus(*MsgStatusUpdate) error - - SendMsg(*Msg) (*MsgStatusUpdate, error) + SendMsg(Msg) (*MsgStatusUpdate, error) Backend() Backend + WaitGroup() *sync.WaitGroup StopChan() chan bool Stopped() bool @@ -159,23 +156,11 @@ func (s *server) Stop() error { return nil } -func (s *server) GetChannel(cType ChannelType, cUUID ChannelUUID) (Channel, error) { - return s.backend.GetChannel(cType, cUUID) -} - -func (s *server) WriteMsg(msg *Msg) error { - return s.backend.WriteMsg(msg) -} - -func (s *server) WriteMsgStatus(status *MsgStatusUpdate) error { - return s.backend.WriteMsgStatus(status) -} - -func (s *server) SendMsg(msg *Msg) (*MsgStatusUpdate, error) { +func (s *server) SendMsg(msg Msg) (*MsgStatusUpdate, error) { // find the handler for this message type - handler, found := activeHandlers[msg.Channel.ChannelType()] + handler, found := activeHandlers[msg.Channel().ChannelType()] if !found { - return nil, fmt.Errorf("unable to find handler for channel type: %s", msg.Channel.ChannelType()) + return nil, fmt.Errorf("unable to find handler for channel type: %s", msg.Channel().ChannelType()) } // have the handler send it @@ -316,7 +301,7 @@ func (s *server) channelReceiveMsgWrapper(handler ChannelHandler, handlerFunc Ch // create channel logs for each of our msgs for _, msg := range msgs { - logs = append(logs, NewChannelLog(channel, msg.ID, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start)) + logs = append(logs, NewChannelLog(channel, msg.ID(), url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start)) } // and write these out diff --git a/test.go b/test.go index 0d7d92701..8c7ecd2cd 100644 --- a/test.go +++ b/test.go @@ -3,6 +3,8 @@ package courier import ( "errors" + "time" + _ "github.com/lib/pq" // postgres driver "github.com/nyaruka/courier/config" ) @@ -14,7 +16,7 @@ import ( // MockBackend is a mocked version of a backend which doesn't require a real database or cache type MockBackend struct { channels map[ChannelUUID]Channel - queueMsgs []*Msg + queueMsgs []Msg errorOnQueue bool } @@ -24,20 +26,30 @@ func NewMockBackend() *MockBackend { } // GetLastQueueMsg returns the last message queued to the server -func (mb *MockBackend) GetLastQueueMsg() (*Msg, error) { +func (mb *MockBackend) GetLastQueueMsg() (Msg, error) { if len(mb.queueMsgs) == 0 { return nil, ErrMsgNotFound } return mb.queueMsgs[len(mb.queueMsgs)-1], nil } -// PopNextOutgoingMsgs returns the next message that should be sent, or nil if there are none to send -func (mb *MockBackend) PopNextOutgoingMsg() (*Msg, error) { +// NewIncomingMsg creates a new message from the given params +func (mb *MockBackend) NewIncomingMsg(channel Channel, urn URN, text string) Msg { + return &mockMsg{channel: channel, urn: urn, text: text} +} + +// NewOutgoingMsg creates a new outgoing message from the given params +func (mb *MockBackend) NewOutgoingMsg(channel Channel, urn URN, text string) Msg { + return &mockMsg{channel: channel, urn: urn, text: text} +} + +// PopNextOutgoingMsg returns the next message that should be sent, or nil if there are none to send +func (mb *MockBackend) PopNextOutgoingMsg() (Msg, error) { return nil, nil } // MarkOutgoingMsgComplete marks the passed msg as having been dealt with -func (mb *MockBackend) MarkOutgoingMsgComplete(m *Msg) { +func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg) { } // WriteChannelLogs writes the passed in channel logs to the DB @@ -51,7 +63,7 @@ func (mb *MockBackend) SetErrorOnQueue(shouldError bool) { } // WriteMsg queues the passed in message internally -func (mb *MockBackend) WriteMsg(m *Msg) error { +func (mb *MockBackend) WriteMsg(m Msg) error { if mb.errorOnQueue { return errors.New("unable to queue message") } @@ -112,6 +124,7 @@ func init() { // Mock channel implementation //----------------------------------------------------------------------------- +// MockChannel implements the Channel interface and is used in our tests type MockChannel struct { uuid ChannelUUID channelType ChannelType @@ -121,16 +134,27 @@ type MockChannel struct { config map[string]interface{} } -func (c *MockChannel) UUID() ChannelUUID { return c.uuid } +// UUID returns the uuid for this channel +func (c *MockChannel) UUID() ChannelUUID { return c.uuid } + +// ChannelType returns the type of this channel func (c *MockChannel) ChannelType() ChannelType { return c.channelType } -func (c *MockChannel) Scheme() string { return c.scheme } -func (c *MockChannel) Address() string { return c.address } -func (c *MockChannel) Country() string { return c.country } +// Scheme returns the scheme of this channel +func (c *MockChannel) Scheme() string { return c.scheme } + +// Address returns the address of this channel +func (c *MockChannel) Address() string { return c.address } + +// Country returns the country this channel is for (if any) +func (c *MockChannel) Country() string { return c.country } + +// SetConfig sets the passed in config parameter func (c *MockChannel) SetConfig(key string, value interface{}) { c.config[key] = value } +// ConfigForKey returns the config value for the passed in key func (c *MockChannel) ConfigForKey(key string, defaultValue interface{}) interface{} { value, found := c.config[key] if !found { @@ -139,6 +163,7 @@ func (c *MockChannel) ConfigForKey(key string, defaultValue interface{}) interfa return value } +// StringConfigForKey returns the config value for the passed in key func (c *MockChannel) StringConfigForKey(key string, defaultValue string) string { val := c.ConfigForKey(key, defaultValue) str, isStr := val.(string) @@ -162,3 +187,42 @@ func NewMockChannel(uuid string, channelType string, address string, country str } return channel } + +//----------------------------------------------------------------------------- +// Mock msg implementation +//----------------------------------------------------------------------------- + +type mockMsg struct { + channel Channel + id MsgID + uuid MsgUUID + text string + attachments []string + externalID string + urn URN + contactName string + + receivedOn *time.Time + sentOn *time.Time + wiredOn *time.Time +} + +func (m *mockMsg) Channel() Channel { return m.channel } +func (m *mockMsg) ID() MsgID { return m.id } +func (m *mockMsg) UUID() MsgUUID { return m.uuid } +func (m *mockMsg) Text() string { return m.text } +func (m *mockMsg) Attachments() []string { return m.attachments } +func (m *mockMsg) ExternalID() string { return m.externalID } +func (m *mockMsg) URN() URN { return m.urn } +func (m *mockMsg) ContactName() string { return m.contactName } + +func (m *mockMsg) ReceivedOn() *time.Time { return m.receivedOn } +func (m *mockMsg) SentOn() *time.Time { return m.sentOn } +func (m *mockMsg) WiredOn() *time.Time { return m.wiredOn } + +func (m *mockMsg) WithContactName(name string) Msg { m.contactName = name; return m } +func (m *mockMsg) WithReceivedOn(date time.Time) Msg { m.receivedOn = &date; return m } +func (m *mockMsg) WithExternalID(id string) Msg { m.externalID = id; return m } +func (m *mockMsg) WithID(id MsgID) Msg { m.id = id; return m } +func (m *mockMsg) WithUUID(uuid MsgUUID) Msg { m.uuid = uuid; return m } +func (m *mockMsg) WithAttachment(url string) Msg { m.attachments = append(m.attachments, url); return m } From ca8f4e5f53c991b0a4270a1762fbf2b652a28383 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Mon, 10 Jul 2017 16:24:19 -0500 Subject: [PATCH 2/2] dedupe incoming messages in a 4 second window --- backends/rapidpro/backend.go | 11 +++- backends/rapidpro/backend_test.go | 11 +++- backends/rapidpro/msg.go | 87 +++++++++++++++++++++++++++++-- msg.go | 6 +++ 4 files changed, 108 insertions(+), 7 deletions(-) diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 4b3e39ef8..3f430ac8c 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -37,7 +37,16 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) ( // NewIncomingMsg creates a new message from the given params func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg { - return newMsg(MsgIncoming, channel, urn, text) + msg := newMsg(MsgIncoming, channel, urn, text) + + // have we seen this msg in the past period? + prevUUID := checkMsgSeen(b, msg) + if prevUUID != courier.NilMsgUUID { + // if so, use its UUID and mark that we've been written + msg.UUID_ = prevUUID + msg.AlreadyWritten_ = true + } + return msg } // NewOutgoingMsg creates a new outgoing message from the given params diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 8448ee5f1..e2f8ae6c9 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -242,7 +242,7 @@ func (ts *MsgTestSuite) TestWriteMsg() { // create a new courier msg urn := courier.NewTelURNForChannel("12065551212", knChannel) - msg := newMsg(MsgIncoming, knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact") + msg := ts.b.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact").(*DBMsg) // try to write it to our db err := ts.b.WriteMsg(msg) @@ -284,6 +284,15 @@ func (ts *MsgTestSuite) TestWriteMsg() { ts.Equal(m.ContactID_, contact.ID) ts.NotNil(contact.UUID) ts.NotNil(contact.ID) + + // creating the incoming msg again should give us the same UUID and have the msg set as not to write + msg2 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg) + ts.Equal(msg2.UUID(), msg.UUID()) + + // waiting 5 seconds should let us write it successfully + time.Sleep(5 * time.Second) + msg3 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg) + ts.NotEqual(msg3.UUID(), msg.UUID()) } func TestMsgSuite(t *testing.T) { diff --git a/backends/rapidpro/msg.go b/backends/rapidpro/msg.go index db864c7e2..9ec0ccea9 100644 --- a/backends/rapidpro/msg.go +++ b/backends/rapidpro/msg.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/garyburd/redigo/redis" "github.com/nyaruka/courier" "github.com/nyaruka/courier/queue" "github.com/nyaruka/courier/utils" @@ -52,6 +53,11 @@ const ( func writeMsg(b *backend, msg courier.Msg) error { m := msg.(*DBMsg) + // this msg has already been written (we received it twice), we are a no op + if m.AlreadyWritten_ { + return nil + } + // if we have media, go download it to S3 for i, attachment := range m.Attachments_ { if strings.HasPrefix(attachment, "http") { @@ -70,17 +76,18 @@ func writeMsg(b *backend, msg courier.Msg) error { if err != nil { return courier.WriteToSpool(b.config.SpoolDir, "msgs", m) } + + // mark this msg as having been seen + writeMsgSeen(b, m) return err } // newMsg creates a new DBMsg object with the passed in parameters -func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, text string) courier.Msg { +func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, text string) *DBMsg { now := time.Now() dbChannel := channel.(*DBChannel) return &DBMsg{ - Channel_: channel, - OrgID_: dbChannel.OrgID(), UUID_: courier.NewMsgUUID(), Direction_: direction, @@ -99,6 +106,10 @@ func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, te CreatedOn_: now, ModifiedOn_: now, QueuedOn_: now, + + Channel_: channel, + WorkerToken_: "", + AlreadyWritten_: false, } } @@ -226,6 +237,66 @@ func (b *backend) flushMsgFile(filename string, contents []byte) error { return err } +//----------------------------------------------------------------------------- +// Deduping utility methods +//----------------------------------------------------------------------------- + +var luaMsgSeen = redis.NewScript(3, `-- KEYS: [Window, PrevWindow, Fingerprint] + -- try to look up in window + local uuid = redis.call("hget", KEYS[1], KEYS[3]) + + -- didn't find it, try in our previous window + if not uuid then + uuid = redis.call("hget", KEYS[2], KEYS[3]) + end + + -- return the uuid found if any + return uuid +`) + +// checkMsgSeen tries to look up whether a msg with the fingerprint passed in was seen in window or prevWindow. If +// found returns the UUID of that msg, if not returns empty string +func checkMsgSeen(b *backend, msg *DBMsg) courier.MsgUUID { + r := b.redisPool.Get() + defer r.Close() + + fingerprint := msg.fingerprint() + + now := time.Now().In(time.UTC) + prev := now.Add(time.Second * -2) + windowKey := fmt.Sprintf("seen:msgs:%s:%02d", now.Format("2006-01-02-15:05"), now.Second()/2*2) + prevWindowKey := fmt.Sprintf("seen:msgs:%s:%02d", prev.Format("2006-01-02-15:05"), prev.Second()/2*2) + + // try to look up our UUID from either window or prev window + foundUUID, _ := redis.String(luaMsgSeen.Do(r, windowKey, prevWindowKey, fingerprint)) + if foundUUID != "" { + return courier.NewMsgUUIDFromString(foundUUID) + } + return courier.NilMsgUUID +} + +var luaWriteMsgSeen = redis.NewScript(3, `-- KEYS: [Window, Fingerprint, UUID] + redis.call("hset", KEYS[1], KEYS[2], KEYS[3]) + redis.call("pexpire", KEYS[1], 5000) +`) + +// writeMsgSeen writes that the message with the passed in fingerprint and UUID was seen in the +// passed in window +func writeMsgSeen(b *backend, msg *DBMsg) { + r := b.redisPool.Get() + defer r.Close() + + fingerprint := msg.fingerprint() + now := time.Now().In(time.UTC) + windowKey := fmt.Sprintf("seen:msgs:%s:%02d", now.Format("2006-01-02-15:05"), now.Second()/2*2) + + luaWriteMsgSeen.Do(r, windowKey, fingerprint, msg.UUID().String()) +} + +//----------------------------------------------------------------------------- +// Our implementation of Msg interface +//----------------------------------------------------------------------------- + // DBMsg is our base struct to represent msgs both in our JSON and db representations type DBMsg struct { OrgID_ OrgID `json:"org_id" db:"org_id"` @@ -256,8 +327,9 @@ type DBMsg struct { QueuedOn_ time.Time `json:"queued_on" db:"queued_on"` SentOn_ time.Time `json:"sent_on" db:"sent_on"` - Channel_ courier.Channel - WorkerToken_ queue.WorkerToken + Channel_ courier.Channel + WorkerToken_ queue.WorkerToken + AlreadyWritten_ bool } func (m *DBMsg) Channel() courier.Channel { return m.Channel_ } @@ -272,6 +344,11 @@ func (m *DBMsg) ContactName() string { return m.ContactName_ } func (m *DBMsg) ReceivedOn() *time.Time { return &m.SentOn_ } func (m *DBMsg) SentOn() *time.Time { return &m.SentOn_ } +// fingerprint returns a fingerprint for this msg, suitable for figuring out if this is a dupe +func (m *DBMsg) fingerprint() string { + return fmt.Sprintf("%s:%s:%s", m.Channel_.UUID(), m.URN_, m.Text_) +} + // WithContactName can be used to set the contact name on a msg func (m *DBMsg) WithContactName(name string) courier.Msg { m.ContactName_ = name; return m } diff --git a/msg.go b/msg.go index d1129aa29..356f12600 100644 --- a/msg.go +++ b/msg.go @@ -62,6 +62,12 @@ func NewMsgUUID() MsgUUID { return MsgUUID{uuid.NewV4()} } +// NewMsgUUIDFromString creates a new message UUID for the passed in string +func NewMsgUUIDFromString(uuidString string) MsgUUID { + uuid, _ := uuid.FromString(uuidString) + return MsgUUID{uuid} +} + //----------------------------------------------------------------------------- // Msg interface //-----------------------------------------------------------------------------