Skip to content

Commit

Permalink
pkg/sqlutil/pg: create package; expand env config
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 17, 2024
1 parent ec6d48a commit ea07ac6
Show file tree
Hide file tree
Showing 9 changed files with 671 additions and 45 deletions.
29 changes: 19 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-common
go 1.21

require (
github.com/XSAM/otelsql v0.29.0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/ethereum/go-ethereum v1.13.8
github.com/fxamacker/cbor/v2 v2.5.0
Expand All @@ -12,25 +13,27 @@ require (
github.com/hashicorp/consul/sdk v0.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.6.0
github.com/jackc/pgx/v4 v4.18.3
github.com/jmoiron/sqlx v1.3.5
github.com/jonboulle/clockwork v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.2.0
github.com/lib/pq v1.10.2
github.com/linkedin/goavro/v2 v2.12.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
github.com/pelletier/go-toml/v2 v2.1.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/riferrei/srclient v0.5.4
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.18.0
go.opentelemetry.io/otel/sdk v1.19.0
go.opentelemetry.io/otel/trace v1.19.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.2.1
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
Expand All @@ -46,15 +49,21 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand All @@ -69,13 +78,13 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
Expand Down
157 changes: 140 additions & 17 deletions go.sum

Large diffs are not rendered by default.

78 changes: 74 additions & 4 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ import (
"os"
"strconv"
"strings"
"time"
)

const (
envDatabaseURL = "CL_DATABASE_URL"
envDatabaseURL = "CL_DATABASE_URL"
envDatabaseDefaultIdleInTxSessionTimeout = "CL_DATABASE_DEFAULT_IDLE_IN_TX_SESSION_TIMEOUT"
envDatabaseDefaultLockTimeout = "CL_DATABASE_DEFAULT_LOCK_TIMEOUT"
envDatabaseDefaultQueryTimeout = "CL_DATABASE_DEFAULT_QUERY_TIMEOUT"
envDatabaseLogSQL = "CL_DATABASE_LOG_SQL"
envDatabaseMaxOpenConns = "CL_DATABASE_MAX_OPEN_CONNS"
envDatabaseMaxIdleConns = "CL_DATABASE_MAX_IDLE_CONNS"

envPromPort = "CL_PROMETHEUS_PORT"
envTracingEnabled = "CL_TRACING_ENABLED"
envTracingCollectorTarget = "CL_TRACING_COLLECTOR_TARGET"
Expand All @@ -21,7 +29,13 @@ const (
// EnvConfig is the configuration between the application and the LOOP executable. The values
// are fully resolved and static and passed via the environment.
type EnvConfig struct {
DatabaseURL *url.URL
DatabaseURL *url.URL
DatabaseDefaultIdleInTxSessionTimeout time.Duration
DatabaseDefaultLockTimeout time.Duration
DatabaseDefaultQueryTimeout time.Duration
DatabaseLogSQL bool
DatabaseMaxOpenConns int
DatabaseMaxIdleConns int

PrometheusPort int

Expand All @@ -45,6 +59,12 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
// DatabaseURL is optional
if e.DatabaseURL != nil {
injectEnv[envDatabaseURL] = e.DatabaseURL.String()
injectEnv[envDatabaseDefaultIdleInTxSessionTimeout] = e.DatabaseDefaultIdleInTxSessionTimeout.String()
injectEnv[envDatabaseDefaultLockTimeout] = e.DatabaseDefaultLockTimeout.String()
injectEnv[envDatabaseDefaultQueryTimeout] = e.DatabaseDefaultQueryTimeout.String()
injectEnv[envDatabaseLogSQL] = strconv.FormatBool(e.DatabaseLogSQL)
injectEnv[envDatabaseMaxOpenConns] = strconv.Itoa(e.DatabaseMaxOpenConns)
injectEnv[envDatabaseMaxIdleConns] = strconv.Itoa(e.DatabaseMaxIdleConns)
}

for k, v := range e.TracingAttributes {
Expand All @@ -59,13 +79,39 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {

// parse deserializes environment variables
func (e *EnvConfig) parse() error {
promPortStr := os.Getenv(envPromPort)
var err error
e.DatabaseURL, err = getDatabaseURL()
if err != nil {
return fmt.Errorf("failed to parse %s: %q", envDatabaseURL, err)
return err
}
if e.DatabaseURL != nil {
e.DatabaseDefaultIdleInTxSessionTimeout, err = getDatabaseIdleInTxSessionTimeout()
if err != nil {
return err
}
e.DatabaseDefaultLockTimeout, err = getDatabaseDefaultLockTimeout()
if err != nil {
return err
}
e.DatabaseDefaultQueryTimeout, err = getDatabaseDefaultQueryTimeout()
if err != nil {
return err
}
e.DatabaseLogSQL, err = getDatabaseLogSQL()
if err != nil {
return err
}
e.DatabaseMaxOpenConns, err = getDatabaseMaxOpenConns()
if err != nil {
return err
}
e.DatabaseMaxIdleConns, err = getDatabaseMaxIdleConns()
if err != nil {
return err
}
}

promPortStr := os.Getenv(envPromPort)
e.PrometheusPort, err = strconv.Atoi(promPortStr)
if err != nil {
return fmt.Errorf("failed to parse %s = %q: %w", envPromPort, promPortStr, err)
Expand Down Expand Up @@ -150,3 +196,27 @@ func getDatabaseURL() (*url.URL, error) {
}
return u, nil
}

func getDatabaseIdleInTxSessionTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultIdleInTxSessionTimeout))
}

func getDatabaseDefaultLockTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultLockTimeout))
}

func getDatabaseDefaultQueryTimeout() (time.Duration, error) {
return time.ParseDuration(os.Getenv(envDatabaseDefaultQueryTimeout))
}

func getDatabaseLogSQL() (bool, error) {
return strconv.ParseBool(os.Getenv(envDatabaseLogSQL))
}

func getDatabaseMaxOpenConns() (int, error) {
return strconv.Atoi(os.Getenv(envDatabaseMaxOpenConns))
}

func getDatabaseMaxIdleConns() (int, error) {
return strconv.Atoi(os.Getenv(envDatabaseMaxIdleConns))
}
34 changes: 24 additions & 10 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,41 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestEnvConfig_parse(t *testing.T) {
cases := []struct {
name string
envVars map[string]string
expectError bool
expectedDatabaseURL string
expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string
name string
envVars map[string]string
expectError bool
expectedDatabaseURL string
envDatabaseDefaultIdleInTxSessionTimeout time.Duration
envDatabaseDefaultLockTimeout time.Duration
envDatabaseDefaultQueryTimeout time.Duration
envDatabaseLogSQL bool
envDatabaseMaxOpenConns int
envDatabaseMaxIdleConns int
expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string
}{
{
name: "All variables set correctly",
envVars: map[string]string{
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envDatabaseDefaultIdleInTxSessionTimeout: "42s",
envDatabaseDefaultLockTimeout: "8m",
envDatabaseDefaultQueryTimeout: "7s",
envDatabaseLogSQL: "true",
envDatabaseMaxOpenConns: "9999",
envDatabaseMaxIdleConns: "8080",

envPromPort: "8080",
envTracingEnabled: "true",
envTracingCollectorTarget: "some:target",
Expand Down
43 changes: 39 additions & 4 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package loop
import (
"fmt"
"os"
"time"

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
)

// NewStartedServer returns a started Server.
Expand Down Expand Up @@ -41,10 +46,13 @@ func MustNewStartedServer(loggerName string) *Server {

// Server holds common plugin server fields.
type Server struct {
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
promServer *PromServer
checker *services.HealthChecker
GRPCOpts GRPCOpts
Logger logger.SugaredLogger
db *sqlx.DB
dbStatsReporter *pg.StatsReporter
DataSource sqlutil.DataSource
promServer *PromServer
checker *services.HealthChecker
}

func newServer(loggerName string) (*Server, error) {
Expand Down Expand Up @@ -90,6 +98,27 @@ func (s *Server) start() error {
return fmt.Errorf("error starting health checker: %w", err)
}

if envCfg.DatabaseURL != nil {
//TODO set application_name on url?
dbURL := envCfg.DatabaseURL.String()
var err error
s.db, err = pg.ConnectionConfig{
DefaultIdleInTxSessionTimeout: envCfg.DatabaseDefaultIdleInTxSessionTimeout,
DefaultLockTimeout: envCfg.DatabaseDefaultLockTimeout,
MaxOpenConns: envCfg.DatabaseMaxOpenConns,
MaxIdleConns: envCfg.DatabaseMaxIdleConns,
}.NewDB(dbURL, pg.DialectPostgres)
if err != nil {
return fmt.Errorf("error connecting to DataBase at %s: %w", dbURL, err)
}
s.DataSource = sqlutil.WrapDataSource(s.db, s.Logger,
sqlutil.TimeoutHook(func() time.Duration { return envCfg.DatabaseDefaultQueryTimeout }),
sqlutil.MonitorHook(func() bool { return envCfg.DatabaseLogSQL }))

s.dbStatsReporter = pg.NewStatsReporter(s.db.Stats, s.Logger)
s.dbStatsReporter.Start()
}

return nil
}

Expand All @@ -104,6 +133,12 @@ func (s *Server) Register(c services.HealthReporter) error { return s.checker.Re

// Stop closes resources and flushes logs.
func (s *Server) Stop() {
if s.dbStatsReporter != nil {
s.dbStatsReporter.Stop()
}
if s.db != nil {
s.Logger.ErrorIfFn(s.db.Close, "Failed to close database connection")
}
s.Logger.ErrorIfFn(s.checker.Close, "Failed to close health checker")
s.Logger.ErrorIfFn(s.promServer.Close, "Failed to close prometheus server")
if err := s.Logger.Sync(); err != nil {
Expand Down
Loading

0 comments on commit ea07ac6

Please sign in to comment.