diff --git a/batch.go b/batch.go index 2b9c5818..98211f76 100644 --- a/batch.go +++ b/batch.go @@ -3,9 +3,7 @@ package bolt import ( "fmt" "sync" - "sync/atomic" "time" - "unsafe" ) // Batch calls fn as part of a batch. It behaves similar to Update, @@ -21,32 +19,29 @@ import ( // take permanent effect only after a successful return is seen in // caller. func (db *DB) Batch(fn func(*Tx) error) error { - b := batch{ - db: db, - } - b.mu.Lock() + errCh := make(chan error, 1) - for { - var cur = (*batch)(atomic.LoadPointer(&db.batch)) - if cur != nil { - // another call is cur - if ch := cur.merge(fn); ch != nil { - // cur will call our fn - err := <-ch - if p, ok := err.(panicked); ok { - panic(p.reason) - } - return err - } - // this batch refused to accept more work + db.batchMu.Lock() + if db.batch == nil { + db.batch = &batch{ + db: db, } + db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) + } + if len(db.batch.calls) < db.MaxBatchSize { + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + } + if len(db.batch.calls) >= db.MaxBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMu.Unlock() - // try to become cur - if atomic.CompareAndSwapPointer(&db.batch, unsafe.Pointer(cur), unsafe.Pointer(&b)) { - // we are now cur - return b.master(db, fn) - } + err := <-errCh + if p, ok := err.(panicked); ok { + panic(p.reason) } + return err } type call struct { @@ -55,35 +50,26 @@ type call struct { } type batch struct { - db *DB - mu sync.Mutex - calls []call - full chan struct{} - started bool + db *DB + timer *time.Timer + start sync.Once + calls []call } -// caller has locked batch.mu for us -func (b *batch) master(db *DB, fn func(*Tx) error) error { - b.full = make(chan struct{}, 1) - ch := make(chan error, 1) - b.calls = append(b.calls, call{fn: fn, err: ch}) - b.mu.Unlock() - - t := time.NewTimer(b.db.MaxBatchDelay) - select { - case <-t.C: - case <-b.full: - t.Stop() - } +func (b *batch) trigger() { + b.start.Do(b.run) +} - b.mu.Lock() - b.started = true - b.mu.Unlock() +func (b *batch) run() { + b.db.batchMu.Lock() + b.timer.Stop() + b.db.batch = nil + b.db.batchMu.Unlock() retry: for len(b.calls) > 0 { var failIdx = -1 - err := db.Update(func(tx *Tx) error { + err := b.db.Update(func(tx *Tx) error { for i, c := range b.calls { if err := safelyCall(c.fn, tx); err != nil { failIdx = i @@ -99,8 +85,8 @@ retry: c := b.calls[failIdx] b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] // run it solo, report result, continue with the rest of the batch - c.err <- db.Update(func(tx *Tx) error { - return safelyCall(fn, tx) + c.err <- b.db.Update(func(tx *Tx) error { + return safelyCall(c.fn, tx) }) continue retry } @@ -113,12 +99,6 @@ retry: } break retry } - - err := <-ch - if p, ok := err.(panicked); ok { - panic(p.reason) - } - return err } type panicked struct { @@ -140,32 +120,3 @@ func safelyCall(fn func(*Tx) error, tx *Tx) (err error) { }() return fn(tx) } - -func (b *batch) merge(fn func(*Tx) error) chan error { - b.mu.Lock() - defer b.mu.Unlock() - - if b.started { - return nil - } - - var ch chan error - if len(b.calls) < b.db.MaxBatchSize { - ch = make(chan error, 1) - c := call{ - fn: fn, - err: ch, - } - b.calls = append(b.calls, c) - } - - if len(b.calls) >= b.db.MaxBatchSize { - // wake up batch, it's ready to run - select { - case b.full <- struct{}{}: - default: - } - } - - return ch -} diff --git a/db.go b/db.go index 79946529..06f7f3d8 100644 --- a/db.go +++ b/db.go @@ -83,7 +83,8 @@ type DB struct { freelist *freelist stats Stats - batch unsafe.Pointer + batchMu sync.Mutex + batch *batch rwlock sync.Mutex // Allows only one writer at a time. metalock sync.Mutex // Protects meta page access.