Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: implement idle_in_transaction_session_timeout #52938

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
// Stop idle timer if the connExecutor is closed to ensure cancel session
// is not called.
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

if closeType != panicClose {
ex.state.mon.Stop(ctx)
Expand Down Expand Up @@ -1073,6 +1074,11 @@ type connExecutor struct {
// IdleInSessionTimeout is returned by the AfterFunc call that cancels the
// session if the idle time exceeds the idle_in_session_timeout.
IdleInSessionTimeout timeout

// IdleInTransactionSessionTimeout is returned by the AfterFunc call that
// cancels the session if the idle time in a transaction exceeds the
// idle_in_transaction_session_timeout.
IdleInTransactionSessionTimeout timeout
}

// curStmt is the statement that's currently being prepared or executed, if
Expand Down
26 changes: 26 additions & 0 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (ex *connExecutor) execStmt(

// Stop the session idle timeout when a new statement is executed.
ex.mu.IdleInSessionTimeout.Stop()
ex.mu.IdleInTransactionSessionTimeout.Stop()

// Run observer statements in a separate code path; their execution does not
// depend on the current transaction state.
Expand Down Expand Up @@ -131,6 +132,31 @@ func (ex *connExecutor) execStmt(
)}
}

if ex.sessionData.IdleInTransactionSessionTimeout > 0 {
startIdleInTransactionSessionTimeout := func() {
switch stmt.AST.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
// Do nothing, the transaction is completed, we do not want to start
// an idle timer.
default:
ex.mu.IdleInTransactionSessionTimeout = timeout{time.AfterFunc(
ex.sessionData.IdleInTransactionSessionTimeout,
ex.cancelSession,
)}
}
}
switch ex.machine.CurState().(type) {
case stateAborted, stateCommitWait:
startIdleInTransactionSessionTimeout()
case stateOpen:
// Only start timeout if the statement is executed in an
// explicit transaction.
if !ex.implicitTxn() {
startIdleInTransactionSessionTimeout()
}
}
}

return ev, payload, err
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,10 @@ func (m *sessionDataMutator) SetIdleInSessionTimeout(timeout time.Duration) {
m.data.IdleInSessionTimeout = timeout
}

func (m *sessionDataMutator) SetIdleInTransactionSessionTimeout(timeout time.Duration) {
m.data.IdleInTransactionSessionTimeout = timeout
}

func (m *sessionDataMutator) SetAllowPrepareAsOptPlan(val bool) {
m.data.AllowPrepareAsOptPlan = val
}
Expand Down
191 changes: 191 additions & 0 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,194 @@ func TestIdleInSessionTimeout(t *testing.T) {
"but the connection is still alive")
}
}

func TestIdleInTransactionSessionTimeout(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

numNodes := 1
tc := serverutils.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

var err error
conn, err := tc.ServerConn(0).Conn(ctx)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx,
`SET idle_in_transaction_session_timeout = '2s'`)
if err != nil {
t.Fatal(err)
}

time.Sleep(3 * time.Second)
// idle_in_transaction_session_timeout should only timeout if a transaction
// is active
err = conn.PingContext(ctx)
if err != nil {
t.Fatalf("expected the connection to be alive but the connection"+
"is dead, %v", err)
}

_, err = conn.ExecContext(ctx, `BEGIN`)
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Second)
// Make sure executing a statement resets the idle timer.
_, err = conn.ExecContext(ctx, `SELECT 1`)
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Second)
// The connection should still be alive.
err = conn.PingContext(ctx)
if err != nil {
t.Fatalf("expected the connection to be alive but the connection"+
"is dead, %v", err)
}

time.Sleep(3 * time.Second)
err = conn.PingContext(ctx)
if err == nil {
t.Fatal("expected the connection to be killed " +
"but the connection is still alive")
}
}

func TestIdleInTransactionSessionTimeoutAbortedState(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

numNodes := 1
tc := serverutils.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

var err error
conn, err := tc.ServerConn(0).Conn(ctx)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx,
`SET idle_in_transaction_session_timeout = '2s'`)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx, `BEGIN`)
if err != nil {
t.Fatal(err)
}

// Go into state aborted.
_, err = conn.ExecContext(ctx, `SELECT crdb_internal.force_error('', 'error')`)
if err == nil {
t.Fatal("unexpected success")
}
if err.Error() != "pq: error" {
t.Fatal(err)
}

time.Sleep(time.Second)
// Make sure executing a statement resets the idle timer.
_, err = conn.ExecContext(ctx, `SELECT 1`)
// Statement should execute in aborted state.
if err == nil {
t.Fatal("unexpected success")
}
if err.Error() != "pq: current transaction is aborted, commands ignored until end of transaction block" {
t.Fatal(err)
}

