Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using primary and slave DBs to solve the panic problem caused by DB c… #830

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 106 additions & 14 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Bucket struct {
//
// This is non-persisted across transactions so it must be set in every Tx.
FillPercent float64

slave *Bucket
}

// newBucket returns a new bucket associated with a transaction.
Expand Down Expand Up @@ -82,10 +84,20 @@ func (b *Bucket) Cursor() *Cursor {
}
}

// Bucket retrieves a nested bucket by name.
// Bucket retrieves a nested bucket by name with slave.
func (b *Bucket) Bucket(name []byte) *Bucket {
rb := b.bucket(name)
if rb != nil && b.slave != nil {
rb.slave = b.slave.bucket(name)
}

return rb
}

// bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
func (b *Bucket) bucket(name []byte) *Bucket {
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
Expand Down Expand Up @@ -142,10 +154,20 @@ func (b *Bucket) openBucket(value []byte) *Bucket {
return &child
}

// CreateBucket creates a new bucket at the given key and returns the new bucket.
// CreateBucket creates a new bucket at the given key and returns the new bucket with slave.
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
rb, err = b.createBucket(key)
if err == nil && b.slave != nil {
rb.slave, err = b.slave.createBucket(key)
}

return
}

// createBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
func (b *Bucket) createBucket(key []byte) (rb *Bucket, err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Creating bucket %q", key)
defer func() {
Expand Down Expand Up @@ -199,10 +221,20 @@ func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
return b.Bucket(newKey), nil
}

// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// CreateBucketIfNotExists creates a new bucket with slave if it doesn't already exist and returns a reference to it.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
rb, err = b.createBucketIfNotExists(key)
if err == nil && b.slave != nil {
rb.slave, err = b.slave.createBucketIfNotExists(key)
}

return
}

// createBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
func (b *Bucket) createBucketIfNotExists(key []byte) (rb *Bucket, err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Creating bucket if not exist %q", key)
defer func() {
Expand Down Expand Up @@ -269,8 +301,18 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
}

// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
func (b *Bucket) DeleteBucket(key []byte) (err error) {
err = b.deleteBucket(key)
if err == nil && b.slave != nil {
err = b.slave.deleteBucket(key)
}

return
}

// deleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
func (b *Bucket) deleteBucket(key []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Deleting bucket %q", key)
defer func() {
Expand Down Expand Up @@ -327,13 +369,23 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
return nil
}

// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket with slave.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
err = b.moveBucket(key, dstBucket)
if err == nil && b.slave != nil && dstBucket.slave != nil {
err = b.slave.moveBucket(key, dstBucket.slave)
}

return
}

// moveBucket moves a sub-bucket from the source bucket to the destination bucket.
// Returns an error if
// 1. the sub-bucket cannot be found in the source bucket;
// 2. or the key already exists in the destination bucket;
// 3. or the key represents a non-bucket value;
// 4. the source and destination buckets are the same.
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
func (b *Bucket) moveBucket(key []byte, dstBucket *Bucket) (err error) {
lg := b.tx.db.Logger()
if lg != discardLogger {
lg.Debugf("Moving bucket %q", key)
Expand Down Expand Up @@ -445,11 +497,21 @@ func (b *Bucket) Get(key []byte) []byte {
return v
}

// Put sets the value for a key in the bucket.
// Put sets the value for a key in the bucket with slave.
func (b *Bucket) Put(key []byte, value []byte) (err error) {
err = b.put(key, value)
if err == nil && b.slave != nil {
err = b.slave.put(key, value)
}

return
}

// put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) (err error) {
func (b *Bucket) put(key []byte, value []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Putting key %q", key)
defer func() {
Expand Down Expand Up @@ -493,10 +555,20 @@ func (b *Bucket) Put(key []byte, value []byte) (err error) {
return nil
}

// Delete removes a key from the bucket with slave.
func (b *Bucket) Delete(key []byte) (err error) {
err = b.delete(key)
if err == nil && b.slave != nil {
err = b.slave.delete(key)
}

return
}

// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) (err error) {
func (b *Bucket) delete(key []byte) (err error) {
if lg := b.tx.db.Logger(); lg != discardLogger {
lg.Debugf("Deleting key %q", key)
defer func() {
Expand Down Expand Up @@ -539,8 +611,18 @@ func (b *Bucket) Sequence() uint64 {
return b.InSequence()
}

// SetSequence updates the sequence number for the bucket.
// SetSequence updates the sequence number for the bucket with slave.
func (b *Bucket) SetSequence(v uint64) error {
err := b.setSequence(v)
if err == nil && b.slave != nil {
err = b.slave.setSequence(v)
}

return err
}

// SetSequence updates the sequence number for the bucket.
func (b *Bucket) setSequence(v uint64) error {
if b.tx.db == nil {
return errors.ErrTxClosed
} else if !b.Writable() {
Expand All @@ -558,8 +640,18 @@ func (b *Bucket) SetSequence(v uint64) error {
return nil
}

// NextSequence returns an autoincrementing integer for the bucket.
// NextSequence returns an autoincrementing integer for the bucket with slave.
func (b *Bucket) NextSequence() (uint64, error) {
r, err := b.nextSequence()
if err == nil && b.slave != nil {
_, err = b.slave.nextSequence()
}

return r, err
}

// nextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) nextSequence() (uint64, error) {
if b.tx.db == nil {
return 0, errors.ErrTxClosed
} else if !b.Writable() {
Expand Down
131 changes: 128 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type DB struct {
// Read only mode.
// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
readOnly bool

slave *DB
}

// Path returns the path to currently open database file.
Expand All @@ -171,11 +173,113 @@ func (db *DB) String() string {
return fmt.Sprintf("DB<%q>", db.path)
}

// Open creates and opens a database at the given path with a given file mode.
// Open creates and opens master database and slave database
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
openSlave := true
if options != nil && options.OpenSlave != nil {
openSlave = *options.OpenSlave
}

var master *DB

master, err = tryOpenMasterDB(path, 0644, options, openSlave)
if err != nil {
return nil, err
}

if !openSlave {
return master, nil
}

slaveDbPath := path + ".slave"
err = copyFile(path, slaveDbPath)
if err != nil {
return nil, err
}

var slave *DB
slave, err = open(slaveDbPath, 0644, options)
if err != nil {
return nil, err
}

master.slave = slave

return master, nil
}

func tryOpenMasterDB(path string, mode os.FileMode, options *Options, openSlave bool) (db *DB, err error) {
slaveDbPath := path + ".slave"
pathBackup := path + ".backup"

defer func() {
if e := recover(); e != nil {
if openSlave {
if _, err := os.Stat(slaveDbPath); err == nil {
if err := os.Rename(slaveDbPath, pathBackup); err != nil {
panic(fmt.Sprintf("rename %s-%s err %v: failed (%v)", slaveDbPath, pathBackup, err, e))
}
} else {
panic(fmt.Sprintf("slave db path %s err %v, by open db %s failed (%v)", slaveDbPath, err, path, e))
}
}

panic(fmt.Sprintf("open db %s failed (%v),rename %s-%s success", path, e, slaveDbPath, pathBackup))
}
}()

if openSlave {
if _, err := os.Stat(pathBackup); err == nil {
if err := os.Rename(pathBackup, path); err != nil {
return nil, err
}
}
}

db, err = open(path, 0644, options)

return
}

func copyFile(src, dst string) error {
os.RemoveAll(dst)

srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()

tmpDst := dst + ".tmp"
os.RemoveAll(tmpDst)

dstFile, err := os.Create(tmpDst)
if err != nil {
return err
}
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
if err != nil {
return err
}

if err := dstFile.Sync(); err != nil {
return err
}

if err := os.Rename(tmpDst, dst); err != nil {
return err
}

return nil
}

// open creates and opens a database at the given path with a given file mode.
// If the file does not exist then it will be created automatically with a given file mode.
// Passing in nil options will cause Bolt to open the database with the default options.
// Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
func open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
db = &DB{
opened: true,
}
Expand Down Expand Up @@ -664,10 +768,20 @@ func (db *DB) init() error {
return nil
}

// Close releases all database resources with slave.
func (db *DB) Close() error {
err := db._close()
if err == nil && db.slave != nil {
err = db.slave._close()
}

return err
}

// Close releases all database resources.
// It will block waiting for any open transactions to finish
// before closing the database and returning.
func (db *DB) Close() error {
func (db *DB) _close() error {
db.rwlock.Lock()
defer db.rwlock.Unlock()

Expand Down Expand Up @@ -814,6 +928,15 @@ func (db *DB) beginTx() (*Tx, error) {
}

func (db *DB) beginRWTx() (*Tx, error) {
tx, err := db._beginRWTx()
if err == nil && db.slave != nil {
tx.slave, err = db.slave._beginRWTx()
}

return tx, err
}

func (db *DB) _beginRWTx() (*Tx, error) {
// If the database was opened with Options.ReadOnly, return an error.
if db.readOnly {
return nil, berrors.ErrDatabaseReadOnly
Expand Down Expand Up @@ -1330,6 +1453,8 @@ type Options struct {

// Logger is the logger used for bbolt.
Logger Logger

OpenSlave *bool
}

func (o *Options) String() string {
Expand Down
Loading