From a5bd01c5f0d02e378fe854845945361ea3ec4d05 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Thu, 7 Nov 2024 12:02:37 -0300 Subject: [PATCH 1/3] add metadata tot billing msgs --- backends/rapidpro/backend_test.go | 10 +++++++--- billing/billing.go | 4 +++- billing/billing_test.go | 3 +++ handlers/facebookapp/facebookapp.go | 1 + sender.go | 1 + server.go | 1 + 6 files changed, 16 insertions(+), 4 deletions(-) diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 9effe45a2..e220eb8ff 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -827,10 +827,14 @@ func (ts *BackendTestSuite) TestOutgoingQueue() { ts.NotNil(dbMsg) // serialize our message - msgJSON, err := json.Marshal([]interface{}{dbMsg}) - ts.NoError(err) + // msgJSON, err := json.Marshal([]interface{}{dbMsg}) + // ts.NoError(err) + var err error - err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.HighPriority) + // msgStrJSON := "[{\"org_id\":1,\"id\":10000,\"uuid\":\"00000000-0000-0000-0000-000000000000\",\"direction\":\"O\",\"status\":\"F\",\"visibility\":\"V\",\"high_priority\":true,\"urn\":\"\",\"urn_auth\":\"\",\"text\":\"test message\",\"attachments\":null,\"external_id\":\"ext1\",\"response_to_id\":null,\"response_to_external_id\":\"\",\"metadata\":\"{\"ticketer_id\":1}\",\"channel_id\":10,\"contact_id\":100,\"contact_urn_id\":1000,\"msg_count\":1,\"error_count\":3,\"channel_uuid\":\"dbc126ed-66bc-4e28-b67b-81dc3327c95d\",\"contact_name\":\"\",\"next_attempt\":\"2024-11-06T20:45:31.123208Z\",\"created_on\":\"2024-11-06T20:30:14.898168Z\",\"modified_on\":\"2024-11-06T20:30:31.122974Z\",\"queued_on\":\"2024-11-06T20:30:14.898168Z\",\"sent_on\":null}]" + msgStrJSON := `[{"org_id":1,"id":10000,"uuid":"00000000-0000-0000-0000-000000000000","direction":"O","status":"F","visibility":"V","high_priority":true,"urn":"","urn_auth":"","text":"test message","attachments":null,"external_id":"ext1","response_to_id":null,"response_to_external_id":"","metadata":"{\"ticketer_id\":1}","channel_id":10,"contact_id":100,"contact_urn_id":1000,"msg_count":1,"error_count":3,"channel_uuid":"dbc126ed-66bc-4e28-b67b-81dc3327c95d","contact_name":"","next_attempt":"2024-11-06T20:45:31.123208Z","created_on":"2024-11-06T20:30:14.898168Z","modified_on":"2024-11-06T20:30:31.122974Z","queued_on":"2024-11-06T20:30:14.898168Z","sent_on":null}]` + + err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, msgStrJSON, queue.HighPriority) ts.NoError(err) // pop a message off our queue diff --git a/billing/billing.go b/billing/billing.go index 4afba00f7..a94a3050e 100644 --- a/billing/billing.go +++ b/billing/billing.go @@ -31,10 +31,11 @@ type Message struct { Text string `json:"text,omitempty"` Attachments []string `json:"attachments,omitempty"` QuickReplies []string `json:"quick_replies,omitempty"` + Metadata string `json:"metadata"` } // Create a new message -func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string) Message { +func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string, metadata string) Message { return Message{ ContactURN: contactURN, ContactUUID: contactUUID, @@ -46,6 +47,7 @@ func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, di Text: text, Attachments: attachments, QuickReplies: quickreplies, + Metadata: metadata, } } diff --git a/billing/billing_test.go b/billing/billing_test.go index 72df5baa5..b4eb81dc3 100644 --- a/billing/billing_test.go +++ b/billing/billing_test.go @@ -53,6 +53,7 @@ func TestBillingResilientClient(t *testing.T) { "hello", nil, nil, + "", ) billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) @@ -117,6 +118,7 @@ func TestBillingResilientClientSendAsync(t *testing.T) { "hello", nil, nil, + "", ) billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) @@ -181,6 +183,7 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) { "hello", nil, nil, + "", ) billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) diff --git a/handlers/facebookapp/facebookapp.go b/handlers/facebookapp/facebookapp.go index 1d4bc6aa1..973f864ee 100644 --- a/handlers/facebookapp/facebookapp.go +++ b/handlers/facebookapp/facebookapp.go @@ -717,6 +717,7 @@ func (h *handler) processCloudWhatsAppPayload(ctx context.Context, channel couri "", nil, nil, + "", ) h.Server().Billing().SendAsync(billingMsg, nil, nil) } diff --git a/sender.go b/sender.go index 9b8d3f889..4b4b31ff9 100644 --- a/sender.go +++ b/sender.go @@ -305,6 +305,7 @@ func (w *Sender) sendMessage(msg Msg) { msg.Text(), msg.Attachments(), msg.QuickReplies(), + string(msg.Metadata()), ) if w.foreman.server.Billing() != nil { w.foreman.server.Billing().SendAsync(billingMsg, nil, nil) diff --git a/server.go b/server.go index 84f065c40..8e3b4b1de 100644 --- a/server.go +++ b/server.go @@ -574,6 +574,7 @@ func handleBilling(s *server, msg Msg) error { msg.Text(), msg.Attachments(), msg.QuickReplies(), + "", ) billingMsg.ChannelType = string(msg.Channel().ChannelType()) billingMsg.Text = msg.Text() From 125f932160d6978c46710a725cc6d05cdda67b1b Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Fri, 8 Nov 2024 17:39:03 -0300 Subject: [PATCH 2/3] refactor billing to send to exchange instead queue --- billing/billing.go | 50 ++++++++--------- billing/billing_test.go | 87 +++++++++++++++++++++++------ cmd/courier/main.go | 2 +- config.go | 4 +- handler_test.go | 2 +- handlers/facebookapp/facebookapp.go | 4 +- sender.go | 40 +++++++------ server.go | 4 +- server_test.go | 2 +- 9 files changed, 125 insertions(+), 70 deletions(-) diff --git a/billing/billing.go b/billing/billing.go index a94a3050e..2e26b0cad 100644 --- a/billing/billing.go +++ b/billing/billing.go @@ -12,6 +12,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + RoutingKeyCreate = "create" + RoutingKeyUpdate = "status-update" +) + // Message represents a object that is sent to the billing service // // { @@ -31,11 +36,11 @@ type Message struct { Text string `json:"text,omitempty"` Attachments []string `json:"attachments,omitempty"` QuickReplies []string `json:"quick_replies,omitempty"` - Metadata string `json:"metadata"` + FromTicketer bool `json:"from_ticketer"` } // Create a new message -func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string, metadata string) Message { +func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string, fromTicketer bool) Message { return Message{ ContactURN: contactURN, ContactUUID: contactUUID, @@ -47,25 +52,25 @@ func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, di Text: text, Attachments: attachments, QuickReplies: quickreplies, - Metadata: metadata, + FromTicketer: fromTicketer, } } // Client represents a client interface for billing service type Client interface { - Send(msg Message) error - SendAsync(msg Message, pre func(), post func()) + Send(msg Message, routingKey string) error + SendAsync(msg Message, routingKey string, pre func(), post func()) } // rabbitmqRetryClient represents struct that implements billing service client interface type rabbitmqRetryClient struct { - publisher rabbitroutine.Publisher - conn *rabbitroutine.Connector - queueName string + publisher rabbitroutine.Publisher + conn *rabbitroutine.Connector + exchangeName string } // NewRMQBillingResilientClient creates a new billing service client implementation using RabbitMQ with publish retry and reconnect features -func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, queueName string) (Client, error) { +func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, exchangeName string) (Client, error) { cconn, err := amqp.Dial(url) if err != nil { return nil, err @@ -77,17 +82,6 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, return nil, errors.Wrap(err, "failed to open a channel to rabbitmq") } defer ch.Close() - _, err = ch.QueueDeclare( - queueName, - false, - false, - false, - false, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "failed to declare a queue for billing publisher") - } conn := rabbitroutine.NewConnector(rabbitroutine.Config{ ReconnectAttempts: 1000, @@ -125,19 +119,19 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, }() return &rabbitmqRetryClient{ - publisher: pub, - conn: conn, - queueName: queueName, + publisher: pub, + conn: conn, + exchangeName: exchangeName, }, nil } -func (c *rabbitmqRetryClient) Send(msg Message) error { +func (c *rabbitmqRetryClient) Send(msg Message, routingKey string) error { msgMarshalled, _ := json.Marshal(msg) ctx := context.Background() err := c.publisher.Publish( ctx, - "", - c.queueName, + c.exchangeName, + routingKey, amqp.Publishing{ ContentType: "application/json", Body: msgMarshalled, @@ -149,7 +143,7 @@ func (c *rabbitmqRetryClient) Send(msg Message) error { return nil } -func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) { +func (c *rabbitmqRetryClient) SendAsync(msg Message, routingKey string, pre func(), post func()) { go func() { defer func() { if r := recover(); r != nil { @@ -159,7 +153,7 @@ func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) { if pre != nil { pre() } - err := c.Send(msg) + err := c.Send(msg, routingKey) if err != nil { logrus.WithError(err).Error("fail to send msg to billing service") } diff --git a/billing/billing_test.go b/billing/billing_test.go index b4eb81dc3..587af42dc 100644 --- a/billing/billing_test.go +++ b/billing/billing_test.go @@ -13,7 +13,48 @@ import ( "github.com/stretchr/testify/assert" ) -const billingTestQueueName = "testqueue" +const ( + billingTestExchangeName = "test-exchange" + billingTestQueueName = "test-queue" +) + +func initalizeRMQ(ch *amqp.Channel) { + err := ch.ExchangeDeclare( + billingTestExchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } + + _, err = ch.QueueDeclare( + billingTestQueueName, + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(errors.Wrap(err, "failed to declare a queue for billing publisher")) + } + + err = ch.QueueBind( + billingTestQueueName, + "#", + billingTestExchangeName, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } +} func TestInitialization(t *testing.T) { connURL := "amqp://localhost:5672/" @@ -25,9 +66,13 @@ func TestInitialization(t *testing.T) { if err != nil { log.Fatal(err) } - defer conn.Close() - defer ch.Close() + + initalizeRMQ(ch) + defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() } func TestBillingResilientClient(t *testing.T) { @@ -38,8 +83,11 @@ func TestBillingResilientClient(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -53,13 +101,13 @@ func TestBillingResilientClient(t *testing.T) { "hello", nil, nil, - "", + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) - err = billingClient.Send(msg) + err = billingClient.Send(msg, RoutingKeyCreate) assert.NoError(t, err) msgs, err := ch.Consume( @@ -103,8 +151,11 @@ func TestBillingResilientClientSendAsync(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -118,15 +169,16 @@ func TestBillingResilientClientSendAsync(t *testing.T) { "hello", nil, nil, - "", + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) - billingClient.SendAsync(msg, nil, nil) - + // billingClient.SendAsync(msg, RoutingKeyCreate, nil, nil) + err = billingClient.Send(msg, RoutingKeyCreate) assert.NoError(t, err) + msgs, err := ch.Consume( billingTestQueueName, "", @@ -168,8 +220,11 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -183,14 +238,14 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) { "hello", nil, nil, - "", + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) time.Sleep(1 * time.Second) - billingClient.SendAsync(msg, nil, func() { panic("test panic") }) + billingClient.SendAsync(msg, RoutingKeyCreate, nil, func() { panic("test panic") }) assert.NoError(t, err) msgs, err := ch.Consume( diff --git a/cmd/courier/main.go b/cmd/courier/main.go index ed2419328..ad23b9a38 100644 --- a/cmd/courier/main.go +++ b/cmd/courier/main.go @@ -123,7 +123,7 @@ func main() { if config.RabbitmqURL != "" { billingClient, err := billing.NewRMQBillingResilientClient( - config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingQueueName) + config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingExchangeName) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err) } diff --git a/config.go b/config.go index a74a477ba..208d024d1 100644 --- a/config.go +++ b/config.go @@ -56,7 +56,7 @@ type Config struct { RabbitmqURL string `help:"rabbitmq url"` RabbitmqRetryPubAttempts int `help:"rabbitmq retry attempts"` RabbitmqRetryPubDelay int `help:"rabbitmq retry delay"` - BillingQueueName string `help:"billing queue name"` + BillingExchangeName string `help:"billing exchange name"` EmailProxyURL string `help:"email proxy url"` EmailProxyAuthToken string `help:"email proxy auth token"` @@ -91,7 +91,7 @@ func NewConfig() *Config { WaitMediaChannels: []string{}, RabbitmqRetryPubAttempts: 3, RabbitmqRetryPubDelay: 1000, - BillingQueueName: "billing-backup", + BillingExchangeName: "msgs.topic", EmailProxyURL: "http://localhost:9090", EmailProxyAuthToken: "", } diff --git a/handler_test.go b/handler_test.go index b61667ad1..09c9b405b 100644 --- a/handler_test.go +++ b/handler_test.go @@ -45,7 +45,7 @@ func (h *dummyHandler) Initialize(s Server) error { "amqp://localhost:5672/", 3, 100, - s.Config().BillingQueueName, + s.Config().BillingExchangeName, ) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err) diff --git a/handlers/facebookapp/facebookapp.go b/handlers/facebookapp/facebookapp.go index 973f864ee..f1b1fb494 100644 --- a/handlers/facebookapp/facebookapp.go +++ b/handlers/facebookapp/facebookapp.go @@ -717,9 +717,9 @@ func (h *handler) processCloudWhatsAppPayload(ctx context.Context, channel couri "", nil, nil, - "", + false, ) - h.Server().Billing().SendAsync(billingMsg, nil, nil) + h.Server().Billing().SendAsync(billingMsg, billing.RoutingKeyUpdate, nil, nil) } } } diff --git a/sender.go b/sender.go index 4b4b31ff9..b0de7fc36 100644 --- a/sender.go +++ b/sender.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/buger/jsonparser" "github.com/nyaruka/courier/billing" "github.com/nyaruka/courier/utils" "github.com/nyaruka/librato" @@ -292,23 +293,28 @@ func (w *Sender) sendMessage(msg Msg) { librato.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration) } - if status.Status() != MsgErrored && status.Status() != MsgFailed { - if msg.Channel().ChannelType() != "WAC" { - billingMsg := billing.NewMessage( - string(msg.URN().Identity()), - "", - msg.Channel().UUID().String(), - msg.ExternalID(), - time.Now().Format(time.RFC3339), - "O", - msg.Channel().ChannelType().String(), - msg.Text(), - msg.Attachments(), - msg.QuickReplies(), - string(msg.Metadata()), - ) - if w.foreman.server.Billing() != nil { - w.foreman.server.Billing().SendAsync(billingMsg, nil, nil) + sentWithSuccess := status.Status() != MsgErrored && status.Status() != MsgFailed + if sentWithSuccess { + if w.foreman.server.Billing() != nil { + // if ticketer_type is eg: "wenichats" it is a message from ticketer sent by an agent, so must be sent to billing anyway + ticketerType, _ := jsonparser.GetString(msg.Metadata(), "ticketer_type") + fromTicketer := ticketerType != "" + + if fromTicketer || msg.Channel().ChannelType() != "WAC" { + billingMsg := billing.NewMessage( + string(msg.URN().Identity()), + "", + msg.Channel().UUID().String(), + msg.ExternalID(), + time.Now().Format(time.RFC3339), + "O", + msg.Channel().ChannelType().String(), + msg.Text(), + msg.Attachments(), + msg.QuickReplies(), + fromTicketer, + ) + w.foreman.server.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) } } } diff --git a/server.go b/server.go index 8e3b4b1de..4e1a4464c 100644 --- a/server.go +++ b/server.go @@ -574,7 +574,7 @@ func handleBilling(s *server, msg Msg) error { msg.Text(), msg.Attachments(), msg.QuickReplies(), - "", + false, ) billingMsg.ChannelType = string(msg.Channel().ChannelType()) billingMsg.Text = msg.Text() @@ -582,7 +582,7 @@ func handleBilling(s *server, msg Msg) error { billingMsg.QuickReplies = msg.QuickReplies() if s.Billing() != nil { - s.Billing().SendAsync(billingMsg, nil, nil) + s.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) } return nil diff --git a/server_test.go b/server_test.go index 5eacc8d03..aed965226 100644 --- a/server_test.go +++ b/server_test.go @@ -25,7 +25,7 @@ func TestServer(t *testing.T) { "amqp://localhost:5672/", config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, - config.BillingQueueName, + config.BillingExchangeName, ) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err) From 8dec1d3442f00c69cbe44770b5253d5778964b27 Mon Sep 17 00:00:00 2001 From: Rafael Soares Date: Thu, 28 Nov 2024 12:16:36 -0300 Subject: [PATCH 3/3] on send msg, only send for billing if is not for WAC --- backends/rapidpro/backend_test.go | 4 ---- sender.go | 37 ++++++++++++++----------------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index e220eb8ff..9d884a7fe 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -826,12 +826,8 @@ func (ts *BackendTestSuite) TestOutgoingQueue() { dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d") ts.NotNil(dbMsg) - // serialize our message - // msgJSON, err := json.Marshal([]interface{}{dbMsg}) - // ts.NoError(err) var err error - // msgStrJSON := "[{\"org_id\":1,\"id\":10000,\"uuid\":\"00000000-0000-0000-0000-000000000000\",\"direction\":\"O\",\"status\":\"F\",\"visibility\":\"V\",\"high_priority\":true,\"urn\":\"\",\"urn_auth\":\"\",\"text\":\"test message\",\"attachments\":null,\"external_id\":\"ext1\",\"response_to_id\":null,\"response_to_external_id\":\"\",\"metadata\":\"{\"ticketer_id\":1}\",\"channel_id\":10,\"contact_id\":100,\"contact_urn_id\":1000,\"msg_count\":1,\"error_count\":3,\"channel_uuid\":\"dbc126ed-66bc-4e28-b67b-81dc3327c95d\",\"contact_name\":\"\",\"next_attempt\":\"2024-11-06T20:45:31.123208Z\",\"created_on\":\"2024-11-06T20:30:14.898168Z\",\"modified_on\":\"2024-11-06T20:30:31.122974Z\",\"queued_on\":\"2024-11-06T20:30:14.898168Z\",\"sent_on\":null}]" msgStrJSON := `[{"org_id":1,"id":10000,"uuid":"00000000-0000-0000-0000-000000000000","direction":"O","status":"F","visibility":"V","high_priority":true,"urn":"","urn_auth":"","text":"test message","attachments":null,"external_id":"ext1","response_to_id":null,"response_to_external_id":"","metadata":"{\"ticketer_id\":1}","channel_id":10,"contact_id":100,"contact_urn_id":1000,"msg_count":1,"error_count":3,"channel_uuid":"dbc126ed-66bc-4e28-b67b-81dc3327c95d","contact_name":"","next_attempt":"2024-11-06T20:45:31.123208Z","created_on":"2024-11-06T20:30:14.898168Z","modified_on":"2024-11-06T20:30:31.122974Z","queued_on":"2024-11-06T20:30:14.898168Z","sent_on":null}]` err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, msgStrJSON, queue.HighPriority) diff --git a/sender.go b/sender.go index b0de7fc36..115875d73 100644 --- a/sender.go +++ b/sender.go @@ -293,29 +293,26 @@ func (w *Sender) sendMessage(msg Msg) { librato.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration) } - sentWithSuccess := status.Status() != MsgErrored && status.Status() != MsgFailed - if sentWithSuccess { - if w.foreman.server.Billing() != nil { + sentOk := status.Status() != MsgErrored && status.Status() != MsgFailed + if sentOk && w.foreman.server.Billing() != nil { + if msg.Channel().ChannelType() != "WAC" { // if ticketer_type is eg: "wenichats" it is a message from ticketer sent by an agent, so must be sent to billing anyway ticketerType, _ := jsonparser.GetString(msg.Metadata(), "ticketer_type") fromTicketer := ticketerType != "" - - if fromTicketer || msg.Channel().ChannelType() != "WAC" { - billingMsg := billing.NewMessage( - string(msg.URN().Identity()), - "", - msg.Channel().UUID().String(), - msg.ExternalID(), - time.Now().Format(time.RFC3339), - "O", - msg.Channel().ChannelType().String(), - msg.Text(), - msg.Attachments(), - msg.QuickReplies(), - fromTicketer, - ) - w.foreman.server.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) - } + billingMsg := billing.NewMessage( + string(msg.URN().Identity()), + "", + msg.Channel().UUID().String(), + msg.ExternalID(), + time.Now().Format(time.RFC3339), + "O", + msg.Channel().ChannelType().String(), + msg.Text(), + msg.Attachments(), + msg.QuickReplies(), + fromTicketer, + ) + w.foreman.server.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) } } }