Skip to content

Commit

Permalink
pkg/sqlutil/pg: create package; expand env config; add example relay
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Dec 17, 2024
1 parent b403079 commit 28130f7
Show file tree
Hide file tree
Showing 21 changed files with 841 additions and 104 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-common
go 1.23.3

require (
github.com/XSAM/otelsql v0.29.0
github.com/andybalholm/brotli v1.1.1
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c
github.com/bytecodealliance/wasmtime-go/v23 v23.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc=
github.com/XSAM/otelsql v0.29.0 h1:pEw9YXXs8ZrGRYfDc0cmArIz9lci5b42gmP5+tA1Huc=
github.com/XSAM/otelsql v0.29.0/go.mod h1:d3/0xGIGC5RVEE+Ld7KotwaLy6zDeaF3fLJHOPpdN2w=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/apache/arrow-go/v18 v18.0.0 h1:1dBDaSbH3LtulTyOVYaBCHO3yVRwjV+TZaqn3g6V7ZM=
Expand Down
35 changes: 35 additions & 0 deletions pkg/config/build/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package build

import (
"cmp"
"os"
"runtime/debug"
)

// Unset is a sentinel value.
const Unset = "unset"

// Version and Checksum are set at compile time via build arguments.
var (
// Program is updated to the full main program path if [debug.BuildInfo] is available.
Program = os.Args[0]
// Version is the semantic version of the build or Unset.
Version = Unset
// Checksum is the commit hash of the build or Unset.
Checksum = Unset
ChecksumPrefix = Unset
)

func init() {
buildInfo, ok := debug.ReadBuildInfo()
if ok {
Program = cmp.Or(buildInfo.Main.Path, Program)
if Version == Unset && buildInfo.Main.Version != "" {
Version = buildInfo.Main.Version
}
if Checksum == Unset && buildInfo.Main.Sum != "" {
Checksum = buildInfo.Main.Sum
}
}
ChecksumPrefix = Checksum[:min(7, len(Checksum))]
}
12 changes: 11 additions & 1 deletion pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package logger

import (
"io"
"fmt"
"reflect"
"testing"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"

"github.com/smartcontractkit/chainlink-common/pkg/config/build"
)

// Logger is a minimal subset of smartcontractkit/chainlink/core/logger.Logger implemented by go.uber.org/zap.SugaredLogger
Expand Down Expand Up @@ -52,9 +55,16 @@ func New() (Logger, error) { return defaultConfig.New() }
func (c *Config) New() (Logger, error) {
return NewWith(func(cfg *zap.Config) {
cfg.Level.SetLevel(c.Level)
cfg.InitialFields = map[string]interface{}{
"version": buildVersion(),
}
})
}

func buildVersion() string {
return fmt.Sprintf("%s@%s", build.Version, build.ChecksumPrefix)
}

// NewWith returns a new Logger from a modified [zap.Config].
func NewWith(cfgFn func(*zap.Config)) (Logger, error) {
cfg := zap.NewProductionConfig()
Expand Down Expand Up @@ -83,7 +93,7 @@ func Test(tb testing.TB) Logger {
zapcore.DebugLevel,
),
)
return &logger{lggr.Sugar()}
return &logger{lggr.With(zap.String("version", buildVersion())).Sugar()}
}

// TestSugared returns a new test SugaredLogger.
Expand Down
78 changes: 62 additions & 16 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ import (
)

