Skip to content

Commit

Permalink
Move metric generation into Stats method
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 18, 2024
1 parent 1917dd2 commit aa5809d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
28 changes: 3 additions & 25 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()

metrics := b.stats.Extract().ToMetrics()

active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return fmt.Errorf("error getting active queues: %w", err)
Expand Down Expand Up @@ -773,48 +775,24 @@ func (b *backend) Heartbeat() error {
bulkSize += count
}

// get our DB and redis stats
// calculate DB and redis pool metrics
dbStats := b.db.Stats()
redisStats := b.rp.Stats()
dbWaitDurationInPeriod := dbStats.WaitDuration - b.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.redisWaitDuration
b.dbWaitDuration = dbStats.WaitDuration
b.redisWaitDuration = redisStats.WaitDuration

stats := b.stats.Extract()

metrics := make([]cwtypes.MetricDatum, 0, 10)
hostDim := cwatch.Dimension("Host", b.config.InstanceID)

metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),

cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
cwatch.Datum("ContactsCreated", float64(stats.ContactsCreated), cwtypes.StandardUnitCount),
)

metrics = append(metrics, stats.IncomingRequests.Metrics("IncomingRequests")...)
metrics = append(metrics, stats.IncomingMessages.Metrics("IncomingMessages")...)
metrics = append(metrics, stats.IncomingStatuses.Metrics("IncomingStatuses")...)
metrics = append(metrics, stats.IncomingEvents.Metrics("IncomingEvents")...)
metrics = append(metrics, stats.IncomingIgnored.Metrics("IncomingIgnored")...)
metrics = append(metrics, stats.OutgoingSends.Metrics("OutgoingSends")...)
metrics = append(metrics, stats.OutgoingErrors.Metrics("OutgoingErrors")...)

// turn our duration stats into averages for metrics
for cType, count := range stats.IncomingDuration {
avgTime := float64(count) / float64(stats.IncomingRequests[cType])
metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), cwtypes.StandardUnitCount, cwatch.Dimension("ChannelType", string(cType))))
}
for cType, duration := range stats.OutgoingDuration {
avgTime := float64(duration) / float64(stats.OutgoingSends[cType]+stats.OutgoingErrors[cType])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, cwtypes.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(cType))))
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)

Check warning on line 798 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L798

Added line #L798 was not covered by tests
Expand Down
29 changes: 27 additions & 2 deletions backends/rapidpro/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

type CountByType map[courier.ChannelType]int

// Metrics converts per channel counts into cloudwatch metrics with type as a dimension
func (c CountByType) Metrics(name string) []types.MetricDatum {
// converts per channel counts into cloudwatch metrics with type as a dimension
func (c CountByType) metrics(name string) []types.MetricDatum {
m := make([]types.MetricDatum, 0, len(c))
for typ, count := range c {
m = append(m, cwatch.Datum(name, float64(count), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ))))
Expand Down Expand Up @@ -54,6 +54,31 @@ func newStats() *Stats {
}
}

func (s *Stats) ToMetrics() []types.MetricDatum {
metrics := make([]types.MetricDatum, 0, 20)
metrics = append(metrics, s.IncomingRequests.metrics("IncomingRequests")...)
metrics = append(metrics, s.IncomingMessages.metrics("IncomingMessages")...)
metrics = append(metrics, s.IncomingStatuses.metrics("IncomingStatuses")...)
metrics = append(metrics, s.IncomingEvents.metrics("IncomingEvents")...)
metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...)

for typ, d := range s.IncomingDuration { // convert to averages
avgTime := float64(d) / float64(s.IncomingRequests[typ])
metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime), types.StandardUnitCount, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...)
metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...)

for typ, d := range s.OutgoingDuration { // convert to averages
avgTime := float64(d) / float64(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount))
return metrics
}

// StatsCollector provides threadsafe stats collection
type StatsCollector struct {
mutex sync.Mutex
Expand Down
6 changes: 6 additions & 0 deletions backends/rapidpro/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestStats(t *testing.T) {
assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors)
assert.Equal(t, rapidpro.DurationByType{"T": time.Second * 2, "FBA": time.Second * 3}, stats.OutgoingDuration)

metrics := stats.ToMetrics()
assert.Len(t, metrics, 8)

sc.RecordOutgoing("FBA", true, time.Second)
sc.RecordOutgoing("FBA", true, time.Second)

Expand All @@ -46,4 +49,7 @@ func TestStats(t *testing.T) {
assert.Equal(t, rapidpro.CountByType{"FBA": 2}, stats.OutgoingSends)
assert.Equal(t, rapidpro.CountByType{}, stats.OutgoingErrors)
assert.Equal(t, rapidpro.DurationByType{"FBA": time.Second * 2}, stats.OutgoingDuration)

metrics = stats.ToMetrics()
assert.Len(t, metrics, 3)
}

0 comments on commit aa5809d

Please sign in to comment.