Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Commit

Permalink
Rewrite Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
tv42 committed Feb 11, 2015
1 parent c7f68e9 commit 29c0764
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 84 deletions.
117 changes: 34 additions & 83 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package bolt
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)

// Batch calls fn as part of a batch. It behaves similar to Update,
Expand All @@ -21,32 +19,29 @@ import (
// take permanent effect only after a successful return is seen in
// caller.
func (db *DB) Batch(fn func(*Tx) error) error {
b := batch{
db: db,
}
b.mu.Lock()
errCh := make(chan error, 1)

for {
var cur = (*batch)(atomic.LoadPointer(&db.batch))
if cur != nil {
// another call is cur
if ch := cur.merge(fn); ch != nil {
// cur will call our fn
err := <-ch
if p, ok := err.(panicked); ok {
panic(p.reason)
}
return err
}
// this batch refused to accept more work
db.batchMu.Lock()
if db.batch == nil {
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
if len(db.batch.calls) < db.MaxBatchSize {
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
}
if len(db.batch.calls) >= db.MaxBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()
}
db.batchMu.Unlock()

// try to become cur
if atomic.CompareAndSwapPointer(&db.batch, unsafe.Pointer(cur), unsafe.Pointer(&b)) {
// we are now cur
return b.master(db, fn)
}
err := <-errCh
if p, ok := err.(panicked); ok {
panic(p.reason)
}
return err
}

type call struct {
Expand All @@ -55,35 +50,26 @@ type call struct {
}

type batch struct {
db *DB
mu sync.Mutex
calls []call
full chan struct{}
started bool
db *DB
timer *time.Timer
start sync.Once
calls []call
}

// caller has locked batch.mu for us
func (b *batch) master(db *DB, fn func(*Tx) error) error {
b.full = make(chan struct{}, 1)
ch := make(chan error, 1)
b.calls = append(b.calls, call{fn: fn, err: ch})
b.mu.Unlock()

t := time.NewTimer(b.db.MaxBatchDelay)
select {
case <-t.C:
case <-b.full:
t.Stop()
}
func (b *batch) trigger() {
b.start.Do(b.run)
}

b.mu.Lock()
b.started = true
b.mu.Unlock()
func (b *batch) run() {
b.db.batchMu.Lock()
b.timer.Stop()
b.db.batch = nil
b.db.batchMu.Unlock()

retry:
for len(b.calls) > 0 {
var failIdx = -1
err := db.Update(func(tx *Tx) error {
err := b.db.Update(func(tx *Tx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
failIdx = i
Expand All @@ -99,8 +85,8 @@ retry:
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// run it solo, report result, continue with the rest of the batch
c.err <- db.Update(func(tx *Tx) error {
return safelyCall(fn, tx)
c.err <- b.db.Update(func(tx *Tx) error {
return safelyCall(c.fn, tx)
})
continue retry
}
Expand All @@ -113,12 +99,6 @@ retry:
}
break retry
}

err := <-ch
if p, ok := err.(panicked); ok {
panic(p.reason)
}
return err
}

type panicked struct {
Expand All @@ -140,32 +120,3 @@ func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
}()
return fn(tx)
}

func (b *batch) merge(fn func(*Tx) error) chan error {
b.mu.Lock()
defer b.mu.Unlock()

if b.started {
return nil
}

var ch chan error
if len(b.calls) < b.db.MaxBatchSize {
ch = make(chan error, 1)
c := call{
fn: fn,
err: ch,
}
b.calls = append(b.calls, c)
}

if len(b.calls) >= b.db.MaxBatchSize {
// wake up batch, it's ready to run
select {
case b.full <- struct{}{}:
default:
}
}

return ch
}
3 changes: 2 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type DB struct {
freelist *freelist
stats Stats

batch unsafe.Pointer
batchMu sync.Mutex
batch *batch

rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
Expand Down

0 comments on commit 29c0764

Please sign in to comment.