riverpgxv5: hijack raw listener conn to assume control #661

Merged · 1 commit · Oct 30, 2024
CHANGELOG.md: 4 additions & 0 deletions

@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

 ## [Unreleased]
 
+### Fixed
+
+- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).
+
 ## [0.13.0] - 2024-10-07
 
 ⚠️ Version 0.13.0 removes the original advisory lock based unique jobs implementation that was deprecated in v0.12.0. See details in the note below or the v0.12.0 release notes.
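For context, here is a minimal, self-contained sketch (not part of this PR) of the pattern the fix relies on: `pgxpool.Conn.Hijack()` detaches the underlying `*pgx.Conn` from the pool, so pool settings such as `MaxConnLifetime` can no longer close it out from under a long-lived listener. The helper name and connection string below are illustrative.

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// acquireListenerConn is a hypothetical helper illustrating the pattern:
// acquire from the pool, then immediately hijack so that the pool's
// lifecycle management (MaxConnLifetime, idle reaping) no longer applies.
func acquireListenerConn(ctx context.Context, pool *pgxpool.Pool) (*pgx.Conn, error) {
	poolConn, err := pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	// Hijack transfers ownership of the underlying *pgx.Conn to the caller;
	// the pool forgets about the conn entirely and will never close it.
	return poolConn.Hijack(), nil
}

func main() {
	ctx := context.Background()

	// The connection string is a placeholder.
	pool, err := pgxpool.New(ctx, "postgres://localhost:5432/river_test")
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	conn, err := acquireListenerConn(ctx, pool)
	if err != nil {
		log.Fatal(err)
	}
	// After Hijack there is no Release; closing the conn is now the
	// caller's responsibility.
	defer conn.Close(ctx)
}
```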
riverdriver/riverpgxv5/river_pgx_v5_driver.go: 10 additions & 8 deletions

@@ -735,7 +735,7 @@ func (t *ExecutorTx) Rollback(ctx context.Context) error {
 }
 
 type Listener struct {
-	conn   *pgxpool.Conn
+	conn   *pgx.Conn
 	dbPool *pgxpool.Pool
 	prefix string
 	mu     sync.Mutex
@@ -753,11 +753,10 @@ func (l *Listener) Close(ctx context.Context) error {
 	// connection back into rotation, but in case a Listen was invoked without a
 	// subsequent Unlisten on the same topic, close the connection explicitly to
 	// guarantee no other caller will receive a partially tainted connection.
-	err := l.conn.Conn().Close(ctx)
+	err := l.conn.Close(ctx)
 
 	// Even in the event of an error, make sure conn is set back to nil so that
 	// the listener can be reused.
-	l.conn.Release()
 	l.conn = nil
 
 	return err
@@ -771,19 +770,22 @@ func (l *Listener) Connect(ctx context.Context) error {
 		return errors.New("connection already established")
 	}
 
-	conn, err := l.dbPool.Acquire(ctx)
+	poolConn, err := l.dbPool.Acquire(ctx)
 	if err != nil {
 		return err
 	}
 
 	var schema string
-	if err := conn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil {
-		conn.Release()
+	if err := poolConn.QueryRow(ctx, "SELECT current_schema();").Scan(&schema); err != nil {
+		poolConn.Release()
 		return err
 	}
 
 	l.prefix = schema + "."
-	l.conn = conn
+	// Assume full ownership of the conn so that it doesn't get released back to
+	// the pool or auto-closed by the pool.
+	l.conn = poolConn.Hijack()
+
 	return nil
 }
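Note the ordering in `Connect`: the `current_schema()` query runs while the conn is still pool-managed, so the error path can still `Release()` it back, and `Hijack()` happens only once the conn is known good. Since the pool's own health checks no longer cover a hijacked conn, the owner has to verify liveness itself. A hedged sketch of one way to do that (River's actual health handling may differ; the interval is arbitrary):

```go
package listenerhealth

import (
	"context"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

// pingLoop is a hypothetical liveness check for a hijacked conn. Conn.Ping
// issues a trivial round trip to the server. Note that *pgx.Conn is not safe
// for concurrent use, so a real listener must serialize calls like this with
// WaitForNotification on the same conn (e.g. via a mutex, as the Listener
// struct's mu field suggests).
func pingLoop(ctx context.Context, conn *pgx.Conn) error {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := conn.Ping(ctx); err != nil {
				return fmt.Errorf("listener conn unhealthy: %w", err)
			}
		}
	}
}
```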

@@ -814,7 +816,7 @@ func (l *Listener) WaitForNotification(ctx context.Context) (*riverdriver.Notification, error) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 
-	notification, err := l.conn.Conn().WaitForNotification(ctx)
+	notification, err := l.conn.WaitForNotification(ctx)
 	if err != nil {
 		return nil, err
 	}
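After the change, `WaitForNotification` is called directly on the raw `*pgx.Conn` rather than through the pool wrapper's `Conn()` accessor. A minimal sketch of how such a hijacked conn is typically driven (the channel name is illustrative, and real code would reconnect on error):

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v5"
)

// listen blocks on a hijacked conn. Because the conn is no longer
// pool-managed, it can sit in WaitForNotification indefinitely without the
// pool reaping it at MaxConnLifetime.
func listen(ctx context.Context, conn *pgx.Conn) error {
	if _, err := conn.Exec(ctx, "LISTEN river_notifications"); err != nil {
		return err
	}
	for {
		notification, err := conn.WaitForNotification(ctx)
		if err != nil {
			// e.g. ctx canceled or conn broken; the caller reconnects.
			return err
		}
		log.Printf("notification on %q: %s", notification.Channel, notification.Payload)
	}
}
```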