Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib/jackc/pgx.v5: publish pgxpool stats to statsd client #2692

Merged
merged 10 commits into from
Jul 10, 2024
63 changes: 63 additions & 0 deletions contrib/jackc/pgx.v5/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package pgx

import (
"time"

"github.com/jackc/pgx/v5/pgxpool"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)

const tracerPrefix = "datadog.tracer."

const (
AcquireCount = tracerPrefix + "pgx.pool.connections.acquire"
AcquireDuration = tracerPrefix + "pgx.pool.connections.acquire_duration"
AcquiredConns = tracerPrefix + "pgx.pool.connections.acquired_conns"
CanceledAcquireCount = tracerPrefix + "pgx.pool.connections.canceled_acquire"
ConstructingConns = tracerPrefix + "pgx.pool.connections.constructing_conns"
EmptyAcquireCount = tracerPrefix + "pgx.pool.connections.empty_acquire"
IdleConns = tracerPrefix + "pgx.pool.connections.idle_conns"
MaxConns = tracerPrefix + "pgx.pool.connections.max_conns"
TotalConns = tracerPrefix + "pgx.pool.connections.total_conns"
NewConnsCount = tracerPrefix + "pgx.pool.connections.new_conns"
MaxLifetimeDestroyCount = tracerPrefix + "pgx.pool.connections.max_lifetime_destroy"
MaxIdleDestroyCount = tracerPrefix + "pgx.pool.connections.max_idle_destroy"
)

var interval = 10 * time.Second

// pollPoolStats calls (*pgxpool).Stats on the pool at a predetermined interval. It pushes the pool Stats off to the statsd client.
func pollPoolStats(statsd internal.StatsdClient, pool *pgxpool.Pool) {
log.Debug("contrib/jackc/pgx.v5: Traced pool connection found: Pool stats will be gathered and sent every %v.", interval)
for range time.NewTicker(interval).C {
log.Debug("contrib/jackc/pgx.v5: Reporting pgxpool.Stat metrics...")
stat := pool.Stat()
statsd.Gauge(AcquireCount, float64(stat.AcquireCount()), []string{}, 1)
statsd.Timing(AcquireDuration, stat.AcquireDuration(), []string{}, 1)
statsd.Gauge(AcquiredConns, float64(stat.AcquiredConns()), []string{}, 1)
statsd.Gauge(CanceledAcquireCount, float64(stat.CanceledAcquireCount()), []string{}, 1)
statsd.Gauge(ConstructingConns, float64(stat.ConstructingConns()), []string{}, 1)
statsd.Gauge(EmptyAcquireCount, float64(stat.EmptyAcquireCount()), []string{}, 1)
statsd.Gauge(IdleConns, float64(stat.IdleConns()), []string{}, 1)
statsd.Gauge(MaxConns, float64(stat.MaxConns()), []string{}, 1)
statsd.Gauge(TotalConns, float64(stat.TotalConns()), []string{}, 1)
statsd.Gauge(NewConnsCount, float64(stat.NewConnsCount()), []string{}, 1)
statsd.Gauge(MaxLifetimeDestroyCount, float64(stat.MaxLifetimeDestroyCount()), []string{}, 1)
statsd.Gauge(MaxIdleDestroyCount, float64(stat.MaxIdleDestroyCount()), []string{}, 1)
}
}

func statsTags(c *config) []string {
tags := globalconfig.StatsTags()
if c.serviceName != "" {
tags = append(tags, "service:"+c.serviceName)
}
return tags
}
33 changes: 32 additions & 1 deletion contrib/jackc/pgx.v5/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

package pgx

