Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support/db, services/horizon/internal: Configure postgres client connection timeouts for read only db #4390

Merged
merged 2 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/internal/httpx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func init() {
problem.RegisterError(context.Canceled, hProblem.ClientDisconnected)
problem.RegisterError(db.ErrCancelled, hProblem.ClientDisconnected)
problem.RegisterError(db.ErrTimeout, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrStatementTimeout, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrConflictWithRecovery, hProblem.ServiceUnavailable)
problem.RegisterError(db.ErrBadConnection, hProblem.ServiceUnavailable)
}
Expand Down
19 changes: 17 additions & 2 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/stellar/go/support/log"
)

func mustNewDBSession(subservice db.Subservice, databaseURL string, maxIdle, maxOpen int, registry *prometheus.Registry) db.SessionInterface {
func mustNewDBSession(subservice db.Subservice, databaseURL string, maxIdle, maxOpen int, registry *prometheus.Registry, clientConfigs ...db.ClientConfig) db.SessionInterface {
log.Infof("Establishing database session for %v", subservice)
session, err := db.Open("postgres", databaseURL)
session, err := db.Open("postgres", databaseURL, clientConfigs...)
if err != nil {
log.Fatalf("cannot open %v DB: %v", subservice, err)
}
Expand All @@ -47,21 +47,36 @@ func mustInitHorizonDB(app *App) {
}

if app.config.RoDatabaseURL == "" {
var clientConfigs []db.ClientConfig
if !app.config.Ingest {
// if we are not ingesting then we don't expect to have long db queries / transactions
clientConfigs = append(
clientConfigs,
db.StatementTimeout(app.config.ConnectionTimeout),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice addition, should be interesting to see how often this StatementTimeout kicks in!

db.IdleTransactionTimeout(app.config.ConnectionTimeout),
)
}
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.DatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
clientConfigs...,
)}
} else {
// If RO set, use it for all DB queries
roClientConfigs := []db.ClientConfig{
db.StatementTimeout(app.config.ConnectionTimeout),
db.IdleTransactionTimeout(app.config.ConnectionTimeout),
}
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.RoDatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
roClientConfigs...,
)}

