Skip to content

Commit

Permalink
contrib/jackc/pgx.v5: publish pgxpool stats to statsd client (#2692)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikayla Toffler <46911781+mtoffl01@users.noreply.github.com>
Co-authored-by: Dario Castañé <dario.castane@datadoghq.com>
  • Loading branch information
3 people authored Jul 10, 2024
1 parent 70b8a12 commit 6f26c4c
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 3 deletions.
63 changes: 63 additions & 0 deletions contrib/jackc/pgx.v5/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package pgx

import (
"time"

"github.com/jackc/pgx/v5/pgxpool"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

const tracerPrefix = "datadog.tracer."

const (
AcquireCount = tracerPrefix + "pgx.pool.connections.acquire"
AcquireDuration = tracerPrefix + "pgx.pool.connections.acquire_duration"
AcquiredConns = tracerPrefix + "pgx.pool.connections.acquired_conns"
CanceledAcquireCount = tracerPrefix + "pgx.pool.connections.canceled_acquire"
ConstructingConns = tracerPrefix + "pgx.pool.connections.constructing_conns"
EmptyAcquireCount = tracerPrefix + "pgx.pool.connections.empty_acquire"
IdleConns = tracerPrefix + "pgx.pool.connections.idle_conns"
MaxConns = tracerPrefix + "pgx.pool.connections.max_conns"
TotalConns = tracerPrefix + "pgx.pool.connections.total_conns"
NewConnsCount = tracerPrefix + "pgx.pool.connections.new_conns"
MaxLifetimeDestroyCount = tracerPrefix + "pgx.pool.connections.max_lifetime_destroy"
MaxIdleDestroyCount = tracerPrefix + "pgx.pool.connections.max_idle_destroy"
)

var interval = 10 * time.Second

// pollPoolStats calls (*pgxpool).Stats on the pool at a predetermined interval. It pushes the pool Stats off to the statsd client.
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool) {
log.Debug("contrib/jackc/pgx.v5: Traced pool connection found: Pool stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
}
}

func statsTags(c *config) []string {
tags := globalconfig.StatsTags()
if c.serviceName != "" {
tags = append(tags, "service:"+c.serviceName)
}
return tags
}
33 changes: 32 additions & 1 deletion contrib/jackc/pgx.v5/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

package pgx

import "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
import (
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
serviceName string
Expand All @@ -15,6 +20,8 @@ type config struct {
tracePrepare bool
traceConnect bool
traceAcquire bool
poolStats bool
statsdClient internal.StatsdClient
}

func defaultConfig() *config {
Expand All @@ -29,6 +36,21 @@ func defaultConfig() *config {
}
}

// checkStatsdRequired adds a statsdClient onto the config if poolStats is enabled
// NOTE: For now, the only use-case for a statsdclient is the poolStats feature. If a statsdclient becomes necessary for other items in future work, then this logic should change
func (c *config) checkStatsdRequired() {
if c.poolStats && c.statsdClient == nil {
// contrib/jackc/pgx's statsdclient should always inherit its address from the tracer's statsdclient via the globalconfig
// destination is not user-configurable
sc, err := internal.NewStatsdClient(globalconfig.DogstatsdAddr(), statsTags(c))
if err == nil {
c.statsdClient = sc
} else {
log.Warn("contrib/jackc/pgx.v5: Error creating statsd client; Pool stats will be dropped: %v", err)
}
}
}

type Option func(*config)

// WithServiceName sets the service name to use for all spans.
Expand Down Expand Up @@ -90,3 +112,12 @@ func WithTraceConnect(enabled bool) Option {
c.traceConnect = enabled
}
}

// WithPoolStats enables polling of pgxpool.Stat metrics
// ref: https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Stat
// These metrics are submitted to Datadog and are not billed as custom metrics
func WithPoolStats() Option {
return func(cfg *config) {
cfg.poolStats = true
}
}
24 changes: 24 additions & 0 deletions contrib/jackc/pgx.v5/option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package pgx

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWithPoolStats(t *testing.T) {
t.Run("default off", func(t *testing.T) {
cfg := defaultConfig()
assert.False(t, cfg.poolStats)
})
t.Run("on", func(t *testing.T) {
cfg := new(config)
WithPoolStats()(cfg)
assert.True(t, cfg.poolStats)
})
}
1 change: 1 addition & 0 deletions contrib/jackc/pgx.v5/pgx_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func newPgxTracer(opts ...Option) *pgxTracer {
for _, opt := range opts {
opt(cfg)
}
cfg.checkStatsdRequired()
return &pgxTracer{cfg: cfg}
}

Expand Down
12 changes: 10 additions & 2 deletions contrib/jackc/pgx.v5/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func NewPool(ctx context.Context, connString string, opts ...Option) (*pgxpool.P
}

func NewPoolWithConfig(ctx context.Context, config *pgxpool.Config, opts ...Option) (*pgxpool.Pool, error) {
config.ConnConfig.Tracer = newPgxTracer(opts...)
return pgxpool.NewWithConfig(ctx, config)
tracer := newPgxTracer(opts...)
config.ConnConfig.Tracer = tracer
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, err
}
if tracer.cfg.poolStats && tracer.cfg.statsdClient != nil {
go pollPoolStats(tracer.cfg.statsdClient, pool)
}
return pool, nil
}
33 changes: 33 additions & 0 deletions contrib/jackc/pgx.v5/pgxpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ package pgx
import (
"context"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -38,3 +41,33 @@ func TestPool(t *testing.T) {
assert.Len(t, mt.OpenSpans(), 0)
assert.Len(t, mt.FinishedSpans(), 7)
}

func TestPoolWithPoolStats(t *testing.T) {
originalInterval := interval
interval = 1 * time.Millisecond
t.Cleanup(func() {
interval = originalInterval
})

ctx := context.Background()
statsd := new(statsdtest.TestStatsdClient)
conn, err := NewPool(ctx, postgresDSN, withStatsdClient(statsd), WithPoolStats())
require.NoError(t, err)
defer conn.Close()

wantStats := []string{AcquireCount, AcquireDuration, AcquiredConns, CanceledAcquireCount, ConstructingConns, EmptyAcquireCount, IdleConns, MaxConns, TotalConns, NewConnsCount, MaxLifetimeDestroyCount, MaxIdleDestroyCount}

assert := assert.New(t)
if err := statsd.Wait(assert, len(wantStats), time.Second); err != nil {
t.Fatalf("statsd.Wait(): %v", err)
}
for _, name := range wantStats {
assert.Contains(statsd.CallNames(), name)
}
}

func withStatsdClient(s internal.StatsdClient) Option {
return func(c *config) {
c.statsdClient = s
}
}

0 comments on commit 6f26c4c

Please sign in to comment.