Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Fix database is locked error #873

Merged
merged 2 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 73 additions & 29 deletions db/sql_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/big"
"os"
"path/filepath"
"sync"

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/db/sqltypes"
Expand All @@ -32,6 +33,11 @@ type DB struct {
ctx context.Context
sqldb *sqlz.DB
opts *Options

// mu is used to protect all reads and writes to the database. This is a solution to the
// `database is locked` error that appears on SQLite. https://github.com/mattn/go-sqlite3/issues/607
// TODO(albrow): Make this mutex optional since not all SQL implementations need it.
mu sync.RWMutex
}

func defaultOptions() *Options {
Expand Down Expand Up @@ -262,15 +268,25 @@ func (db *DB) migrate() error {
return convertErr(err)
}

// ReadWriteTransactionalContext acqires a write lock, executes the transaction, then immediately releases the lock.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: acqires => acquires

func (db *DB) ReadWriteTransactionalContext(ctx context.Context, opts *sql.TxOptions, f func(tx *sqlz.Tx) error) error {
db.mu.Lock()
defer db.mu.Unlock()
return db.sqldb.TransactionalContext(ctx, opts, f)
}

func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.OrderWithMetadata, removed []*types.OrderWithMetadata, err error) {
defer func() {
err = convertErr(err)
}()

sqlOrders := sqltypes.OrdersFromCommonType(orders)
addedMap := map[common.Hash]*types.OrderWithMetadata{}
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for _, order := range orders {
result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, sqltypes.OrderFromCommonType(order))
sqlRemoved := []*sqltypes.Order{}

err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for i, order := range sqlOrders {
result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, order)
if err != nil {
return err
}
Expand All @@ -279,7 +295,7 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order
return err
}
if affected > 0 {
addedMap[order.Hash] = order
addedMap[order.Hash] = orders[i]
}
}

Expand Down Expand Up @@ -308,29 +324,32 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order
// the added set and don't add it to the removed set.
delete(addedMap, order.Hash)
} else {
removed = append(removed, sqltypes.OrderToCommonType(order))
sqlRemoved = append(sqlRemoved, order)
}
}
return nil
})
if err != nil {
return nil, nil, err
}

for _, order := range addedMap {
added = append(added, order)
}

return added, removed, nil
return added, sqltypes.OrdersToCommonType(sqlRemoved), nil
}

func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err error) {
defer func() {
err = convertErr(err)
}()
var foundOrder sqltypes.Order
db.mu.RLock()
if err := db.sqldb.GetContext(db.ctx, &foundOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil {
db.mu.RUnlock()
return nil, err
}
db.mu.RUnlock()
return sqltypes.OrderToCommonType(&foundOrder), nil
}

Expand All @@ -346,7 +365,10 @@ func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata,
return nil, err
}
var foundOrders []*sqltypes.Order
if err := stmt.GetAllContext(db.ctx, &foundOrders); err != nil {
db.mu.RLock()
err = stmt.GetAllContext(db.ctx, &foundOrders)
db.mu.RUnlock()
if err != nil {
return nil, err
}
return sqltypes.OrdersToCommonType(foundOrders), nil
Expand All @@ -363,7 +385,9 @@ func (db *DB) CountOrders(query *OrderQuery) (count int, err error) {
if err != nil {
return 0, err
}
db.mu.RLock()
gotCount, err := stmt.GetCount()
db.mu.RUnlock()
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -441,7 +465,10 @@ func whereConditionsFromOrderFilterOpts(filterOpts []OrderFilter) ([]sqlz.WhereC
}

func (db *DB) DeleteOrder(hash common.Hash) error {
if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash); err != nil {
db.mu.Lock()
_, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash)
db.mu.Unlock()
if err != nil {
return convertErr(err)
}
return nil
Expand All @@ -458,7 +485,7 @@ func (db *DB) DeleteOrders(query *OrderQuery) (deleted []*types.OrderWithMetadat
// for DELETE statements. It also doesn't support RETURNING. As a
// workaround, we do a SELECT and DELETE inside a transaction.
var ordersToDelete []*sqltypes.Order
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
stmt, err := addOptsToSelectOrdersQuery(txn.Select("*").From("orders"), query)
if err != nil {
return err
Expand Down Expand Up @@ -488,7 +515,7 @@ func (db *DB) UpdateOrder(hash common.Hash, updateFunc func(existingOrder *types
return errors.New("db.UpdateOrders: updateFunc cannot be nil")
}

return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
var existingOrder sqltypes.Order
if err := txn.GetContext(db.ctx, &existingOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -516,10 +543,13 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
err = convertErr(err)
}()

sqlMiniHeaders := sqltypes.MiniHeadersFromCommonType(miniHeaders)
addedMap := map[common.Hash]*types.MiniHeader{}
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for _, miniHeader := range miniHeaders {
result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader))
sqlRemoved := []*sqltypes.MiniHeader{}

err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for i, miniHeader := range sqlMiniHeaders {
result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, miniHeader)
if err != nil {
return err
}
Expand All @@ -528,7 +558,7 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return err
}
if affected > 0 {
addedMap[miniHeader.Hash] = miniHeader
addedMap[miniHeader.Hash] = miniHeaders[i]
}
}

Expand All @@ -552,19 +582,19 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
// the added set and don't add it to the removed set.
delete(addedMap, miniHeader.Hash)
} else {
removed = append(removed, sqltypes.MiniHeaderToCommonType(miniHeader))
sqlRemoved = append(sqlRemoved, miniHeader)
}
}
return nil
})
if err != nil {
return nil, nil, err
}

