Skip to content

Commit

Permalink
core/services/pg: improve lockedDb lifecycle (#14900)
Browse files Browse the repository at this point in the history
* core/services/pg: improve lockedDb lifecycle

* core/services/pg/connection: add context to NewConnection
  • Loading branch information
jmank88 authored Oct 28, 2024
1 parent 3ad089f commit c50422a
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 73 deletions.
22 changes: 12 additions & 10 deletions core/cmd/shell_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) {
}

lggr := logger.Sugared(s.Logger.Named("RebroadcastTransactions"))
db, err := pg.OpenUnlockedDB(s.Config.AppID(), s.Config.Database())
db, err := pg.OpenUnlockedDB(ctx, s.Config.AppID(), s.Config.Database())
if err != nil {
return s.errorOut(errors.Wrap(err, "opening DB"))
}
Expand Down Expand Up @@ -962,7 +962,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error {
version = null.IntFrom(numVersion)
}

db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -977,7 +977,7 @@ func (s *Shell) RollbackDatabase(c *cli.Context) error {
// VersionDatabase displays the current database version.
func (s *Shell) VersionDatabase(_ *cli.Context) error {
ctx := s.ctx()
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -994,7 +994,7 @@ func (s *Shell) VersionDatabase(_ *cli.Context) error {
// StatusDatabase displays the database migration status
func (s *Shell) StatusDatabase(_ *cli.Context) error {
ctx := s.ctx()
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -1007,10 +1007,11 @@ func (s *Shell) StatusDatabase(_ *cli.Context) error {

// CreateMigration displays the database migration status
func (s *Shell) CreateMigration(c *cli.Context) error {
ctx := s.ctx()
if !c.Args().Present() {
return s.errorOut(errors.New("You must specify a migration name"))
}
db, err := newConnection(s.Config.Database())
db, err := newConnection(ctx, s.Config.Database())
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -1028,6 +1029,7 @@ func (s *Shell) CreateMigration(c *cli.Context) error {

// CleanupChainTables deletes database table rows based on chain type and chain id input.
func (s *Shell) CleanupChainTables(c *cli.Context) error {
ctx := s.ctx()
cfg := s.Config.Database()
parsed := cfg.URL()
if parsed.String() == "" {
Expand All @@ -1039,7 +1041,7 @@ func (s *Shell) CleanupChainTables(c *cli.Context) error {
return s.errorOut(fmt.Errorf("cannot reset database named `%s`. This command can only be run against databases with a name that ends in `_test`, to prevent accidental data loss. If you really want to delete chain specific data from this database, pass in the --danger option", dbname))
}

db, err := newConnection(cfg)
db, err := newConnection(ctx, cfg)
if err != nil {
return s.errorOut(errors.Wrap(err, "error connecting to the database"))
}
Expand Down Expand Up @@ -1091,12 +1093,12 @@ type dbConfig interface {
Dialect() dialects.DialectName
}

func newConnection(cfg dbConfig) (*sqlx.DB, error) {
func newConnection(ctx context.Context, cfg dbConfig) (*sqlx.DB, error) {
parsed := cfg.URL()
if parsed.String() == "" {
return nil, errDBURLMissing
}
return pg.NewConnection(parsed.String(), cfg.Dialect(), cfg)
return pg.NewConnection(ctx, parsed.String(), cfg.Dialect(), cfg)
}

func dropAndCreateDB(parsed url.URL, force bool) (err error) {
Expand Down Expand Up @@ -1144,7 +1146,7 @@ func dropAndCreatePristineDB(db *sqlx.DB, template string) (err error) {
}

func migrateDB(ctx context.Context, config dbConfig) error {
db, err := newConnection(config)
db, err := newConnection(ctx, config)
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand All @@ -1156,7 +1158,7 @@ func migrateDB(ctx context.Context, config dbConfig) error {
}

func downAndUpDB(ctx context.Context, cfg dbConfig, baseVersionID int64) error {
db, err := newConnection(cfg)
db, err := newConnection(ctx, cfg)
if err != nil {
return fmt.Errorf("failed to initialize orm: %v", err)
}
Expand Down
15 changes: 8 additions & 7 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
t.Helper()
testutils.SkipShortDB(t)

ctx := testutils.Context(t)

var lggr logger.Logger
for _, dep := range flagsAndDeps {
argLggr, is := dep.(logger.Logger)
Expand Down Expand Up @@ -364,7 +366,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
}

url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
db, err := pg.NewConnection(ctx, url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, db.Close()) })

Expand Down Expand Up @@ -435,38 +437,37 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
}
}

testCtx := testutils.Context(t)
// evm alway enabled for backward compatibility
initOps := []chainlink.CoreRelayerChainInitFunc{chainlink.InitDummy(testCtx, relayerFactory), chainlink.InitEVM(testCtx, relayerFactory, evmOpts)}
initOps := []chainlink.CoreRelayerChainInitFunc{chainlink.InitDummy(ctx, relayerFactory), chainlink.InitEVM(ctx, relayerFactory, evmOpts)}

if cfg.CosmosEnabled() {
cosmosCfg := chainlink.CosmosFactoryConfig{
Keystore: keyStore.Cosmos(),
TOMLConfigs: cfg.CosmosConfigs(),
DS: ds,
}
initOps = append(initOps, chainlink.InitCosmos(testCtx, relayerFactory, cosmosCfg))
initOps = append(initOps, chainlink.InitCosmos(ctx, relayerFactory, cosmosCfg))
}
if cfg.SolanaEnabled() {
solanaCfg := chainlink.SolanaFactoryConfig{
Keystore: keyStore.Solana(),
TOMLConfigs: cfg.SolanaConfigs(),
}
initOps = append(initOps, chainlink.InitSolana(testCtx, relayerFactory, solanaCfg))
initOps = append(initOps, chainlink.InitSolana(ctx, relayerFactory, solanaCfg))
}
if cfg.StarkNetEnabled() {
starkCfg := chainlink.StarkNetFactoryConfig{
Keystore: keyStore.StarkNet(),
TOMLConfigs: cfg.StarknetConfigs(),
}
initOps = append(initOps, chainlink.InitStarknet(testCtx, relayerFactory, starkCfg))
initOps = append(initOps, chainlink.InitStarknet(ctx, relayerFactory, starkCfg))
}
if cfg.AptosEnabled() {
aptosCfg := chainlink.AptosFactoryConfig{
Keystore: keyStore.Aptos(),
TOMLConfigs: cfg.AptosConfigs(),
}
initOps = append(initOps, chainlink.InitAptos(testCtx, relayerFactory, aptosCfg))
initOps = append(initOps, chainlink.InitAptos(ctx, relayerFactory, aptosCfg))
}

relayChainInterops, err := chainlink.NewCoreRelayerChainInteroperators(initOps...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAdapter_Integration(t *testing.T) {
logger := logger.TestLogger(t)
cfg := configtest.NewTestGeneralConfig(t)
url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
db, err := pg.NewConnection(ctx, url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)

keystore := keystore.NewInMemory(db, utils.FastScryptParams, logger)
Expand Down
6 changes: 3 additions & 3 deletions core/services/pg/connection.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pg

import (
"context"
"database/sql"
"errors"
"fmt"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/scylladb/go-reflectx"
"go.opentelemetry.io/otel"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/net/context"

"github.com/smartcontractkit/chainlink/v2/core/store/dialects"
)
Expand Down Expand Up @@ -51,7 +51,7 @@ type ConnectionConfig interface {
MaxIdleConns() int
}

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
func NewConnection(ctx context.Context, uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) {
opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
otelsql.WithTracerProvider(otel.GetTracerProvider()),
otelsql.WithSQLCommenter(true),
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewConnection(uri string, dialect dialects.DialectName, config ConnectionCo
if err != nil {
return nil, fmt.Errorf("failed to open txdb: %w", err)
}
_, err = sqldb.Exec(connParams)
_, err = sqldb.ExecContext(ctx, connParams)
if err != nil {
return nil, fmt.Errorf("failed to set options: %w", err)
}
Expand Down
97 changes: 52 additions & 45 deletions core/services/pg/locked_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/static"
Expand All @@ -31,6 +32,7 @@ type LockedDBConfig interface {
}

type lockedDb struct {
services.StateMachine
appID uuid.UUID
cfg LockedDBConfig
lockCfg config.Lock
Expand All @@ -53,63 +55,68 @@ func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr
// OpenUnlockedDB just opens DB connection, without any DB locks.
// This should be used carefully, when we know we don't need any locks.
// Currently this is used by RebroadcastTransactions command only.
func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
return openDB(appID, cfg)
func OpenUnlockedDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
return openDB(ctx, appID, cfg)
}

// Open function connects to DB and acquires DB locks based on configuration.
// If any of the steps fails or ctx is cancelled, it reverts everything.
// This is a blocking function and it may execute long due to DB locks acquisition.
// NOT THREAD SAFE
func (l *lockedDb) Open(ctx context.Context) (err error) {
// If Open succeeded previously, db will not be nil
if l.db != nil {
l.lggr.Panic("calling Open() twice")
}

// Step 1: open DB connection
l.db, err = openDB(l.appID, l.cfg)
if err != nil {
// l.db will be nil in case of error
return errors.Wrap(err, "failed to open db")
}
revert := func() {
// Let Open() return the actual error, while l.Close() error is just logged.
if err2 := l.Close(); err2 != nil {
l.lggr.Errorf("failed to cleanup LockedDB: %v", err2)
return l.StartOnce("LockedDB", func() error {
// Step 1: open DB connection
l.db, err = openDB(ctx, l.appID, l.cfg)
if err != nil {
// l.db will be nil in case of error
return errors.Wrap(err, "failed to open db")
}
}

// Step 2: start the stat reporter
l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
l.statsReporter.Start(ctx)

// Step 3: acquire DB locks
lockingMode := l.lockCfg.LockingMode()
l.lggr.Debugf("Using database locking mode: %s", lockingMode)

// Take the lease before any other DB operations
switch lockingMode {
case "lease":
cfg := LeaseLockConfig{
DefaultQueryTimeout: l.cfg.DefaultQueryTimeout(),
LeaseDuration: l.lockCfg.LeaseDuration(),
LeaseRefreshInterval: l.lockCfg.LeaseRefreshInterval(),
revert := func() {
// Let Open() return the actual error, while l.Close() error is just logged.
if err2 := l.close(); err2 != nil {
l.lggr.Errorf("failed to cleanup LockedDB: %v", err2)
}
}
l.leaseLock = NewLeaseLock(l.db, l.appID, l.lggr, cfg)
if err = l.leaseLock.TakeAndHold(ctx); err != nil {
defer revert()
return errors.Wrap(err, "failed to take initial lease on database")

// Step 2: start the stat reporter
l.statsReporter = NewStatsReporter(l.db.Stats, l.lggr)
l.statsReporter.Start(ctx)

// Step 3: acquire DB locks
lockingMode := l.lockCfg.LockingMode()
l.lggr.Debugf("Using database locking mode: %s", lockingMode)

// Take the lease before any other DB operations
switch lockingMode {
case "lease":
cfg := LeaseLockConfig{
DefaultQueryTimeout: l.cfg.DefaultQueryTimeout(),
LeaseDuration: l.lockCfg.LeaseDuration(),
LeaseRefreshInterval: l.lockCfg.LeaseRefreshInterval(),
}
l.leaseLock = NewLeaseLock(l.db, l.appID, l.lggr, cfg)
if err = l.leaseLock.TakeAndHold(ctx); err != nil {
defer revert()
return errors.Wrap(err, "failed to take initial lease on database")
}
}
}

return
return nil
})
}

// Close function releases DB locks (if acquired by Open) and closes DB connection.
// Closing of a closed LockedDB instance has no effect.
// NOT THREAD SAFE
func (l *lockedDb) Close() error {
err := l.StopOnce("LockedDB", func() error {
return l.close()
})
if !errors.Is(err, services.ErrAlreadyStopped) {
return err
}
return nil
}

func (l *lockedDb) close() error {
defer func() {
l.db = nil
l.leaseLock = nil
Expand All @@ -135,14 +142,14 @@ func (l *lockedDb) Close() error {
}

// DB returns DB connection if Opened successfully, or nil.
func (l lockedDb) DB() *sqlx.DB {
func (l *lockedDb) DB() *sqlx.DB {
return l.db
}

func openDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
func openDB(ctx context.Context, appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error) {
uri := cfg.URL()
static.SetConsumerName(&uri, "App", &appID)
dialect := cfg.Dialect()
db, err = NewConnection(uri.String(), dialect, cfg)
db, err = NewConnection(ctx, uri.String(), dialect, cfg)
return
}
9 changes: 4 additions & 5 deletions core/services/pg/locked_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ func TestLockedDB_OpenTwice(t *testing.T) {

err := ldb.Open(testutils.Context(t))
require.NoError(t, err)
require.Panics(t, func() {
_ = ldb.Open(testutils.Context(t))
})
require.Error(t, ldb.Open(testutils.Context(t)))

_ = ldb.Close()
}
Expand Down Expand Up @@ -88,14 +86,15 @@ func TestLockedDB_TwoInstances(t *testing.T) {

func TestOpenUnlockedDB(t *testing.T) {
testutils.SkipShortDB(t)
ctx := testutils.Context(t)
config := configtest.NewGeneralConfig(t, nil)

db1, err1 := pg.OpenUnlockedDB(config.AppID(), config.Database())
db1, err1 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database())
require.NoError(t, err1)
require.NotNil(t, db1)

// should not block the second connection
db2, err2 := pg.OpenUnlockedDB(config.AppID(), config.Database())
db2, err2 := pg.OpenUnlockedDB(ctx, config.AppID(), config.Database())
require.NoError(t, err2)
require.NotNil(t, db2)

Expand Down
3 changes: 2 additions & 1 deletion core/utils/testutils/heavyweight/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/jmoiron/sqlx"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c Kind) PrepareDB(t testing.TB, overrideFn func(c *chainlink.Config, s *ch
require.NoError(t, os.MkdirAll(gcfg.RootDir(), 0700))
migrationTestDBURL, err := testdb.CreateOrReplace(gcfg.Database().URL(), generateName(), c != KindEmpty)
require.NoError(t, err)
db, err := pg.NewConnection(migrationTestDBURL, dialects.Postgres, gcfg.Database())
db, err := pg.NewConnection(tests.Context(t), migrationTestDBURL, dialects.Postgres, gcfg.Database())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close()) // must close before dropping
Expand Down
Loading

0 comments on commit c50422a

Please sign in to comment.