import "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
import (
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
serviceName string
Expand All @@ -15,6 +20,8 @@ type config struct {
tracePrepare bool
traceConnect bool
traceAcquire bool
poolStats bool
statsdClient internal.StatsdClient
}

func defaultConfig() *config {
Expand All @@ -29,6 +36,21 @@ func defaultConfig() *config {
}
}

// checkStatsdRequired adds a statsdClient onto the config if poolStats is enabled
// NOTE: For now, the only use-case for a statsdclient is the poolStats feature. If a statsdclient becomes necessary for other items in future work, then this logic should change
func (c *config) checkStatsdRequired() {
if c.poolStats && c.statsdClient == nil {
// contrib/jackc/pgx's statsdclient should always inherit its address from the tracer's statsdclient via the globalconfig
// destination is not user-configurable
sc, err := internal.NewStatsdClient(globalconfig.DogstatsdAddr(), statsTags(c))
if err == nil {
c.statsdClient = sc
} else {
log.Warn("contrib/jackc/pgx.v5: Error creating statsd client; Pool stats will be dropped: %v", err)
}
}
}

type Option func(*config)

// WithServiceName sets the service name to use for all spans.
Expand Down Expand Up @@ -90,3 +112,12 @@ func WithTraceConnect(enabled bool) Option {
c.traceConnect = enabled
}
}

// WithPoolStats enables polling of pgxpool.Stat metrics
// ref: https://pkg.go.dev/github.com/jackc/pgx/v5/pgxpool#Stat
// These metrics are submitted to Datadog and are not billed as custom metrics
func WithPoolStats() Option {
return func(cfg *config) {
cfg.poolStats = true
}
}
24 changes: 24 additions & 0 deletions contrib/jackc/pgx.v5/option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package pgx

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWithPoolStats(t *testing.T) {
t.Run("default off", func(t *testing.T) {
cfg := defaultConfig()
assert.False(t, cfg.poolStats)
})
t.Run("on", func(t *testing.T) {
cfg := new(config)
WithPoolStats()(cfg)
assert.True(t, cfg.poolStats)
})
}
1 change: 1 addition & 0 deletions contrib/jackc/pgx.v5/pgx_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func newPgxTracer(opts ...Option) *pgxTracer {
for _, opt := range opts {
opt(cfg)
}
cfg.checkStatsdRequired()
return &pgxTracer{cfg: cfg}
}

Expand Down
12 changes: 10 additions & 2 deletions contrib/jackc/pgx.v5/pgxpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ func NewPool(ctx context.Context, connString string, opts ...Option) (*pgxpool.P
}

func NewPoolWithConfig(ctx context.Context, config *pgxpool.Config, opts ...Option) (*pgxpool.Pool, error) {
config.ConnConfig.Tracer = newPgxTracer(opts...)
return pgxpool.NewWithConfig(ctx, config)
tracer := newPgxTracer(opts...)
config.ConnConfig.Tracer = tracer
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, err
}
if tracer.cfg.poolStats && tracer.cfg.statsdClient != nil {
go pollPoolStats(tracer.cfg.statsdClient, pool)
}
return pool, nil
}
33 changes: 33 additions & 0 deletions contrib/jackc/pgx.v5/pgxpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ package pgx
import (
"context"
"testing"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/statsdtest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -38,3 +41,33 @@ func TestPool(t *testing.T) {
assert.Len(t, mt.OpenSpans(), 0)
assert.Len(t, mt.FinishedSpans(), 7)
}

func TestPoolWithPoolStats(t *testing.T) {
originalInterval := interval
interval = 1 * time.Millisecond
t.Cleanup(func() {
interval = originalInterval
})

ctx := context.Background()
statsd := new(statsdtest.TestStatsdClient)
conn, err := NewPool(ctx, postgresDSN, withStatsdClient(statsd), WithPoolStats())
require.NoError(t, err)
defer conn.Close()

wantStats := []string{AcquireCount, AcquireDuration, AcquiredConns, CanceledAcquireCount, ConstructingConns, EmptyAcquireCount, IdleConns, MaxConns, TotalConns, NewConnsCount, MaxLifetimeDestroyCount, MaxIdleDestroyCount}

assert := assert.New(t)
if err := statsd.Wait(assert, len(wantStats), time.Second); err != nil {
t.Fatalf("statsd.Wait(): %v", err)
}
for _, name := range wantStats {
assert.Contains(statsd.CallNames(), name)
}
}

func withStatsdClient(s internal.StatsdClient) Option {
return func(c *config) {
c.statsdClient = s
}
}
Loading