Skip to content

Commit

Permalink
mdbx: Batch() (#9999)
Browse files Browse the repository at this point in the history
This is mostly implemented for use in
`erigon/erigon-lib/downloader/mdbx_piece_completion.go` and maybe in
`nodesDB` (where we need many parallel RwTx).

I was against adding this "trick"/"API" in past years, because I thought that
we could make our app more 1-big-rwtx-friendly. And we did that
in Erigon - StagedSync. TxPool did too, but with a slightly less happy face
- by using "map+mutex with periodic flush to db". But `anacrolix/torrent` is
an external library and is unlikely to survive such a big mental-model change.
Maybe it's time to add `db.Batch()`.

#### Batch Rw transactions

Each `DB.Update()` waits for disk to commit the writes. This overhead
can be minimized by combining multiple updates with the `DB.Batch()`
function:

```go
err := db.Batch(func(tx kv.RwTx) error {
	...
	return nil
})
```

Concurrent Batch calls are opportunistically combined into larger
transactions. Batch is only useful when there are multiple goroutines
calling it.

The trade-off is that `Batch` can call the given
function multiple times, if parts of the transaction fail. The
function must be idempotent and side effects must take effect only
after a successful return from `DB.Batch()`.

For example: don't display messages from inside the function, instead
set variables in the enclosing scope:

```go
var id uint64
err := db.Batch(func(tx kv.RwTx) error {
	// Find last key in bucket, decode as bigendian uint64, increment
	// by one, encode back to []byte, and add new key.
	...
	id = newValue
	return nil
})
if err != nil {
	return ...
}
fmt.Printf("Allocated ID %d\n", id)
```


---- 

Implementation mostly taken from
https://github.com/etcd-io/bbolt/?tab=readme-ov-file#batch-read-write-transactions

Maybe in the future we can push it down to
https://github.com/erigontech/mdbx-go
  • Loading branch information
AskAlexSharov authored Apr 20, 2024
1 parent e7d67fd commit 122f9f8
Show file tree
Hide file tree
Showing 5 changed files with 406 additions and 3 deletions.
3 changes: 1 addition & 2 deletions cmd/snapshots/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -234,7 +233,7 @@ func NewTorrentClient(config CreateNewTorrentClientConfig) (*TorrentClient, erro

cfg.ClientConfig.DataDir = torrentDir

cfg.ClientConfig.PieceHashersPerTorrent = 32 * runtime.NumCPU()
cfg.ClientConfig.PieceHashersPerTorrent = 32
cfg.ClientConfig.DisableIPv6 = config.DisableIPv6
cfg.ClientConfig.DisableIPv4 = config.DisableIPv4

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2540,6 +2540,7 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err)
}
c, err = NewMdbxPieceCompletion(db)
//c, err = NewMdbxPieceCompletionBatch(db)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("torrentcfg.NewMdbxPieceCompletion: %w", err)
}
Expand Down
57 changes: 57 additions & 0 deletions erigon-lib/downloader/mdbx_piece_completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types/infohash"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
)

