Skip to content

Commit

Permalink
Refactor metrics so that everything is sent from Heartbeat in the backend
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 17, 2024
1 parent 68d55f9 commit fa48cc8
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 114 deletions.
15 changes: 6 additions & 9 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ type Backend interface {
// WriteChannelLog writes the passed in channel log to our backend
WriteChannelLog(context.Context, *ChannelLog) error

// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg(context.Context) (MsgOut, error)

Expand All @@ -80,10 +79,11 @@ type Backend interface {
// a message is being forced to be resent by a user
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)
// OnSendComplete is called when the sender has finished trying to send a message
OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog)

// OnReceiveComplete is called when the server has finished handling an incoming request
OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog)

// SaveAttachment saves an attachment to backend storage
SaveAttachment(context.Context, Channel, string, []byte, string) (string, error)
Expand All @@ -106,9 +106,6 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch returns the CloudWatch service for this backend
CloudWatch() *cwatch.Service
}

// Media is a resolved media object that can be used as a message attachment
Expand Down
95 changes: 58 additions & 37 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ func init() {
courier.RegisterBackend("rapidpro", newBackend)
}

// stats holds the last observed cumulative wait durations of the DB and Redis connection pools
type stats struct {
// both sqlx and redis provide wait stats which are cumulative, so we need to convert them into
// per-period increments by tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

type backend struct {
config *courier.Config

Expand Down Expand Up @@ -94,7 +87,7 @@ type backend struct {
// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

stats stats
stats *StatsCollector
}

// NewBackend creates a new RapidPro backend
Expand Down Expand Up @@ -131,6 +124,8 @@ func newBackend(cfg *courier.Config) courier.Backend {
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours

stats: NewStatsCollector(),
}
}

Expand Down Expand Up @@ -194,7 +189,6 @@ func (b *backend) Start() error {
if err != nil {
return err
}
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
Expand Down Expand Up @@ -253,8 +247,6 @@ func (b *backend) Stop() error {
// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

Expand Down Expand Up @@ -464,8 +456,8 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) {
// OnSendComplete is called when the sender has finished trying to send a message
func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) {
rc := b.rp.Get()
defer rc.Close()

Expand All @@ -489,6 +481,33 @@ func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOu
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}

if wasSuccess {
b.stats.RecordSendSuccess(msg.Channel().ChannelType())
} else {
b.stats.RecordSendError(msg.Channel().ChannelType())
}

Check warning on line 489 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L488-L489

Added lines #L488 - L489 were not covered by tests

b.stats.RecordSendDuration(msg.Channel().ChannelType(), clog.Elapsed)
}

// OnReceiveComplete is called when the server has finished handling an incoming request. It records one
// receive metric per event, keyed by the channel's type and chosen by the event's concrete type (incoming
// message, status update or channel event). A request that produced no events at all is recorded as
// ignored. Finally the elapsed time taken from the channel log is recorded as the receive duration.
//
// NOTE(review): clog is dereferenced unconditionally - confirm callers never pass a nil channel log.
// NOTE(review): ctx is not used anywhere in this body.
func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) {
// events whose type matches none of the cases below are not counted (there is no default case)
for _, event := range events {
switch event.(type) {
case courier.MsgIn:
b.stats.RecordReceiveMessage(ch.ChannelType())
case courier.StatusUpdate:
b.stats.RecordReceiveStatus(ch.ChannelType())
case courier.ChannelEvent:
b.stats.RecordReceiveEvent(ch.ChannelType())

Check warning on line 503 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L495-L503

Added lines #L495 - L503 were not covered by tests
}
}
// a request that yielded no events is counted as ignored
if len(events) == 0 {
b.stats.RecordReceiveIgnored(ch.ChannelType())
}

Check warning on line 508 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L506-L508

Added lines #L506 - L508 were not covered by tests

// the duration is recorded for every request, whether or not it produced events
b.stats.RecordReceiveDuration(ch.ChannelType(), clog.Elapsed)

Check warning on line 510 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L510

Added line #L510 was not covered by tests
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -737,7 +756,6 @@ func (b *backend) Health() string {
return health.String()
}

// Heartbeat is called every minute; it gathers queue sizes and DB/Redis pool stats and sends them as metrics to CloudWatch
func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()
Expand Down Expand Up @@ -774,34 +792,42 @@ func (b *backend) Heartbeat() error {
dbStats := b.db.Stats()
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.redisWaitDuration = redisStats.WaitDuration
stats := b.stats.Stats(dbStats, redisStats)

metrics := make([]cwtypes.MetricDatum, 0, 10)
hostDim := cwatch.Dimension("Host", b.config.InstanceID)

b.CloudWatch().Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(stats.DBWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)
cwatch.Datum("RedisConnectionsWaitDuration", float64(stats.RedisWaitDuration/time.Second), cwtypes.StandardUnitSeconds, hostDim),

b.CloudWatch().Queue(
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount),
)

slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize,
)
for cType, count := range stats.SendSuccesses {
metrics = append(metrics, cwatch.Datum("SendSucceeded", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 813 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L812-L813

Added lines #L812 - L813 were not covered by tests
for cType, count := range stats.SendErrors {
metrics = append(metrics, cwatch.Datum("SendErrored", float64(count), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 816 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L815-L816

Added lines #L815 - L816 were not covered by tests
for cType, duration := range stats.SendDuration {
numSends := stats.SendSuccesses[cType] + stats.SendErrors[cType]
avgTime := float64(duration) / float64(numSends)
metrics = append(metrics, cwatch.Datum("SendDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType))))
}

Check warning on line 821 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L818-L821

Added lines #L818 - L821 were not covered by tests

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)

Check warning on line 825 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L825

Added line #L825 was not covered by tests
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
}
cancel()

return nil
}

Expand Down Expand Up @@ -878,8 +904,3 @@ func (b *backend) Status() string {
// RedisPool returns the Redis connection pool (b.rp) used by this backend
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch returns the CloudWatch service (b.cw) used by this backend
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog))
ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog)

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(ctx, msg.ID())
Expand Down
6 changes: 1 addition & 5 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
Expand Down Expand Up @@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))
b.stats.RecordContactCreated()

// and return it
return contact, nil
}
Loading

0 comments on commit fa48cc8

Please sign in to comment.