-
Notifications
You must be signed in to change notification settings - Fork 503
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support/db, services/horizon/internal: Configure postgres client connection timeouts for read only db #4390
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,9 @@ package db | |
import ( | ||
"context" | ||
"database/sql" | ||
"net/url" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/Masterminds/squirrel" | ||
|
@@ -44,6 +47,9 @@ var ( | |
// ErrBadConnection is an error returned when driver returns `bad connection` | ||
// error. | ||
ErrBadConnection = errors.New("bad connection") | ||
// ErrStatementTimeout is an error returned by Session methods when request has | ||
// been cancelled due to a statement timeout. | ||
ErrStatementTimeout = errors.New("canceling statement due to statement timeout") | ||
) | ||
|
||
// Conn represents a connection to a single database. | ||
|
@@ -163,8 +169,46 @@ func pingDB(db *sqlx.DB) error { | |
return errors.Wrapf(err, "failed to connect to DB after %v attempts", maxDBPingAttempts) | ||
} | ||
|
||
type ClientConfig struct { | ||
Key string | ||
Value string | ||
} | ||
|
||
func StatementTimeout(timeout time.Duration) ClientConfig { | ||
return ClientConfig{ | ||
Key: "statement_timeout", | ||
Value: strconv.FormatInt(timeout.Milliseconds(), 10), | ||
} | ||
} | ||
|
||
func IdleTransactionTimeout(timeout time.Duration) ClientConfig { | ||
return ClientConfig{ | ||
Key: "idle_in_transaction_session_timeout", | ||
Value: strconv.FormatInt(timeout.Milliseconds(), 10), | ||
} | ||
} | ||
|
||
func augmentDSN(dsn string, clientConfigs []ClientConfig) string { | ||
parsed, err := url.Parse(dsn) | ||
if err != nil || parsed.Scheme == "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is parsing expected to fail in some cases? when? shouldn't we just fail if that's the case? (I think a commend is in order) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point, returning error here would maintain most closely the current/expected behavior, rather than new outcome of attempting fix, less code to introduce here too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added some comments. Basically, I check that we're given a url connection string (e.g. |
||
parts := []string{dsn} | ||
for _, config := range clientConfigs { | ||
parts = append(parts, config.Key+"="+config.Value) | ||
} | ||
return strings.Join(parts, " ") | ||
} | ||
|
||
q := parsed.Query() | ||
for _, config := range clientConfigs { | ||
q.Set(config.Key, config.Value) | ||
} | ||
parsed.RawQuery = q.Encode() | ||
return parsed.String() | ||
} | ||
|
||
// Open the database at `dsn` and returns a new *Session using it. | ||
func Open(dialect, dsn string) (*Session, error) { | ||
func Open(dialect, dsn string, clientConfigs ...ClientConfig) (*Session, error) { | ||
dsn = augmentDSN(dsn, clientConfigs) | ||
db, err := sqlx.Open(dialect, dsn) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open failed") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package db | |
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stellar/go/support/db/dbtest" | ||
"github.com/stretchr/testify/assert" | ||
|
@@ -27,3 +28,26 @@ func TestGetTable(t *testing.T) { | |
} | ||
|
||
} | ||
|
||
func TestAugmentDSN(t *testing.T) { | ||
configs := []ClientConfig{ | ||
IdleTransactionTimeout(2 * time.Second), | ||
StatementTimeout(4 * time.Millisecond), | ||
} | ||
for _, testCase := range []struct { | ||
input string | ||
expected string | ||
}{ | ||
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"}, | ||
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"}, | ||
{"postgresql://localhost", "postgresql://localhost?idle_in_transaction_session_timeout=2000&statement_timeout=4"}, | ||
{"user=bob password=secret", "user=bob password=secret idle_in_transaction_session_timeout=2000 statement_timeout=4"}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor for consideration, in case where external environments try to set something, maybe a test to confirm overwrite:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a test case for this but I decided that we shouldn't override There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, agreed, that would be expected convention, thx! |
||
} { | ||
t.Run(testCase.input, func(t *testing.T) { | ||
output := augmentDSN(testCase.input, configs) | ||
if output != testCase.expected { | ||
t.Fatalf("got %v but expected %v", output, testCase.expected) | ||
} | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,6 +91,10 @@ func (s *Session) Commit() error { | |
log.Debug("sql: commit") | ||
s.tx = nil | ||
s.txOptions = nil | ||
|
||
if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil { | ||
return knownErr | ||
} | ||
return err | ||
} | ||
|
||
|
@@ -231,6 +235,10 @@ func (s *Session) NoRows(err error) bool { | |
// replaceWithKnownError tries to replace Postgres error with package error. | ||
// Returns a new error if the err is known. | ||
func (s *Session) replaceWithKnownError(err error, ctx context.Context) error { | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good extra fix, coulda been some bad ptr refs happening here |
||
return nil | ||
} | ||
|
||
switch { | ||
case ctx.Err() == context.Canceled: | ||
return ErrCancelled | ||
|
@@ -243,6 +251,8 @@ func (s *Session) replaceWithKnownError(err error, ctx context.Context) error { | |
return ErrConflictWithRecovery | ||
case strings.Contains(err.Error(), "driver: bad connection"): | ||
return ErrBadConnection | ||
case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"): | ||
return ErrStatementTimeout | ||
default: | ||
return nil | ||
} | ||
|
@@ -305,6 +315,10 @@ func (s *Session) Rollback() error { | |
log.Debug("sql: rollback") | ||
s.tx = nil | ||
s.txOptions = nil | ||
|
||
if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil { | ||
return knownErr | ||
} | ||
return err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,3 +129,34 @@ func TestSession(t *testing.T) { | |
assert.Equal("$1 = $2 = $3 = ?", out) | ||
} | ||
} | ||
|
||
func TestStatementTimeout(t *testing.T) { | ||
assert := assert.New(t) | ||
db := dbtest.Postgres(t).Load(testSchema) | ||
defer db.Close() | ||
|
||
sess, err := Open(db.Dialect, db.DSN, StatementTimeout(50*time.Millisecond)) | ||
assert.NoError(err) | ||
defer sess.Close() | ||
|
||
var count int | ||
err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2), COUNT(*) FROM people") | ||
assert.ErrorIs(err, ErrStatementTimeout) | ||
} | ||
|
||
func TestIdleTransactionTimeout(t *testing.T) { | ||
assert := assert.New(t) | ||
db := dbtest.Postgres(t).Load(testSchema) | ||
defer db.Close() | ||
|
||
sess, err := Open(db.Dialect, db.DSN, IdleTransactionTimeout(50*time.Millisecond)) | ||
assert.NoError(err) | ||
defer sess.Close() | ||
|
||
assert.NoError(sess.Begin()) | ||
<-time.After(100 * time.Millisecond) | ||
|
||
var count int | ||
err = sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") | ||
assert.ErrorIs(err, ErrBadConnection) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting, a force closed tx, invalidates the whole connection, really good test to show that effect! |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice addition, should be interesting to see how often this StatementTimeout kicks in!