const (
Expand Down Expand Up @@ -115,3 +116,59 @@ func (m *mdbxPieceCompletion) Close() error {
m.db.Close()
return nil
}

// mdbxPieceCompletionBatch is a storage.PieceCompletion implementation that
// keeps piece state in an MDBX database. It stores the concrete *mdbx.MdbxKV
// because its Set method writes through MdbxKV.Batch.
type mdbxPieceCompletionBatch struct {
	db *mdbx.MdbxKV
}

// Compile-time check that *mdbxPieceCompletionBatch implements
// storage.PieceCompletion.
var _ storage.PieceCompletion = (*mdbxPieceCompletionBatch)(nil)

// NewMdbxPieceCompletionBatch wraps db in a storage.PieceCompletion whose
// writes are submitted through MdbxKV.Batch.
//
// db must hold a *mdbx.MdbxKV: the type assertion is unchecked, so any other
// kv.RwDB value makes this function panic. The returned error is always nil.
func NewMdbxPieceCompletionBatch(db kv.RwDB) (ret storage.PieceCompletion, err error) {
	return &mdbxPieceCompletionBatch{db: db.(*mdbx.MdbxKV)}, nil
}

// Get looks up the stored completion state of the piece identified by pk.
//
// The lookup key is the info-hash followed by the piece index encoded as a
// big-endian uint32. cn.Complete mirrors the stored value when it equals
// `complete` or `incomplete`; any other stored value yields cn.Ok == false.
func (m *mdbxPieceCompletionBatch) Get(pk metainfo.PieceKey) (cn storage.Completion, err error) {
	var key [infohash.Size + 4]byte
	copy(key[:], pk.InfoHash[:])
	binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

	err = m.db.View(context.Background(), func(tx kv.Tx) error {
		// Ok is raised before the read, so it stays true when GetOne fails;
		// callers have to check err first.
		cn.Ok = true
		val, getErr := tx.GetOne(kv.BittorrentCompletion, key[:])
		if getErr != nil {
			return getErr
		}
		if state := string(val); state == complete {
			cn.Complete = true
		} else if state == incomplete {
			cn.Complete = false
		} else {
			cn.Ok = false
		}
		return nil
	})
	return cn, err
}

// Set records whether the piece identified by pk is complete (b == true) or
// incomplete (b == false).
//
// The write is skipped when a preceding Get succeeds and already reports the
// requested state. Otherwise the value is stored through MdbxKV.Batch, so it
// may be committed in the same transaction as other concurrent Set calls.
func (m *mdbxPieceCompletionBatch) Set(pk metainfo.PieceKey, b bool) error {
	current, err := m.Get(pk)
	if err == nil && current.Ok && current.Complete == b {
		return nil
	}

	// Same key layout as Get: info-hash followed by the big-endian piece index.
	var key [infohash.Size + 4]byte
	copy(key[:], pk.InfoHash[:])
	binary.BigEndian.PutUint32(key[infohash.Size:], uint32(pk.Index))

	val := []byte(incomplete)
	if b {
		val = []byte(complete)
	}

	// Batch may invoke the closure more than once; a plain Put of a fixed
	// key/value pair is idempotent.
	put := func(tx kv.RwTx) error {
		return tx.Put(kv.BittorrentCompletion, key[:], val)
	}
	return m.db.Batch(put)
}

// Close closes the underlying database. It always returns nil.
func (m *mdbxPieceCompletionBatch) Close() error {
	m.db.Close()
	return nil
}
156 changes: 156 additions & 0 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -404,6 +405,9 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex),

leakDetector: dbg.NewLeakDetector("db."+opts.label.String(), dbg.SlowTx()),

MaxBatchSize: DefaultMaxBatchSize,
MaxBatchDelay: DefaultMaxBatchDelay,
}

customBuckets := opts.bucketsCfg(kv.ChaindataTablesCfg)
Expand Down Expand Up @@ -478,6 +482,158 @@ type MdbxKV struct {
txsAllDoneOnCloseCond *sync.Cond

leakDetector *dbg.LeakDetector

// MaxBatchSize is the maximum size of a batch. Default value is
// copied from DefaultMaxBatchSize in Open.
//
// If <=0, disables batching.
//
// Do not change concurrently with calls to Batch.
MaxBatchSize int

// MaxBatchDelay is the maximum delay before a batch starts.
// Default value is copied from DefaultMaxBatchDelay in Open.
//
// If <=0, effectively disables batching.
//
// Do not change concurrently with calls to Batch.
MaxBatchDelay time.Duration

batchMu sync.Mutex
batch *batch
}

// Default batching parameters. Open copies them into MdbxKV.MaxBatchSize and
// MdbxKV.MaxBatchDelay.
const (
	// DefaultMaxBatchSize is the default number of queued calls at which
	// Batch runs the current batch without waiting for the delay timer.
	DefaultMaxBatchSize int = 1000

	// DefaultMaxBatchDelay is the default delay after which a newly created
	// batch is run by its timer.
	DefaultMaxBatchDelay = 10 * time.Millisecond
)

// batch collects the calls submitted through MdbxKV.Batch so that run can
// execute them together inside a single Update transaction.
type batch struct {
	db    *MdbxKV     // database the batch is executed against
	timer *time.Timer // set by Batch to call trigger after MaxBatchDelay
	start sync.Once   // guarantees that run is executed at most once
	calls []call      // queued functions together with their result channels
}

// call is one function queued in a batch.
type call struct {
	fn  func(kv.RwTx) error // function to execute inside the batch transaction
	err chan<- error        // receives the outcome for the caller blocked in Batch
}

// trigger runs the batch if it hasn't already been run. It is called both by
// the batch timer and by Batch once the batch is full; sync.Once makes every
// call after the first a no-op.
func (b *batch) trigger() {
	b.start.Do(b.run)
}