time.Sleep(time.Second)
// The connection should still be alive.
err = conn.PingContext(ctx)
if err != nil {
t.Fatalf("expected the connection to be alive but the connection"+
"is dead, %v", err)
}

time.Sleep(3 * time.Second)
err = conn.PingContext(ctx)
if err == nil {
t.Fatal("expected the connection to be killed " +
"but the connection is still alive")
}
}

func TestIdleInTransactionSessionTimeoutCommitWaitState(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

numNodes := 1
tc := serverutils.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

var err error
conn, err := tc.ServerConn(0).Conn(ctx)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx,
`SET idle_in_transaction_session_timeout = '2s'`)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx, `BEGIN`)
if err != nil {
t.Fatal(err)
}

// Go into commit wait state.
_, err = conn.ExecContext(ctx, `SAVEPOINT cockroach_restart`)
if err != nil {
t.Fatal(err)
}

_, err = conn.ExecContext(ctx, `RELEASE SAVEPOINT cockroach_restart`)
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Second)
// Make sure executing a statement resets the idle timer.
// This statement errors but should still reset the timer.
_, err = conn.ExecContext(ctx, `SELECT 1`)
// Statement should execute in aborted state.
if err == nil {
t.Fatal("unexpected success")
}
if err.Error() != "pq: current transaction is committed, commands ignored until end of transaction block" {
t.Fatal(err)
}

time.Sleep(time.Second)
// The connection should still be alive.
err = conn.PingContext(ctx)
if err != nil {
t.Fatalf("expected the connection to be alive but the connection"+
"is dead, %v", err)
}

time.Sleep(3 * time.Second)
err = conn.PingContext(ctx)
if err == nil {
t.Fatal("expected the connection to be killed " +
"but the connection is still alive")
}
}
4 changes: 4 additions & 0 deletions pkg/sql/sessiondata/session_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type SessionData struct {
// IdleInSessionTimeout is the duration a session is permitted to idle before
// the session is canceled. If set to 0, there is no timeout.
IdleInSessionTimeout time.Duration
// IdleInTransactionSessionTimeout is the duration a session is permitted to
// idle in a transaction before the session is canceled.
// If set to 0, there is no timeout.
IdleInTransactionSessionTimeout time.Duration
// User is the name of the user logged into the session.
User string
// SafeUpdates causes errors when the client
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/set_var.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,19 @@ func idleInSessionTimeoutVarSet(ctx context.Context, m *sessionDataMutator, s st
return nil
}

func idleInTransactionSessionTimeoutVarSet(
ctx context.Context, m *sessionDataMutator, s string,
) error {
timeout, err := validateTimeoutVar(s,
"idle_in_transaction_session_timeout")
if err != nil {
return err
}

m.SetIdleInTransactionSessionTimeout(timeout)
return nil
}

func intervalToDuration(interval *tree.DInterval) (time.Duration, error) {
nanos, _, _, err := interval.Encode()
if err != nil {
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,6 @@ var varGen = map[string]sessionVar{
// See https://www.postgresql.org/docs/10/static/runtime-config-client.html#GUC-LOC-TIMEOUT
`lock_timeout`: makeCompatIntVar(`lock_timeout`, 0),

// See https://www.postgresql.org/docs/10/static/runtime-config-client.html#GUC-IDLE-IN-TRANSACTION-SESSION-TIMEOUT
// See also issue #5924.
`idle_in_transaction_session_timeout`: makeCompatIntVar(`idle_in_transaction_session_timeout`, 0),

// Supported for PG compatibility only.
// See https://www.postgresql.org/docs/10/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
`max_identifier_length`: {
Expand Down Expand Up @@ -1002,6 +998,16 @@ var varGen = map[string]sessionVar{
},
},

`idle_in_transaction_session_timeout`: {
GetStringVal: makeTimeoutVarGetter(`idle_in_transaction_session_timeout`),
Set: idleInTransactionSessionTimeoutVarSet,
Get: func(evalCtx *extendedEvalContext) string {
ms := evalCtx.SessionData.StmtTimeout.Nanoseconds() / int64(time.Millisecond)
return strconv.FormatInt(ms, 10)
},
GlobalDefault: func(sv *settings.Values) string { return "0" },
},

// See https://www.postgresql.org/docs/10/static/runtime-config-client.html#GUC-TIMEZONE
`timezone`: {
Get: func(evalCtx *extendedEvalContext) string {
Expand Down