Skip to content

Commit

Permalink
feat: New ORM wrapper; bunx (#29)
Browse files Browse the repository at this point in the history
* feat: New ORM wrapper; bunx

* fix(popx): Adjust sleep time for pass test

* fix(bunx): Adjust sleep time for pass test
  • Loading branch information
sawadashota authored Oct 25, 2021
1 parent cc93a04 commit 0c9d526
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 13 deletions.
30 changes: 30 additions & 0 deletions bunx/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bunx

import (
"github.com/uptrace/bun"
)

type Client struct {
db *bun.DB
}

func NewClient(db *bun.DB) (*Client, error) {
return &Client{
db: db,
}, nil
}

func (c *Client) DB() *bun.DB {
return c.db
}

func (c *Client) Ping() error {
if err := c.db.Ping(); err != nil {
return err
}
return nil
}

func (c *Client) Close() error {
return c.db.Close()
}
43 changes: 43 additions & 0 deletions bunx/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package bunx

import (
"context"

"github.com/pkg/errors"
"github.com/uptrace/bun"
)

var (
ErrDataLockTaken = errors.Errorf("data lock taken")
)

// TransactionWithTryAdvisoryLock is Transaction with pg_try_advisory_xact_lock
// if a lock has already taken, returns error immediately
func (c *Client) TransactionWithTryAdvisoryLock(ctx context.Context, key string, callback func(ctx context.Context, tx bun.Tx) error) error {
return c.DB().RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
if err := tryTakeAdvisoryLock(ctx, tx, key); err != nil {
return err
}
return callback(ctx, tx)
})
}

func tryTakeAdvisoryLock(ctx context.Context, tx bun.Tx, key string) error {
rows, err := tx.QueryContext(ctx, `select pg_try_advisory_xact_lock(hashtext(?))`, key)
if err != nil {
return err
}
if !rows.Next() {
return errors.New("unexpected error: try to take advisory lock but no rows returned")
}

var result bool
defer rows.Close()
if err := rows.Scan(&result); err != nil {
return err
}
if !result {
return errors.WithMessagef(ErrDataLockTaken, "data lock taken at the key %s", key)
}
return nil
}
96 changes: 96 additions & 0 deletions bunx/lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package bunx_test

import (
"context"
"database/sql"
"embed"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"
"golang.org/x/sync/errgroup"

"github.com/tier4/x-go/bunx"
"github.com/tier4/x-go/dockertestx"
)

type User struct {
ID int64 `db:"id,pk"`
Email string `db:"email"`
}

//go:embed testdata/migrations/*.sql
var migrationFS embed.FS

func TestClient_TransactionWithTryAdvisoryLock(t *testing.T) {
t.Parallel()

p, err := dockertestx.New(dockertestx.PoolOption{})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, p.Purge())
})

dsn, err := p.NewResource(new(dockertestx.PostgresFactory), dockertestx.ContainerOption{
Tag: "alpine",
})
require.NoError(t, err)

sqlDB := sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
db := bun.NewDB(sqlDB, pgdialect.New())

cl, err := bunx.NewClient(db)
require.NoError(t, err)
migrator, err := bunx.NewMigrator(db, migrationFS, bunx.NewNoopLogger())
require.NoError(t, err)

ctx := context.Background()

_ = migrator.Reset(ctx)
require.NoError(t, migrator.Migrate(ctx))

tx1 := &User{
Email: "example01@example.com",
}
tx2 := &User{
Email: "example02@example.com",
}

key := "test"
var eg errgroup.Group
eg.Go(func() error {
return cl.TransactionWithTryAdvisoryLock(ctx, key, func(ctx context.Context, tx bun.Tx) error {
time.Sleep(100 * time.Millisecond)
_, err := tx.NewInsert().Model(tx1).Exec(ctx)
return err
})
})
// to ensure to start 1st transaction
time.Sleep(10 * time.Millisecond)
eg.Go(func() error {
return cl.TransactionWithTryAdvisoryLock(ctx, key, func(ctx context.Context, tx bun.Tx) error {
_, err := tx.NewInsert().Model(tx2).Exec(ctx)
return err
})
})

err = eg.Wait()
require.Error(t, err)
assert.ErrorIs(t, err, bunx.ErrDataLockTaken)

var (
found1 User
found2 User
)

