Skip to content

Commit

Permalink
feat: add tx methods to IDB (uptrace#587)
Browse files Browse the repository at this point in the history
  • Loading branch information
isgj committed Jun 28, 2022
1 parent e91543d commit 2332441
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 3 deletions.
80 changes: 80 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package bun

import (
"context"
"crypto/rand"
"database/sql"
"encoding/hex"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -431,6 +433,8 @@ func (db *DB) PrepareContext(ctx context.Context, query string) (Stmt, error) {
type Tx struct {
ctx context.Context
db *DB
// name is the name of a savepoint
name string
*sql.Tx
}

Expand Down Expand Up @@ -479,19 +483,45 @@ func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
}

func (tx Tx) Commit() error {
if tx.name == "" {
return tx.commitTX()
}
return tx.commitSP()
}

func (tx Tx) commitTX() error {
ctx, event := tx.db.beforeQuery(tx.ctx, nil, "COMMIT", nil, "COMMIT", nil)
err := tx.Tx.Commit()
tx.db.afterQuery(ctx, event, nil, err)
return err
}

func (tx Tx) commitSP() error {
query := "RELEASE SAVEPOINT " + tx.name
_, err := tx.ExecContext(tx.ctx, query)
return err
}

func (tx Tx) Rollback() error {
if tx.name == "" {
return tx.rollbackTX()
}
return tx.rollbackSP()
}

func (tx Tx) rollbackTX() error {
ctx, event := tx.db.beforeQuery(tx.ctx, nil, "ROLLBACK", nil, "ROLLBACK", nil)
err := tx.Tx.Rollback()
tx.db.afterQuery(ctx, event, nil, err)
return err
}

func (tx Tx) rollbackSP() error {
query := "ROLLBACK TO SAVEPOINT " + tx.name
_, err := tx.ExecContext(tx.ctx, query)
return err
}

func (tx Tx) Exec(query string, args ...interface{}) (sql.Result, error) {
return tx.ExecContext(context.TODO(), query, args...)
}
Expand Down Expand Up @@ -534,6 +564,56 @@ func (tx Tx) QueryRowContext(ctx context.Context, query string, args ...interfac

//------------------------------------------------------------------------------

func (tx Tx) Begin() (Tx, error) {
return tx.BeginTx(context.Background(), nil)
}

// BeginTx will save a point in the running transaction.
func (tx Tx) BeginTx(ctx context.Context, _ *sql.TxOptions) (Tx, error) {
sp := make([]byte, 16)
_, err := rand.Read(sp)
if err != nil {
return Tx{}, err
}

qName := "SP_" + hex.EncodeToString(sp)
query := "SAVEPOINT " + qName
_, err = tx.ExecContext(ctx, query)
if err != nil {
return Tx{}, err
}
return Tx{
ctx: ctx,
db: tx.db,
Tx: tx.Tx,
name: qName,
}, nil
}

func (tx Tx) RunInTx(
ctx context.Context, _ *sql.TxOptions, fn func(ctx context.Context, tx Tx) error,
) error {
sp, err := tx.BeginTx(ctx, nil)
if err != nil {
return err
}

var done bool

defer func() {
if !done {
_ = sp.Rollback()
}
}()

if err := fn(ctx, sp); err != nil {
return err
}

done = true
return sp.Commit()
}

func (tx Tx) Dialect() schema.Dialect {
return tx.db.Dialect()
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
97 changes: 97 additions & 0 deletions internal/dbtest/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func TestDB(t *testing.T) {
{testEmbedModelPointer},
{testJSONMarshaler},
{testNilDriverValue},
{testRunInTxAndSavepoint},
}

testEachDB(t, func(t *testing.T, dbName string, db *bun.DB) {
Expand Down Expand Up @@ -1401,3 +1402,99 @@ func testNilDriverValue(t *testing.T, db *bun.DB) {
_, err = db.NewInsert().Model(&Model{Value: &DriverValue{s: "hello"}}).Exec(ctx)
require.NoError(t, err)
}

func testRunInTxAndSavepoint(t *testing.T, db *bun.DB) {
type Counter struct {
Count int64
}

err := db.ResetModel(ctx, (*Counter)(nil))
require.NoError(t, err)

_, err = db.NewInsert().Model(&Counter{Count: 0}).Exec(ctx)
require.NoError(t, err)

err = db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
err := tx.RunInTx(ctx, nil, func(ctx context.Context, sp bun.Tx) error {
_, err := sp.NewUpdate().Model((*Counter)(nil)).
Set("count = count + 1").
Where("1 = 1").
Exec(ctx)
return err
})
require.NoError(t, err)
// rolling back the transaction should rollback what happened inside savepoint
return errors.New("fake error")
})
require.Error(t, err)

var count int
err = db.NewSelect().Model((*Counter)(nil)).Scan(ctx, &count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
err := tx.RunInTx(ctx, nil, func(ctx context.Context, sp bun.Tx) error {
_, err := sp.NewInsert().Model(&Counter{Count: 1}).
Exec(ctx)
require.NoError(t, err)
return err
})
require.NoError(t, err)

// ignored on purpose this error
// rolling back a savepoint should not affect the transaction
// nor other savepoints on the same level
_ = tx.RunInTx(ctx, nil, func(ctx context.Context, sp bun.Tx) error {
_, err := sp.NewInsert().Model(&Counter{Count: 2}).
Exec(ctx)
require.NoError(t, err)
return errors.New("fake error")
})

return err
})
require.NoError(t, err)

count, err = db.NewSelect().Model((*Counter)(nil)).Count(ctx)
require.NoError(t, err)
require.Equal(t, 2, count)

err = db.ResetModel(ctx, (*Counter)(nil))
require.NoError(t, err)

// happy path, commit transaction, savepoints and sub-savepoints
err = db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
_, err := tx.NewInsert().Model(&Counter{Count: 1}).
Exec(ctx)
require.NoError(t, err)

err = tx.RunInTx(ctx, nil, func(ctx context.Context, sp bun.Tx) error {
_, err := sp.NewInsert().Model(&Counter{Count: 1}).
Exec(ctx)
if err != nil {
return err
}

return sp.RunInTx(ctx, nil, func(ctx context.Context, subSp bun.Tx) error {
_, err := subSp.NewInsert().Model(&Counter{Count: 1}).
Exec(ctx)
return err
})
})
require.NoError(t, err)

err = tx.RunInTx(ctx, nil, func(ctx context.Context, sp bun.Tx) error {
_, err := sp.NewInsert().Model(&Counter{Count: 2}).
Exec(ctx)
return err
})

return err
})
require.NoError(t, err)

count, err = db.NewSelect().Model((*Counter)(nil)).Count(ctx)
require.NoError(t, err)
require.Equal(t, 4, count)
}
3 changes: 3 additions & 0 deletions query_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type IDB interface {
NewTruncateTable() *TruncateTableQuery
NewAddColumn() *AddColumnQuery
NewDropColumn() *DropColumnQuery

BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error)
RunInTx(ctx context.Context, opts *sql.TxOptions, f func(ctx context.Context, tx Tx) error) error
}

var (
Expand Down

0 comments on commit 2332441

Please sign in to comment.