Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Fix database is locked error (#873)
Browse files Browse the repository at this point in the history
* Fix database is locked error

* Fix typo
  • Loading branch information
albrow authored Jul 24, 2020
1 parent 0acb901 commit 5c75523
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 29 deletions.
102 changes: 73 additions & 29 deletions db/sql_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/big"
"os"
"path/filepath"
"sync"

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/db/sqltypes"
Expand All @@ -32,6 +33,11 @@ type DB struct {
ctx context.Context
sqldb *sqlz.DB
opts *Options

// mu is used to protect all reads and writes to the database. This is a solution to the
// `database is locked` error that appears on SQLite. https://github.com/mattn/go-sqlite3/issues/607
// TODO(albrow): Make this mutex optional since not all SQL implementations need it.
mu sync.RWMutex
}

func defaultOptions() *Options {
Expand Down Expand Up @@ -262,15 +268,25 @@ func (db *DB) migrate() error {
return convertErr(err)
}

// ReadWriteTransactionalContext acquires a write lock, executes the transaction, then immediately releases the lock.
func (db *DB) ReadWriteTransactionalContext(ctx context.Context, opts *sql.TxOptions, f func(tx *sqlz.Tx) error) error {
db.mu.Lock()
defer db.mu.Unlock()
return db.sqldb.TransactionalContext(ctx, opts, f)
}

func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.OrderWithMetadata, removed []*types.OrderWithMetadata, err error) {
defer func() {
err = convertErr(err)
}()

sqlOrders := sqltypes.OrdersFromCommonType(orders)
addedMap := map[common.Hash]*types.OrderWithMetadata{}
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for _, order := range orders {
result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, sqltypes.OrderFromCommonType(order))
sqlRemoved := []*sqltypes.Order{}

err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for i, order := range sqlOrders {
result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, order)
if err != nil {
return err
}
Expand All @@ -279,7 +295,7 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order
return err
}
if affected > 0 {
addedMap[order.Hash] = order
addedMap[order.Hash] = orders[i]
}
}

Expand Down Expand Up @@ -308,29 +324,32 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order
// the added set and don't add it to the removed set.
delete(addedMap, order.Hash)
} else {
removed = append(removed, sqltypes.OrderToCommonType(order))
sqlRemoved = append(sqlRemoved, order)
}
}
return nil
})
if err != nil {
return nil, nil, err
}

for _, order := range addedMap {
added = append(added, order)
}

return added, removed, nil
return added, sqltypes.OrdersToCommonType(sqlRemoved), nil
}

func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err error) {
defer func() {
err = convertErr(err)
}()
var foundOrder sqltypes.Order
db.mu.RLock()
if err := db.sqldb.GetContext(db.ctx, &foundOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil {
db.mu.RUnlock()
return nil, err
}
db.mu.RUnlock()
return sqltypes.OrderToCommonType(&foundOrder), nil
}

Expand All @@ -346,7 +365,10 @@ func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata,
return nil, err
}
var foundOrders []*sqltypes.Order
if err := stmt.GetAllContext(db.ctx, &foundOrders); err != nil {
db.mu.RLock()
err = stmt.GetAllContext(db.ctx, &foundOrders)
db.mu.RUnlock()
if err != nil {
return nil, err
}
return sqltypes.OrdersToCommonType(foundOrders), nil
Expand All @@ -363,7 +385,9 @@ func (db *DB) CountOrders(query *OrderQuery) (count int, err error) {
if err != nil {
return 0, err
}
db.mu.RLock()
gotCount, err := stmt.GetCount()
db.mu.RUnlock()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -441,7 +465,10 @@ func whereConditionsFromOrderFilterOpts(filterOpts []OrderFilter) ([]sqlz.WhereC
}

func (db *DB) DeleteOrder(hash common.Hash) error {
if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash); err != nil {
db.mu.Lock()
_, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash)
db.mu.Unlock()
if err != nil {
return convertErr(err)
}
return nil
Expand All @@ -458,7 +485,7 @@ func (db *DB) DeleteOrders(query *OrderQuery) (deleted []*types.OrderWithMetadat
// for DELETE statements. It also doesn't support RETURNING. As a
// workaround, we do a SELECT and DELETE inside a transaction.
var ordersToDelete []*sqltypes.Order
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
stmt, err := addOptsToSelectOrdersQuery(txn.Select("*").From("orders"), query)
if err != nil {
return err
Expand Down Expand Up @@ -488,7 +515,7 @@ func (db *DB) UpdateOrder(hash common.Hash, updateFunc func(existingOrder *types
return errors.New("db.UpdateOrders: updateFunc cannot be nil")
}

return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
var existingOrder sqltypes.Order
if err := txn.GetContext(db.ctx, &existingOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -516,10 +543,13 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
err = convertErr(err)
}()

sqlMiniHeaders := sqltypes.MiniHeadersFromCommonType(miniHeaders)
addedMap := map[common.Hash]*types.MiniHeader{}
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for _, miniHeader := range miniHeaders {
result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader))
sqlRemoved := []*sqltypes.MiniHeader{}

err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for i, miniHeader := range sqlMiniHeaders {
result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, miniHeader)
if err != nil {
return err
}
Expand All @@ -528,7 +558,7 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return err
}
if affected > 0 {
addedMap[miniHeader.Hash] = miniHeader
addedMap[miniHeader.Hash] = miniHeaders[i]
}
}

Expand All @@ -552,19 +582,19 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
// the added set and don't add it to the removed set.
delete(addedMap, miniHeader.Hash)
} else {
removed = append(removed, sqltypes.MiniHeaderToCommonType(miniHeader))
sqlRemoved = append(sqlRemoved, miniHeader)
}
}
return nil
})
if err != nil {
return nil, nil, err
}