require.NoError(t, cl.DB().NewSelect().Model(&found1).Where("id = ?", tx1.ID).Scan(ctx))
assert.Equal(t, *tx1, found1)

err = cl.DB().NewSelect().Model(&found2).Where("id = ?", tx2.ID).Scan(ctx)
assert.True(t, errors.Is(err, sql.ErrNoRows))
}
20 changes: 20 additions & 0 deletions bunx/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package bunx

import "go.uber.org/zap"

type Logger interface {
Info(args ...interface{})
}

func DefaultLogger() Logger {
l, _ := zap.NewProduction()
return l.Sugar()
}

type NoopLogger struct{}

func NewNoopLogger() Logger {
return new(NoopLogger)
}

func (n *NoopLogger) Info(_ ...interface{}) {}
73 changes: 73 additions & 0 deletions bunx/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package bunx

import (
"context"
"embed"
"fmt"

"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)

type Migrator struct {
migrator *migrate.Migrator
logger Logger
}

func NewMigrator(db *bun.DB, migrationFS embed.FS, logger Logger) (*Migrator, error) {
migrations := migrate.NewMigrations()
if err := migrations.Discover(migrationFS); err != nil {
return nil, err
}

return &Migrator{
migrator: migrate.NewMigrator(db, migrations),
logger: logger,
}, nil
}

func (m *Migrator) Migrate(ctx context.Context) error {
if err := m.migrator.Init(ctx); err != nil {
return err
}
group, err := m.migrator.Migrate(ctx)
if err != nil {
return err
}
m.logger.Info(fmt.Sprintf("migrated to %s", group))
return nil
}

func (m *Migrator) Rollback(ctx context.Context) error {
group, err := m.migrator.Rollback(ctx)
if err != nil {
return err
}
m.logger.Info(fmt.Sprintf("rolled back %s", group))
return nil
}

func (m *Migrator) Reset(ctx context.Context) error {
for {
group, err := m.migrator.Rollback(ctx)
if err != nil {
return err
}
if group.IsZero() {
return m.migrator.Reset(ctx)
}
m.logger.Info(fmt.Sprintf("rolled back %s", group))
}
}

func (m *Migrator) Status(ctx context.Context) error {
ms, err := m.migrator.MigrationsWithStatus(ctx)
if err != nil {
return err
}
m.logger.Info(fmt.Sprintf("migrations: %s", ms))
m.logger.Info(fmt.Sprintf("un-applied migrations: %s", ms.Unapplied()))
m.logger.Info(fmt.Sprintf("last migration group: %s", ms.LastGroup()))

return nil
}
1 change: 1 addition & 0 deletions bunx/testdata/migrations/20211018215252_sql.tx.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
drop table "users";
5 changes: 5 additions & 0 deletions bunx/testdata/migrations/20211018215252_sql.tx.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table "users"
(
"id" serial primary key,
"email" varchar(255) not null unique
);
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/aws/aws-sdk-go v1.41.8
github.com/aws/aws-sdk-go v1.41.9
github.com/aws/aws-sdk-go-v2 v1.10.0
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
Expand All @@ -17,15 +17,13 @@ require (
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/gobuffalo/envy v1.9.0 // indirect
github.com/gobuffalo/fizz v1.13.0 // indirect
github.com/gobuffalo/flect v0.2.3 // indirect
github.com/gobuffalo/github_flavored_markdown v1.1.1 // indirect
github.com/gobuffalo/helpers v0.6.2 // indirect
github.com/gobuffalo/nulls v0.4.0 // indirect
github.com/gobuffalo/packd v1.0.0
github.com/gobuffalo/plush/v4 v4.1.6 // indirect
github.com/gobuffalo/pop/v5 v5.3.4
github.com/gobuffalo/validate/v3 v3.3.0 // indirect
github.com/gofrs/uuid v4.1.0+incompatible // indirect
github.com/gobuffalo/tags/v3 v3.1.1 // indirect
github.com/google/uuid v1.3.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
Expand All @@ -40,12 +38,17 @@ require (
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/uptrace/bun v1.0.14
github.com/uptrace/bun/dialect/pgdialect v1.0.14
github.com/uptrace/bun/driver/pgdriver v1.0.14
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20211020060615-d418f374d309 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211020174200-9d6173849985 // indirect
golang.org/x/sys v0.0.0-20211023085530-d6a326fbbf70 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
Loading

0 comments on commit 0c9d526

Please sign in to comment.