app.primaryHistoryQ = &history.Q{mustNewDBSession(
Expand Down
46 changes: 45 additions & 1 deletion support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ package db
import (
"context"
"database/sql"
"net/url"
"strconv"
"strings"
"time"

"github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -44,6 +47,9 @@ var (
// ErrBadConnection is an error returned when driver returns `bad connection`
// error.
ErrBadConnection = errors.New("bad connection")
// ErrStatementTimeout is an error returned by Session methods when request has
// been cancelled due to a statement timeout.
ErrStatementTimeout = errors.New("canceling statement due to statement timeout")
)

// Conn represents a connection to a single database.
Expand Down Expand Up @@ -163,8 +169,46 @@ func pingDB(db *sqlx.DB) error {
return errors.Wrapf(err, "failed to connect to DB after %v attempts", maxDBPingAttempts)
}

type ClientConfig struct {
Key string
Value string
}

func StatementTimeout(timeout time.Duration) ClientConfig {
return ClientConfig{
Key: "statement_timeout",
Value: strconv.FormatInt(timeout.Milliseconds(), 10),
}
}

func IdleTransactionTimeout(timeout time.Duration) ClientConfig {
return ClientConfig{
Key: "idle_in_transaction_session_timeout",
Value: strconv.FormatInt(timeout.Milliseconds(), 10),
}
}

func augmentDSN(dsn string, clientConfigs []ClientConfig) string {
parsed, err := url.Parse(dsn)
if err != nil || parsed.Scheme == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is parsing expected to fail in some cases? when? shouldn't we just fail if that's the case?

(I think a commend is in order)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, returning error here would maintain most closely the current/expected behavior, rather than new outcome of attempting fix, less code to introduce here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments. Basically, I check that we're given a url connection string (e.g. postgres://localhost) and if that's not the case then I assume it must be a space separated connection string. It's possible that the connection string is completely invalid but I don't attempt to validate it here and instead I will let it fail when sql.Open() is called

parts := []string{dsn}
for _, config := range clientConfigs {
parts = append(parts, config.Key+"="+config.Value)
}
return strings.Join(parts, " ")
}

q := parsed.Query()
for _, config := range clientConfigs {
q.Set(config.Key, config.Value)
}
parsed.RawQuery = q.Encode()
return parsed.String()
}

// Open the database at `dsn` and returns a new *Session using it.
func Open(dialect, dsn string) (*Session, error) {
func Open(dialect, dsn string, clientConfigs ...ClientConfig) (*Session, error) {
dsn = augmentDSN(dsn, clientConfigs)
db, err := sqlx.Open(dialect, dsn)
if err != nil {
return nil, errors.Wrap(err, "open failed")
Expand Down
24 changes: 24 additions & 0 deletions support/db/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"testing"
"time"

"github.com/stellar/go/support/db/dbtest"
"github.com/stretchr/testify/assert"
Expand All @@ -27,3 +28,26 @@ func TestGetTable(t *testing.T) {
}

}

func TestAugmentDSN(t *testing.T) {
configs := []ClientConfig{
IdleTransactionTimeout(2 * time.Second),
StatementTimeout(4 * time.Millisecond),
}
for _, testCase := range []struct {
input string
expected string
}{
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"},
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"},
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"},
{"user=bob password=secret", "user=bob password=secret idle_in_transaction_session_timeout=2000 statement_timeout=4"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor for consideration, in case where external environments try to set something, maybe a test to confirm overwrite:

{"user=bob password=secret statement_timeout=1", "user=bob password=secret idle_in_transaction_session_timeout=2000 statement_timeout=4"}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test case for this but I decided that we shouldn't override statement_timeout and idle_in_transaction_session_timeout if the connection string already configures those parameters because it would probably be surprising for the operator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, agreed, that would be expected convention, thx!

} {
t.Run(testCase.input, func(t *testing.T) {
output := augmentDSN(testCase.input, configs)
if output != testCase.expected {
t.Fatalf("got %v but expected %v", output, testCase.expected)
}
})
}
}
14 changes: 14 additions & 0 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func (s *Session) Commit() error {
log.Debug("sql: commit")
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
}

Expand Down Expand Up @@ -231,6 +235,10 @@ func (s *Session) NoRows(err error) bool {
// replaceWithKnownError tries to replace Postgres error with package error.
// Returns a new error if the err is known.
func (s *Session) replaceWithKnownError(err error, ctx context.Context) error {
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good extra fix, coulda been some bad ptr refs happening here

return nil
}

switch {
case ctx.Err() == context.Canceled:
return ErrCancelled
Expand All @@ -243,6 +251,8 @@ func (s *Session) replaceWithKnownError(err error, ctx context.Context) error {
return ErrConflictWithRecovery
case strings.Contains(err.Error(), "driver: bad connection"):
return ErrBadConnection
case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"):
return ErrStatementTimeout
default:
return nil
}
Expand Down Expand Up @@ -305,6 +315,10 @@ func (s *Session) Rollback() error {
log.Debug("sql: rollback")
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
}

Expand Down
31 changes: 31 additions & 0 deletions support/db/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,34 @@ func TestSession(t *testing.T) {
assert.Equal("$1 = $2 = $3 = ?", out)
}
}

func TestStatementTimeout(t *testing.T) {
assert := assert.New(t)
db := dbtest.Postgres(t).Load(testSchema)
defer db.Close()

sess, err := Open(db.Dialect, db.DSN, StatementTimeout(50*time.Millisecond))
assert.NoError(err)
defer sess.Close()

var count int
err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2), COUNT(*) FROM people")
assert.ErrorIs(err, ErrStatementTimeout)
}

func TestIdleTransactionTimeout(t *testing.T) {
assert := assert.New(t)
db := dbtest.Postgres(t).Load(testSchema)
defer db.Close()

sess, err := Open(db.Dialect, db.DSN, IdleTransactionTimeout(50*time.Millisecond))
assert.NoError(err)
defer sess.Close()

assert.NoError(sess.Begin())
<-time.After(100 * time.Millisecond)

var count int
err = sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people")
assert.ErrorIs(err, ErrBadConnection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, a force closed tx, invalidates the whole connection, really good test to show that effect!

}