const (
envDatabaseURL = "CL_DATABASE_URL"
envPromPort = "CL_PROMETHEUS_PORT"
envDatabaseURL = "CL_DATABASE_URL"
envDatabaseIdleInTxSessionTimeout = "CL_DATABASE_IDLE_IN_TX_SESSION_TIMEOUT"
envDatabaseLockTimeout = "CL_DATABASE_LOCK_TIMEOUT"
envDatabaseQueryTimeout = "CL_DATABASE_QUERY_TIMEOUT"
envDatabaseLogSQL = "CL_DATABASE_LOG_SQL"
envDatabaseMaxOpenConns = "CL_DATABASE_MAX_OPEN_CONNS"
envDatabaseMaxIdleConns = "CL_DATABASE_MAX_IDLE_CONNS"

envPromPort = "CL_PROMETHEUS_PORT"

envTracingEnabled = "CL_TRACING_ENABLED"
envTracingCollectorTarget = "CL_TRACING_COLLECTOR_TARGET"
Expand All @@ -36,7 +43,13 @@ const (
// EnvConfig is the configuration between the application and the LOOP executable. The values
// are fully resolved and static and passed via the environment.
type EnvConfig struct {
DatabaseURL *url.URL
DatabaseURL *url.URL
DatabaseIdleInTxSessionTimeout time.Duration
DatabaseLockTimeout time.Duration
DatabaseQueryTimeout time.Duration
DatabaseLogSQL bool
DatabaseMaxOpenConns int
DatabaseMaxIdleConns int

PrometheusPort int

Expand Down Expand Up @@ -66,7 +79,14 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {

if e.DatabaseURL != nil { // optional
add(envDatabaseURL, e.DatabaseURL.String())
add(envDatabaseIdleInTxSessionTimeout, e.DatabaseIdleInTxSessionTimeout.String())
add(envDatabaseLockTimeout, e.DatabaseLockTimeout.String())
add(envDatabaseQueryTimeout, e.DatabaseQueryTimeout.String())
add(envDatabaseLogSQL, strconv.FormatBool(e.DatabaseLogSQL))
add(envDatabaseMaxOpenConns, strconv.Itoa(e.DatabaseMaxOpenConns))
add(envDatabaseMaxIdleConns, strconv.Itoa(e.DatabaseMaxIdleConns))
}

add(envPromPort, strconv.Itoa(e.PrometheusPort))

add(envTracingEnabled, strconv.FormatBool(e.TracingEnabled))
Expand Down Expand Up @@ -99,13 +119,44 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {

// parse deserializes environment variables
func (e *EnvConfig) parse() error {
promPortStr := os.Getenv(envPromPort)
var err error
e.DatabaseURL, err = getDatabaseURL()
e.DatabaseURL, err = getEnv(envDatabaseURL, func(s string) (*url.URL, error) {
if s == "" { // DatabaseURL is optional
return nil, nil
}
return url.Parse(s)
})
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envDatabaseURL, err)
return err
}
if e.DatabaseURL != nil {
e.DatabaseIdleInTxSessionTimeout, err = getEnv(envDatabaseIdleInTxSessionTimeout, time.ParseDuration)
if err != nil {
return err
}
e.DatabaseLockTimeout, err = getEnv(envDatabaseLockTimeout, time.ParseDuration)
if err != nil {
return err
}
e.DatabaseQueryTimeout, err = getEnv(envDatabaseQueryTimeout, time.ParseDuration)
if err != nil {
return err
}
e.DatabaseLogSQL, err = getEnv(envDatabaseLogSQL, strconv.ParseBool)
if err != nil {
return err
}
e.DatabaseMaxOpenConns, err = getEnv(envDatabaseMaxOpenConns, strconv.Atoi)
if err != nil {
return err
}
e.DatabaseMaxIdleConns, err = getEnv(envDatabaseMaxIdleConns, strconv.Atoi)
if err != nil {
return err
}
}

promPortStr := os.Getenv(envPromPort)
e.PrometheusPort, err = strconv.Atoi(promPortStr)
if err != nil {
return fmt.Errorf("failed to parse %s = %q: %w", envPromPort, promPortStr, err)
Expand Down Expand Up @@ -211,16 +262,11 @@ func getFloat64OrZero(envKey string) float64 {
return f
}

// getDatabaseURL parses the CL_DATABASE_URL environment variable.
func getDatabaseURL() (*url.URL, error) {
databaseURL := os.Getenv(envDatabaseURL)
if databaseURL == "" {
// DatabaseURL is optional
return nil, nil
}
u, err := url.Parse(databaseURL)
func getEnv[T any](key string, parse func(string) (T, error)) (t T, err error) {
v := os.Getenv(key)
t, err = parse(v)
if err != nil {
return nil, fmt.Errorf("invalid %s: %w", envDatabaseURL, err)
err = fmt.Errorf("failed to parse %s=%s: %w", key, v, err)
}
return u, nil
return
}
73 changes: 59 additions & 14 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,24 @@ import (

func TestEnvConfig_parse(t *testing.T) {
cases := []struct {
name string
envVars map[string]string
expectError bool
name string
envVars map[string]string
expectError bool

expectedDatabaseURL string
expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string
expectedDatabaseIdleInTxSessionTimeout time.Duration
expectedDatabaseLockTimeout time.Duration
expectedDatabaseQueryTimeout time.Duration
expectedDatabaseLogSQL bool
expectedDatabaseMaxOpenConns int
expectedDatabaseMaxIdleConns int

expectedPrometheusPort int
expectedTracingEnabled bool
expectedTracingCollectorTarget string
expectedTracingSamplingRatio float64
expectedTracingTLSCertPath string

expectedTelemetryEnabled bool
expectedTelemetryEndpoint string
expectedTelemetryInsecureConn bool
Expand All @@ -43,12 +52,20 @@ func TestEnvConfig_parse(t *testing.T) {
name: "All variables set correctly",
envVars: map[string]string{
envDatabaseURL: "postgres://user:password@localhost:5432/db",
envDatabaseIdleInTxSessionTimeout: "42s",
envDatabaseLockTimeout: "8m",
envDatabaseQueryTimeout: "7s",
envDatabaseLogSQL: "true",
envDatabaseMaxOpenConns: "9999",
envDatabaseMaxIdleConns: "8080",

envPromPort: "8080",
envTracingEnabled: "true",
envTracingCollectorTarget: "some:target",
envTracingSamplingRatio: "1.0",
envTracingTLSCertPath: "internal/test/fixtures/client.pem",
envTracingAttribute + "XYZ": "value",

envTelemetryEnabled: "true",
envTelemetryEndpoint: "example.com/beholder",
envTelemetryInsecureConn: "true",
Expand All @@ -61,13 +78,22 @@ func TestEnvConfig_parse(t *testing.T) {
envTelemetryEmitterBatchProcessor: "true",
envTelemetryEmitterExportTimeout: "1s",
},
expectError: false,
expectError: false,

expectedDatabaseURL: "postgres://user:password@localhost:5432/db",
expectedPrometheusPort: 8080,
expectedTracingEnabled: true,
expectedTracingCollectorTarget: "some:target",
expectedTracingSamplingRatio: 1.0,
expectedTracingTLSCertPath: "internal/test/fixtures/client.pem",
expectedDatabaseIdleInTxSessionTimeout: 42 * time.Second,
expectedDatabaseLockTimeout: 8 * time.Minute,
expectedDatabaseQueryTimeout: 7 * time.Second,
expectedDatabaseLogSQL: true,
expectedDatabaseMaxOpenConns: 9999,
expectedDatabaseMaxIdleConns: 8080,

expectedPrometheusPort: 8080,
expectedTracingEnabled: true,
expectedTracingCollectorTarget: "some:target",
expectedTracingSamplingRatio: 1.0,
expectedTracingTLSCertPath: "internal/test/fixtures/client.pem",

expectedTelemetryEnabled: true,
expectedTelemetryEndpoint: "example.com/beholder",
expectedTelemetryInsecureConn: true,
Expand Down Expand Up @@ -123,6 +149,25 @@ func TestEnvConfig_parse(t *testing.T) {
if config.DatabaseURL.String() != tc.expectedDatabaseURL {
t.Errorf("Expected Database URL %s, got %s", tc.expectedDatabaseURL, config.DatabaseURL)
}
if config.DatabaseIdleInTxSessionTimeout != tc.expectedDatabaseIdleInTxSessionTimeout {
t.Errorf("Expected Database idle in tx session timeout %s, got %s", tc.expectedDatabaseIdleInTxSessionTimeout, config.DatabaseIdleInTxSessionTimeout)
}
if config.DatabaseLockTimeout != tc.expectedDatabaseLockTimeout {
t.Errorf("Expected Database lock timeout %s, got %s", tc.expectedDatabaseLockTimeout, config.DatabaseLockTimeout)
}
if config.DatabaseQueryTimeout != tc.expectedDatabaseQueryTimeout {
t.Errorf("Expected Database query timeout %s, got %s", tc.expectedDatabaseQueryTimeout, config.DatabaseQueryTimeout)
}
if config.DatabaseLogSQL != tc.expectedDatabaseLogSQL {
t.Errorf("Expected Database log sql %t, got %t", tc.expectedDatabaseLogSQL, config.DatabaseLogSQL)
}
if config.DatabaseMaxOpenConns != tc.expectedDatabaseMaxOpenConns {
t.Errorf("Expected Database max open conns %d, got %d", tc.expectedDatabaseMaxOpenConns, config.DatabaseMaxOpenConns)
}
if config.DatabaseMaxIdleConns != tc.expectedDatabaseMaxIdleConns {
t.Errorf("Expected Database max idle conns %d, got %d", tc.expectedDatabaseMaxIdleConns, config.DatabaseMaxIdleConns)
}

if config.PrometheusPort != tc.expectedPrometheusPort {
t.Errorf("Expected Prometheus port %d, got %d", tc.expectedPrometheusPort, config.PrometheusPort)
}
Expand Down
Loading

0 comments on commit 28130f7

Please sign in to comment.