for _, miniHeader := range addedMap {
added = append(added, miniHeader)
}

return added, removed, nil
return added, sqltypes.MiniHeadersToCommonType(sqlRemoved), nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
Expand All @@ -574,13 +604,14 @@ func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) {
err = convertErr(err)
}()

err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
sqlNewMiniHeaders := sqltypes.MiniHeadersFromCommonType(newMiniHeaders)
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
_, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx)
if err != nil {
return err
}
for _, newMiniHeader := range newMiniHeaders {
_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(newMiniHeader))
for _, newMiniHeader := range sqlNewMiniHeaders {
_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, newMiniHeader)
if err != nil {
return err
}
Expand All @@ -595,7 +626,10 @@ func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err
err = convertErr(err)
}()
var sqlMiniHeader sqltypes.MiniHeader
if err := db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash); err != nil {
db.mu.RLock()
err = db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash)
db.mu.RUnlock()
if err != nil {
if err == sql.ErrNoRows {
return nil, ErrNotFound
}
Expand All @@ -613,7 +647,10 @@ func (db *DB) FindMiniHeaders(query *MiniHeaderQuery) (miniHeaders []*types.Mini
return nil, err
}
var sqlMiniHeaders []*sqltypes.MiniHeader
if err := stmt.GetAllContext(db.ctx, &sqlMiniHeaders); err != nil {
db.mu.RLock()
err = stmt.GetAllContext(db.ctx, &sqlMiniHeaders)
db.mu.RUnlock()
if err != nil {
return nil, err
}
return sqltypes.MiniHeadersToCommonType(sqlMiniHeaders), nil
Expand Down Expand Up @@ -690,7 +727,10 @@ func whereConditionsFromMiniHeaderFilterOpts(filterOpts []MiniHeaderFilter) ([]s
}

func (db *DB) DeleteMiniHeader(hash common.Hash) error {
if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash); err != nil {
db.mu.Lock()
_, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash)
db.mu.Unlock()
if err != nil {
return convertErr(err)
}
return nil
Expand All @@ -704,7 +744,7 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe
// for DELETE statements. It also doesn't support RETURNING. As a
// workaround, we do a SELECT and DELETE inside a transaction.
var miniHeadersToDelete []*sqltypes.MiniHeader
err = db.sqldb.TransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error {
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error {
stmt, err := findMiniHeadersQueryFromOpts(tx, query)
if err != nil {
return err
Expand All @@ -729,7 +769,10 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe
// GetMetadata returns the metadata (or db.ErrNotFound if no metadata has been saved).
func (db *DB) GetMetadata() (*types.Metadata, error) {
var metadata sqltypes.Metadata
if err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1"); err != nil {
db.mu.RLock()
err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1")
db.mu.RUnlock()
if err != nil {
return nil, convertErr(err)
}
return sqltypes.MetadataToCommonType(&metadata), nil
Expand All @@ -742,7 +785,8 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) {
defer func() {
err = convertErr(err)
}()
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
sqlMetadata := sqltypes.MetadataFromCommonType(metadata)
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
query := db.sqldb.Select("COUNT(*)").From("metadata")
count, err := query.GetCount()
if err != nil {
Expand All @@ -751,7 +795,7 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) {
if count != 0 {
return ErrMetadataAlreadyExists
}
_, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqltypes.MetadataFromCommonType(metadata))
_, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqlMetadata)
return err
})
return err
Expand All @@ -768,7 +812,7 @@ func (db *DB) UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMe
return errors.New("db.UpdateMetadata: updateFunc cannot be nil")
}

return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
var existingMetadata sqltypes.Metadata
if err := txn.GetContext(db.ctx, &existingMetadata, "SELECT * FROM metadata LIMIT 1"); err != nil {
if err == sql.ErrNoRows {
Expand Down
16 changes: 16 additions & 0 deletions db/sqltypes/sqltypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ func OrderFromCommonType(order *types.OrderWithMetadata) *Order {
}
}

func OrdersFromCommonType(orders []*types.OrderWithMetadata) []*Order {
result := make([]*Order, len(orders))
for i, order := range orders {
result[i] = OrderFromCommonType(order)
}
return result
}

func OrdersToCommonType(orders []*Order) []*types.OrderWithMetadata {
result := make([]*types.OrderWithMetadata, len(orders))
for i, order := range orders {
Expand Down Expand Up @@ -459,6 +467,14 @@ func MiniHeadersToCommonType(miniHeaders []*MiniHeader) []*types.MiniHeader {
return result
}

func MiniHeadersFromCommonType(miniHeaders []*types.MiniHeader) []*MiniHeader {
result := make([]*MiniHeader, len(miniHeaders))
for i, miniHeader := range miniHeaders {
result[i] = MiniHeaderFromCommonType(miniHeader)
}
return result
}

func MetadataToCommonType(metadata *Metadata) *types.Metadata {
if metadata == nil {
return nil
Expand Down

0 comments on commit 5c75523

Please sign in to comment.