From 5c75523a690572618cb65cf623b8e8ef389ccd0f Mon Sep 17 00:00:00 2001 From: Alex Browne Date: Fri, 24 Jul 2020 15:45:46 -0700 Subject: [PATCH] Fix database is locked error (#873) * Fix database is locked error * Fix typo --- db/sql_implementation.go | 102 ++++++++++++++++++++++++++++----------- db/sqltypes/sqltypes.go | 16 ++++++ 2 files changed, 89 insertions(+), 29 deletions(-) diff --git a/db/sql_implementation.go b/db/sql_implementation.go index 01fd26425..fc44d1a9a 100644 --- a/db/sql_implementation.go +++ b/db/sql_implementation.go @@ -11,6 +11,7 @@ import ( "math/big" "os" "path/filepath" + "sync" "github.com/0xProject/0x-mesh/common/types" "github.com/0xProject/0x-mesh/db/sqltypes" @@ -32,6 +33,11 @@ type DB struct { ctx context.Context sqldb *sqlz.DB opts *Options + + // mu is used to protect all reads and writes to the database. This is a solution to the + // `database is locked` error that appears on SQLite. https://github.com/mattn/go-sqlite3/issues/607 + // TODO(albrow): Make this mutex optional since not all SQL implementations need it. + mu sync.RWMutex } func defaultOptions() *Options { @@ -262,15 +268,25 @@ func (db *DB) migrate() error { return convertErr(err) } +// ReadWriteTransactionalContext acquires a write lock, executes the transaction, then immediately releases the lock. +func (db *DB) ReadWriteTransactionalContext(ctx context.Context, opts *sql.TxOptions, f func(tx *sqlz.Tx) error) error { + db.mu.Lock() + defer db.mu.Unlock() + return db.sqldb.TransactionalContext(ctx, opts, f) +} + func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.OrderWithMetadata, removed []*types.OrderWithMetadata, err error) { defer func() { err = convertErr(err) }() + sqlOrders := sqltypes.OrdersFromCommonType(orders) addedMap := map[common.Hash]*types.OrderWithMetadata{} - err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { - for _, order := range orders { - result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, sqltypes.OrderFromCommonType(order)) + sqlRemoved := []*sqltypes.Order{} + + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + for i, order := range sqlOrders { + result, err := txn.NamedExecContext(db.ctx, insertOrderQuery, order) if err != nil { return err } @@ -279,7 +295,7 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order return err } if affected > 0 { - addedMap[order.Hash] = order + addedMap[order.Hash] = orders[i] } } @@ -308,7 +324,7 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order // the added set and don't add it to the removed set. delete(addedMap, order.Hash) } else { - removed = append(removed, sqltypes.OrderToCommonType(order)) + sqlRemoved = append(sqlRemoved, order) } } return nil @@ -316,11 +332,11 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order if err != nil { return nil, nil, err } + for _, order := range addedMap { added = append(added, order) } - - return added, removed, nil + return added, sqltypes.OrdersToCommonType(sqlRemoved), nil } func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err error) { @@ -328,9 +344,12 @@ func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err er err = convertErr(err) }() var foundOrder sqltypes.Order + db.mu.RLock() if err := db.sqldb.GetContext(db.ctx, &foundOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil { + db.mu.RUnlock() return nil, err } + db.mu.RUnlock() return sqltypes.OrderToCommonType(&foundOrder), nil } @@ -346,7 +365,10 @@ func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata, return nil, err } var foundOrders []*sqltypes.Order - if err := stmt.GetAllContext(db.ctx, &foundOrders); err != nil { + db.mu.RLock() + err = stmt.GetAllContext(db.ctx, &foundOrders) + db.mu.RUnlock() + if err != nil { return nil, err } return sqltypes.OrdersToCommonType(foundOrders), nil @@ -363,7 +385,9 @@ func (db *DB) CountOrders(query *OrderQuery) (count int, err error) { if err != nil { return 0, err } + db.mu.RLock() gotCount, err := stmt.GetCount() + db.mu.RUnlock() if err != nil { return 0, err } @@ -441,7 +465,10 @@ func whereConditionsFromOrderFilterOpts(filterOpts []OrderFilter) ([]sqlz.WhereC } func (db *DB) DeleteOrder(hash common.Hash) error { - if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash); err != nil { + db.mu.Lock() + _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM orders WHERE hash = $1", hash) + db.mu.Unlock() + if err != nil { return convertErr(err) } return nil @@ -458,7 +485,7 @@ func (db *DB) DeleteOrders(query *OrderQuery) (deleted []*types.OrderWithMetadat // for DELETE statements. It also doesn't support RETURNING. As a // workaround, we do a SELECT and DELETE inside a transaction. var ordersToDelete []*sqltypes.Order - err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { stmt, err := addOptsToSelectOrdersQuery(txn.Select("*").From("orders"), query) if err != nil { return err @@ -488,7 +515,7 @@ func (db *DB) UpdateOrder(hash common.Hash, updateFunc func(existingOrder *types return errors.New("db.UpdateOrders: updateFunc cannot be nil") } - return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { var existingOrder sqltypes.Order if err := txn.GetContext(db.ctx, &existingOrder, "SELECT * FROM orders WHERE hash = $1", hash); err != nil { if err == sql.ErrNoRows { @@ -516,10 +543,13 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi err = convertErr(err) }() + sqlMiniHeaders := sqltypes.MiniHeadersFromCommonType(miniHeaders) addedMap := map[common.Hash]*types.MiniHeader{} - err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { - for _, miniHeader := range miniHeaders { - result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader)) + sqlRemoved := []*sqltypes.MiniHeader{} + + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + for i, miniHeader := range sqlMiniHeaders { + result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, miniHeader) if err != nil { return err } @@ -528,7 +558,7 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi return err } if affected > 0 { - addedMap[miniHeader.Hash] = miniHeader + addedMap[miniHeader.Hash] = miniHeaders[i] } } @@ -552,7 +582,7 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi // the added set and don't add it to the removed set. delete(addedMap, miniHeader.Hash) } else { - removed = append(removed, sqltypes.MiniHeaderToCommonType(miniHeader)) + sqlRemoved = append(sqlRemoved, miniHeader) } } return nil @@ -560,11 +590,11 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi if err != nil { return nil, nil, err } + for _, miniHeader := range addedMap { added = append(added, miniHeader) } - - return added, removed, nil + return added, sqltypes.MiniHeadersToCommonType(sqlRemoved), nil } // ResetMiniHeaders deletes all of the existing miniheaders and then stores new @@ -574,13 +604,14 @@ func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) { err = convertErr(err) }() - err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + sqlNewMiniHeaders := sqltypes.MiniHeadersFromCommonType(newMiniHeaders) + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { _, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx) if err != nil { return err } - for _, newMiniHeader := range newMiniHeaders { - _, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(newMiniHeader)) + for _, newMiniHeader := range sqlNewMiniHeaders { + _, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, newMiniHeader) if err != nil { return err } @@ -595,7 +626,10 @@ func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err err = convertErr(err) }() var sqlMiniHeader sqltypes.MiniHeader - if err := db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash); err != nil { + db.mu.RLock() + err = db.sqldb.GetContext(db.ctx, &sqlMiniHeader, "SELECT * FROM miniHeaders WHERE hash = $1", hash) + db.mu.RUnlock() + if err != nil { if err == sql.ErrNoRows { return nil, ErrNotFound } @@ -613,7 +647,10 @@ func (db *DB) FindMiniHeaders(query *MiniHeaderQuery) (miniHeaders []*types.Mini return nil, err } var sqlMiniHeaders []*sqltypes.MiniHeader - if err := stmt.GetAllContext(db.ctx, &sqlMiniHeaders); err != nil { + db.mu.RLock() + err = stmt.GetAllContext(db.ctx, &sqlMiniHeaders) + db.mu.RUnlock() + if err != nil { return nil, err } return sqltypes.MiniHeadersToCommonType(sqlMiniHeaders), nil @@ -690,7 +727,10 @@ func whereConditionsFromMiniHeaderFilterOpts(filterOpts []MiniHeaderFilter) ([]s } func (db *DB) DeleteMiniHeader(hash common.Hash) error { - if _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash); err != nil { + db.mu.Lock() + _, err := db.sqldb.ExecContext(db.ctx, "DELETE FROM miniHeaders WHERE hash = $1", hash) + db.mu.Unlock() + if err != nil { return convertErr(err) } return nil @@ -704,7 +744,7 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe // for DELETE statements. It also doesn't support RETURNING. As a // workaround, we do a SELECT and DELETE inside a transaction. var miniHeadersToDelete []*sqltypes.MiniHeader - err = db.sqldb.TransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error { + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(tx *sqlz.Tx) error { stmt, err := findMiniHeadersQueryFromOpts(tx, query) if err != nil { return err @@ -729,7 +769,10 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe // GetMetadata returns the metadata (or db.ErrNotFound if no metadata has been saved). func (db *DB) GetMetadata() (*types.Metadata, error) { var metadata sqltypes.Metadata - if err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1"); err != nil { + db.mu.RLock() + err := db.sqldb.GetContext(db.ctx, &metadata, "SELECT * FROM metadata LIMIT 1") + db.mu.RUnlock() + if err != nil { return nil, convertErr(err) } return sqltypes.MetadataToCommonType(&metadata), nil @@ -742,7 +785,8 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) { defer func() { err = convertErr(err) }() - err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + sqlMetadata := sqltypes.MetadataFromCommonType(metadata) + err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { query := db.sqldb.Select("COUNT(*)").From("metadata") count, err := query.GetCount() if err != nil { @@ -751,7 +795,7 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) { if count != 0 { return ErrMetadataAlreadyExists } - _, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqltypes.MetadataFromCommonType(metadata)) + _, err = db.sqldb.NamedExecContext(db.ctx, insertMetadataQuery, sqlMetadata) return err }) return err @@ -768,7 +812,7 @@ func (db *DB) UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMe return errors.New("db.UpdateMetadata: updateFunc cannot be nil") } - return db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + return db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { var existingMetadata sqltypes.Metadata if err := txn.GetContext(db.ctx, &existingMetadata, "SELECT * FROM metadata LIMIT 1"); err != nil { if err == sql.ErrNoRows { diff --git a/db/sqltypes/sqltypes.go b/db/sqltypes/sqltypes.go index 35aa52471..b378a88c1 100644 --- a/db/sqltypes/sqltypes.go +++ b/db/sqltypes/sqltypes.go @@ -369,6 +369,14 @@ func OrderFromCommonType(order *types.OrderWithMetadata) *Order { } } +func OrdersFromCommonType(orders []*types.OrderWithMetadata) []*Order { + result := make([]*Order, len(orders)) + for i, order := range orders { + result[i] = OrderFromCommonType(order) + } + return result +} + func OrdersToCommonType(orders []*Order) []*types.OrderWithMetadata { result := make([]*types.OrderWithMetadata, len(orders)) for i, order := range orders { @@ -459,6 +467,14 @@ func MiniHeadersToCommonType(miniHeaders []*MiniHeader) []*types.MiniHeader { return result } +func MiniHeadersFromCommonType(miniHeaders []*types.MiniHeader) []*MiniHeader { + result := make([]*MiniHeader, len(miniHeaders)) + for i, miniHeader := range miniHeaders { + result[i] = MiniHeaderFromCommonType(miniHeader) + } + return result +} + func MetadataToCommonType(metadata *Metadata) *types.Metadata { if metadata == nil { return nil