for _, miniHeader := range addedMap {
added = append(added, miniHeader)
}

return added, removed, nil
return added, sqltypes.MiniHeadersToCommonType(sqlRemoved), nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
Expand All @@ -574,13 +604,14 @@ func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) {
err = convertErr(err)
}()

err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
sqlNewMiniHeaders := sqltypes.MiniHeadersFromCommonType(newMiniHeaders)
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
_, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx)
if err != nil {
return err
}
for _, newMiniHeader := range newMiniHeaders {
_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(newMiniHeader))
for _, newMiniHeader := range sqlNewMiniHeaders {
_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, newMiniHeader)
if err != nil {
return err
}
Expand All @@ -595,7 +626,10 @@ func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err
err = convertErr(err)
}()
var sqlMiniHeader sqltypes.MiniHeader
if err := db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash); err != nil {
db.mu.RLock()
err = db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash)
db.mu.RUnlock()
if err != nil {
if err == sql.ErrNoRows {
return nil, ErrNotFound
}
Expand All @@ -613,7 +647,10 @@ func (db *DB) FindMiniHeaders(query *MiniHeaderQuery) (miniHeaders []*types.Mini
return nil, err
}
var sqlMiniHeaders []*sqltypes.MiniHeader
if err := stmt.GetAllContext(db.ctx, &sqlMiniHeaders); err != nil {
db.mu.RLock()
err = stmt.GetAllContext(db.ctx, &sqlMiniHeaders)
db.mu.RUnlock()
if err != nil {
return nil, err
}
return sqltypes.MiniHeadersToCommonType(sqlMiniHeaders), nil
Expand Down Expand Up @@ -690,7 +727,10 @@ func whereConditionsFromMiniHeaderFilterOpts(filterOpts []MiniHeaderFilter) ([]s
}

func (db *DB) DeleteMiniHeader(hash common.Hash) error {
if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash); err != nil {
db.mu.Lock()
_, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash)
db.mu.Unlock()
if err != nil {
return convertErr(err)
}
return nil
Expand All @@ -704,7 +744,7 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe
// for DELETE statements. It also doesn't support RETURNING. As a
// workaround, we do a SELECT and DELETE inside a transaction.
var miniHeadersToDelete []*sqltypes.MiniHeader
err = db.sqldb.TransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error {
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error {
stmt, err := findMiniHeadersQueryFromOpts(tx, query)
if err != nil {
return err
Expand All @@ -729,7 +769,10 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe
// GetMetadata returns the metadata (or db.ErrNotFound if no metadata has been saved).
func (db *DB) GetMetadata() (*types.Metadata, error) {
var metadata sqltypes.Metadata
if err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1"); err != nil {
db.mu.RLock()
err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1")
db.mu.RUnlock()
if err != nil {
return nil, convertErr(err)
}
return sqltypes.MetadataToCommonType(&metadata), nil
Expand All @@ -742,7 +785,8 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) {
defer func() {
err = convertErr(err)
}()
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
sqlMetadata := sqltypes.MetadataFromCommonType(metadata)
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
query := db.sqldb.Select("COUNT(*)").From("metadata")
count, err := query.GetCount()
if err != nil {
Expand All @@ -751,7 +795,7 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) {
if count != 0 {
return ErrMetadataAlreadyExists
}
_, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqltypes.MetadataFromCommonType(metadata))
_, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqlMetadata)
return err
})
return err
Expand All @@ -768,7 +812,7 @@ func (db *DB) UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMe
return errors.New("db.UpdateMetadata: updateFunc cannot be nil")
}

return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
var existingMetadata sqltypes.Metadata
if err := txn.GetContext(db.ctx, &existingMetadata, "SELECT * FROM metadata LIMIT 1"); err != nil {
if err == sql.ErrNoRows {
Expand Down
16 changes: 16 additions & 0 deletions db/sqltypes/sqltypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ func OrderFromCommonType(order *types.OrderWithMetadata) *Order {
}
}

func OrdersFromCommonType(orders []*types.OrderWithMetadata) []*Order {
result := make([]*Order, len(orders))
for i, order := range orders {
result[i] = OrderFromCommonType(order)
}
return result
}

func OrdersToCommonType(orders []*Order) []*types.OrderWithMetadata {
result := make([]*types.OrderWithMetadata, len(orders))
for i, order := range orders {
Expand Down Expand Up @@ -459,6 +467,14 @@ func MiniHeadersToCommonType(miniHeaders []*MiniHeader) []*types.MiniHeader {
return result
}

func MiniHeadersFromCommonType(miniHeaders []*types.MiniHeader) []*MiniHeader {
result := make([]*MiniHeader, len(miniHeaders))
for i, miniHeader := range miniHeaders {
result[i] = MiniHeaderFromCommonType(miniHeader)
}
return result
}

func MetadataToCommonType(metadata *Metadata) *types.Metadata {
if metadata == nil {
return nil
Expand Down