Skip to content

Commit

Permalink
feat(vlog): making vlog threshold dynamic 6ce3b7c (#1635)
Browse files Browse the repository at this point in the history
  • Loading branch information
aman-bansal authored Feb 4, 2021
1 parent cf66e6f commit 5dbae10
Show file tree
Hide file tree
Showing 15 changed files with 218 additions and 41 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
p/
badger-test*/
.idea/

vendor
2 changes: 1 addition & 1 deletion backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (l *KVLoader) Set(kv *pb.KV) error {
ExpiresAt: kv.ExpiresAt,
meta: meta,
}
estimatedSize := int64(e.estimateSize(l.db.opt.ValueThreshold))
estimatedSize := e.estimateSizeAndSetThreshold(l.db.valueThreshold())
// Flush entries if inserting the next entry would overflow the transactional limits.
if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
Expand Down
2 changes: 1 addition & 1 deletion backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func TestBackupBitClear(t *testing.T) {

key := []byte("foo")
val := []byte(fmt.Sprintf("%0100d", 1))
require.Greater(t, len(val), db.opt.ValueThreshold)
require.Greater(t, int64(len(val)), db.valueThreshold())

err = db.Update(func(txn *Txn) error {
e := NewEntry(key, val)
Expand Down
4 changes: 2 additions & 2 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
sorted bool
showLogs bool

valueThreshold int
valueThreshold int64
numVersions int
vlogMaxEntries uint32
loadBloomsOnOpen bool
Expand Down Expand Up @@ -102,7 +102,7 @@ func init() {
"Force compact level 0 on close.")
writeBenchCmd.Flags().BoolVarP(&wo.sorted, "sorted", "s", false, "Write keys in sorted order.")
writeBenchCmd.Flags().BoolVarP(&wo.showLogs, "verbose", "v", false, "Show Badger logs.")
writeBenchCmd.Flags().IntVarP(&wo.valueThreshold, "value-th", "t", 1<<10, "Value threshold")
writeBenchCmd.Flags().Int64VarP(&wo.valueThreshold, "value-th", "t", 1<<10, "Value threshold")
writeBenchCmd.Flags().IntVarP(&wo.numVersions, "num-version", "n", 1, "Number of versions to keep")
writeBenchCmd.Flags().Int64Var(&wo.blockCacheSize, "block-cache-mb", 256,
"Size of block cache in MB")
Expand Down
23 changes: 15 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type DB struct {

orc *oracle
bannedNamespaces *lockedKeys
threshold *vlogThreshold

pub *publisher
registry *KeyRegistry
Expand All @@ -146,6 +147,12 @@ func checkAndSetOptions(opt *Options) error {
opt.maxBatchSize = (15 * opt.MemTableSize) / 100
opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)

// This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
return errors.New("vlogPercentile must be within range of 0.0-1.0")
}

// We are limiting opt.ValueThreshold to maxValueThreshold for now.
if opt.ValueThreshold > maxValueThreshold {
return errors.Errorf("Invalid ValueThreshold, must be less or equal to %d",
Expand All @@ -154,7 +161,7 @@ func checkAndSetOptions(opt *Options) error {

// If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
// the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
if int64(opt.ValueThreshold) > opt.maxBatchSize {
if opt.ValueThreshold > opt.maxBatchSize {
return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
"reduce opt.ValueThreshold or increase opt.MaxTableSize.",
opt.ValueThreshold, opt.maxBatchSize)
Expand Down Expand Up @@ -251,6 +258,7 @@ func Open(opt Options) (*DB, error) {
pub: newPublisher(),
allocPool: z.NewAllocatorPool(8),
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
threshold: initVlogThreshold(&opt),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
Expand Down Expand Up @@ -374,6 +382,8 @@ func Open(opt Options) (*DB, error) {
db.orc.readMark.Done(db.orc.nextTxnTs)
db.orc.incrementNextTs()

go db.threshold.listenForValueThresholdUpdate()

if err := db.initBannedNamespaces(); err != nil {
return db, errors.Wrapf(err, "While setting banned keys")
}
Expand Down Expand Up @@ -625,6 +635,7 @@ func (db *DB) close() (err error) {
db.indexCache.Close()

atomic.StoreUint32(&db.isClosed, 1)
db.threshold.close()

if db.opt.InMemory {
return
Expand Down Expand Up @@ -751,10 +762,6 @@ var requestPool = sync.Pool{
},
}

func (opt Options) skipVlog(e *Entry) bool {
return len(e.Value) < opt.ValueThreshold
}

func (db *DB) writeToLSM(b *request) error {
// We should check the length of b.Prts and b.Entries only when badger is not
// running in InMemory mode. In InMemory mode, we don't write anything to the
Expand All @@ -765,7 +772,7 @@ func (db *DB) writeToLSM(b *request) error {

for i, entry := range b.Entries {
var err error
if db.opt.skipVlog(entry) {
if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
Expand Down Expand Up @@ -857,7 +864,7 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
}
var count, size int64
for _, e := range entries {
size += int64(e.estimateSize(db.opt.ValueThreshold))
size += e.estimateSizeAndSetThreshold(db.valueThreshold())
count++
}
if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
Expand Down Expand Up @@ -1681,7 +1688,7 @@ func (db *DB) dropAll() (func(), error) {
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
db.blockCache.Clear()
db.indexCache.Clear()

db.threshold.Clear(db.opt)
return resume, nil
}

Expand Down
2 changes: 1 addition & 1 deletion db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func TestWindowsDataLoss(t *testing.T) {
err := db.Update(func(txn *Txn) error {
key := []byte(fmt.Sprintf("%d", i))
v := []byte("barValuebarValuebarValuebarValuebarValue")
require.Greater(t, len(v), opt.ValueThreshold)
require.Greater(t, len(v), db.valueThreshold())

//32 bytes length and now it's not working
err := txn.Set(key, v)
Expand Down
4 changes: 2 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2268,8 +2268,8 @@ func TestOpenDBReadOnly(t *testing.T) {
require.NoError(t, err)
// Add bunch of entries that go into value log.
require.NoError(t, db.Update(func(txn *Txn) error {
require.Greater(t, db.opt.ValueThreshold, 10)
val := make([]byte, db.opt.ValueThreshold+10)
require.Greater(t, db.valueThreshold(), int64(10))
val := make([]byte, db.valueThreshold()+10)
rand.Read(val)
for i := 0; i < 10; i++ {
key := fmt.Sprintf("KEY-%05d", i)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ module github.com/dgraph-io/badger/v3

go 1.12

// replace github.com/dgraph-io/ristretto => /home/mrjn/go/src/github.com/dgraph-io/ristretto
// replace github.com/dgraph-io/ristretto => /home/amanbansal/go/src/github.com/dgraph-io/ristretto

require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20210122082011-bb5d392ed82d
github.com/dgraph-io/ristretto v0.0.4-0.20210204105926-13024c7bdb7e
github.com/dustin/go-humanize v1.0.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.1
Expand All @@ -20,6 +20,6 @@ require (
github.com/stretchr/testify v1.4.0
go.opencensus.io v0.22.5
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20210122082011-bb5d392ed82d h1:eQYOG6A4td1tht0NdJB9Ls6DsXRGb2Ft6X9REU/MbbE=
github.com/dgraph-io/ristretto v0.0.4-0.20210122082011-bb5d392ed82d/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210127133938-1b848e7192b2 h1:XGWcXLd8WhjNcbdf/9xLkem6sB0WvbCn+s49B6q/E9I=
github.com/dgraph-io/ristretto v0.0.4-0.20210127133938-1b848e7192b2/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210204105926-13024c7bdb7e h1:GOe4M9eSEqKobxuydk/elptr/b11H9BJ152TYvsufi8=
github.com/dgraph-io/ristretto v0.0.4-0.20210204105926-13024c7bdb7e/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down Expand Up @@ -130,6 +132,8 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
31 changes: 27 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type Options struct {
TableSizeMultiplier int
MaxLevels int

ValueThreshold int
VLogPercentile float64
ValueThreshold int64
NumMemtables int
// Changing BlockSize across DB runs will not break badger. The block size is
// read from the block index stored at the end of the table.
Expand Down Expand Up @@ -112,6 +113,8 @@ type Options struct {
// ------------------------------
maxBatchCount int64 // max entries in batch
maxBatchSize int64 // max batch size in bytes

maxValueThreshold float64
}

// DefaultOptions sets a list of recommended options for good performance.
Expand Down Expand Up @@ -160,8 +163,11 @@ func DefaultOptions(path string) Options {
// -1 so 2*ValueLogFileSize won't overflow on 32-bit systems.
ValueLogFileSize: 1<<30 - 1,

ValueLogMaxEntries: 1000000,
ValueThreshold: 1 << 10, // 1 KB.
ValueLogMaxEntries: 1000000,

VLogPercentile: 0.0,
ValueThreshold: 1 << 10, // 1 KB.

Logger: defaultLogger(INFO),
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
Expand Down Expand Up @@ -336,11 +342,28 @@ func (opt Options) WithMaxLevels(val int) Options {
// tree or separately in the log value files.
//
// The default value of ValueThreshold is 1 KB, but LSMOnlyOptions sets it to maxValueThreshold.
func (opt Options) WithValueThreshold(val int) Options {
func (opt Options) WithValueThreshold(val int64) Options {
opt.ValueThreshold = val
return opt
}

// WithVLogPercentile returns a new Options value with ValLogPercentile set to given value.
//
// VLogPercentile with 0.0 means no dynamic thresholding is enabled.
// MinThreshold value will always act as the value threshold.
//
// VLogPercentile with value 0.99 means 99 percentile of value will be put in LSM tree
// and only 1 percent in vlog. The value threshold will be dynamically updated within the range of
// [ValueThreshold, Options.maxValueThreshold]
//
// Say VLogPercentile with 1.0 means threshold will eventually set to Options.maxValueThreshold
//
// The default value of VLogPercentile is 0.0.
func (opt Options) WithVLogPercentile(t float64) Options {
opt.VLogPercentile = t
return opt
}

// WithNumMemtables returns a new Options value with NumMemtables set to the given value.
//
// NumMemtables sets the maximum number of tables to keep in memory before stalling.
Expand Down
2 changes: 1 addition & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (w *sortedWriter) handleRequests() {
for i, e := range req.Entries {
// If badger is running in InMemory mode, len(req.Ptrs) == 0.
var vs y.ValueStruct
if w.db.opt.skipVlog(e) {
if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
vs = y.ValueStruct{
Value: e.Value,
Meta: e.meta,
Expand Down
23 changes: 18 additions & 5 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,31 @@ type Entry struct {
meta byte

// Fields maintained internally.
hlen int // Length of the header.
hlen int // Length of the header.
valThreshold int64
}

func (e *Entry) isZero() bool {
return len(e.Key) == 0
}

func (e *Entry) estimateSize(threshold int) int {
if len(e.Value) < threshold {
return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta
func (e *Entry) estimateSizeAndSetThreshold(threshold int64) int64 {
if e.valThreshold == 0 {
e.valThreshold = threshold
}
return len(e.Key) + 12 + 2 // 12 for ValuePointer, 2 for metas.
k := int64(len(e.Key))
v := int64(len(e.Value))
if v < e.valThreshold {
return k + v + 2 // Meta, UserMeta
}
return k + 12 + 2 // 12 for ValuePointer, 2 for metas.
}

func (e *Entry) skipVlogAndSetThreshold(threshold int64) bool {
if e.valThreshold == 0 {
e.valThreshold = threshold
}
return int64(len(e.Value)) < e.valThreshold
}

func (e Entry) print(prefix string) {
Expand Down
6 changes: 3 additions & 3 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
func (txn *Txn) checkSize(e *Entry) error {
count := txn.count + 1
// Extra bytes for the version in key.
size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10
size := txn.size + e.estimateSizeAndSetThreshold(txn.db.valueThreshold()) + 10
if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
return ErrTxnTooBig
}
Expand Down Expand Up @@ -377,8 +377,8 @@ func (txn *Txn) modify(e *Entry) error {
return exceedsSize("Key", maxKeySize, e.Key)
case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
case txn.db.opt.InMemory && len(e.Value) > txn.db.opt.ValueThreshold:
return exceedsSize("Value", int64(txn.db.opt.ValueThreshold), e.Value)
case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
}

if err := txn.db.isBanned(e.Key); err != nil {
Expand Down
Loading

0 comments on commit 5dbae10

Please sign in to comment.