// run executes every queued call inside one Update transaction and reports
// the outcome to the goroutines waiting in MdbxKV.Batch.
//
// When a call returns an error (or panics, see safelyCall) the transaction is
// abandoned, the offending call is removed from the batch and told to re-run
// on its own via trySolo, and the remaining calls are attempted again in a
// fresh transaction.
func (b *batch) run() {
	// Detach this batch from the DB so that Batch stops appending to it,
	// taking care not to clobber a newer batch that may already be installed.
	b.db.batchMu.Lock()
	b.timer.Stop()
	if b.db.batch == b {
		b.db.batch = nil
	}
	b.db.batchMu.Unlock()

	for len(b.calls) > 0 {
		failed := -1
		err := b.db.Update(context.Background(), func(tx kv.RwTx) error {
			for i := range b.calls {
				if callErr := safelyCall(b.calls[i].fn, tx); callErr != nil {
					failed = i
					return callErr
				}
			}
			return nil
		})

		if failed < 0 {
			// No individual call failed: hand the transaction result
			// (nil or an error reported by Update itself) to every caller.
			for _, c := range b.calls {
				c.err <- err
			}
			return
		}

		// Swap-remove the failing call. Mutating b.calls without the mutex
		// is safe here because db.batch no longer points at this batch, so
		// Batch cannot append to it any more.
		last := len(b.calls) - 1
		culprit := b.calls[failed]
		b.calls[failed] = b.calls[last]
		b.calls = b.calls[:last]

		// Ask the submitter to re-run its function solo, then retry the rest.
		culprit.err <- trySolo
	}
}

// trySolo is a sentinel error that run sends to a submitter whose function
// failed inside a batch. Batch reacts to it by re-running that function in
// its own Update transaction, so callers of Batch should never observe it.
var trySolo = errors.New("batch function returned an error and should be re-run solo")

// panicked is the error produced by safelyCall when the wrapped function
// panics. It carries the value recovered from the panic.
type panicked struct {
	reason interface{}
}

// Error implements the error interface. When the recovered value is itself an
// error its message is returned unchanged; any other value is formatted as
// "panic: <value>".
func (p panicked) Error() string {
	switch r := p.reason.(type) {
	case error:
		return r.Error()
	default:
		return fmt.Sprintf("panic: %v", r)
	}
}

// safelyCall invokes fn with tx and returns its error. A panic raised by fn
// is recovered and returned as a panicked error instead of propagating to
// the caller.
func safelyCall(fn func(tx kv.RwTx) error, tx kv.RwTx) (err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		// Overwrite the named result so the caller sees the panic as an error.
		err = panicked{recovered}
	}()
	err = fn(tx)
	return err
}

// Batch runs fn inside a read-write transaction, like Update, but lets
// concurrent Batch calls share one transaction. It is only useful when
// several goroutines call it at the same time.
//
// Differences from Update:
//
//  1. concurrent Batch calls can be combined into a single RwTx;
//
//  2. fn may be called more than once, regardless of whether it returns an
//     error or not.
//
// Side effects of fn must therefore be idempotent and should only be treated
// as permanent after Batch itself has returned successfully. Examples of bad
// side effects: printing messages, mutating external counters (`i++`).
//
// The maximum batch size and delay are controlled by db.MaxBatchSize and
// db.MaxBatchDelay.
func (db *MdbxKV) Batch(fn func(tx kv.RwTx) error) error {
	// Buffered so that the batch runner never blocks when delivering the result.
	result := make(chan error, 1)

	db.batchMu.Lock()
	cur := db.batch
	if cur == nil || len(cur.calls) >= db.MaxBatchSize {
		// No open batch, or the open one is already full: start a new one.
		cur = &batch{db: db}
		db.batch = cur
		cur.timer = time.AfterFunc(db.MaxBatchDelay, cur.trigger)
	}
	cur.calls = append(cur.calls, call{fn: fn, err: result})
	if len(cur.calls) >= db.MaxBatchSize {
		// The batch has reached its size limit: run it without waiting for the timer.
		go cur.trigger()
	}
	db.batchMu.Unlock()

	err := <-result
	if !errors.Is(err, trySolo) {
		return err
	}
	// fn failed inside the shared transaction; give it a transaction of its own.
	return db.Update(context.Background(), fn)
}

// Path returns the path stored in the options this database was opened with.
func (db *MdbxKV) Path() string { return db.opts.path }
Expand Down
Loading

0 comments on commit 122f9f8

Please sign in to comment.