Skip to content

Commit

Permalink
Pass the value threshold through to the txn and the write request
Browse files Browse the repository at this point in the history
  • Loading branch information
aman-bansal committed Jan 8, 2021
1 parent 87908e8 commit 6d600e3
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 20 deletions.
2 changes: 1 addition & 1 deletion badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func writeRandom(db *badger.DB, num uint64) error {
vsz80 := 512
vsz19 := 256
vsz099 := 256
vsz001 := wo.valSz - vsz80 - vsz19 - vsz099
vsz001 := 1024

value := make([]byte, wo.valSz)
y.Check2(rand.Read(value))
Expand Down
15 changes: 11 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func checkAndSetOptions(opt *Options) error {
}
opt.maxBatchSize = (15 * opt.MemTableSize) / 100
opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
//opt.maxValueThreshold = 1024.0
opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))

// We are limiting opt.ValueThreshold to maxValueThreshold for now.
if opt.ValueThreshold > maxValueThreshold {
Expand Down Expand Up @@ -706,7 +708,7 @@ func (db *DB) writeToLSM(b *request) error {

for i, entry := range b.Entries {
var err error
if db.vlog.skipVlog(entry) {
if db.vlog.skipVlog(entry, b.valueThreshold) {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
Expand Down Expand Up @@ -793,12 +795,16 @@ func (db *DB) writeRequests(reqs []*request) error {
}

// sendToWriteCh enqueues the given entries onto the write channel, sizing
// them with the value log's current (dynamic) value threshold.
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
	threshold := db.vlog.valueThreshold()
	return db.sendToWriteChWithThreshold(entries, threshold)
}

func (db *DB) sendToWriteChWithThreshold(entries []*Entry, threshold int64) (*request, error) {
if atomic.LoadInt32(&db.blockWrites) == 1 {
return nil, ErrBlockedWrites
}
var count, size int64
for _, e := range entries {
size += e.estimateSize(db.vlog.valueThreshold())
size += e.estimateSize(threshold)
count++
}
if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
Expand All @@ -809,6 +815,7 @@ func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
// Txns should not interleave among other txns or rewrites.
req := requestPool.Get().(*request)
req.reset()
req.valueThreshold = threshold
req.Entries = entries
req.Wg.Add(1)
req.IncrRef() // for db write
Expand Down Expand Up @@ -885,8 +892,8 @@ func (db *DB) doWrites(lc *z.Closer) {
// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
// Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*Entry) error {
req, err := db.sendToWriteCh(entries)
func (db *DB) batchSet(entries []*Entry, threshold int64) error {
req, err := db.sendToWriteChWithThreshold(entries, threshold)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type Options struct {
// ------------------------------
maxBatchCount int64 // max entries in batch
maxBatchSize int64 // max batch size in bytes

maxValueThreshold float64
}

// DefaultOptions sets a list of recommended options for good performance.
Expand Down
3 changes: 2 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
streamReqs[kv.StreamId] = req
}
req.Entries = append(req.Entries, e)
req.valueThreshold = sw.db.vlog.valueThreshold()
return nil
})
if err != nil {
Expand Down Expand Up @@ -316,7 +317,7 @@ func (w *sortedWriter) handleRequests() {
for i, e := range req.Entries {
// If badger is running in InMemory mode, len(req.Ptrs) == 0.
var vs y.ValueStruct
if w.db.vlog.skipVlog(e) {
if w.db.vlog.skipVlog(e, req.valueThreshold) {
vs = y.ValueStruct{
Value: e.Value,
Meta: e.meta,
Expand Down
10 changes: 6 additions & 4 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ type Txn struct {
discarded bool
doneRead bool
update bool // update is used to conditionally keep track of reads.
valueThreshold int64
}

type pendingWritesIterator struct {
Expand Down Expand Up @@ -345,7 +346,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
func (txn *Txn) checkSize(e *Entry) error {
count := txn.count + 1
// Extra bytes for the version in key.
size := txn.size + e.estimateSize(txn.db.vlog.valueThreshold()) + 10
size := txn.size + e.estimateSize(txn.valueThreshold) + 10
if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
return ErrTxnTooBig
}
Expand Down Expand Up @@ -377,8 +378,8 @@ func (txn *Txn) modify(e *Entry) error {
return exceedsSize("Key", maxKeySize, e.Key)
case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.vlog.valueThreshold():
return exceedsSize("Value", txn.db.vlog.valueThreshold(), e.Value)
case txn.db.opt.InMemory && int64(len(e.Value)) > txn.valueThreshold:
return exceedsSize("Value", txn.valueThreshold, e.Value)
}

if err := txn.checkSize(e); err != nil {
Expand Down Expand Up @@ -594,7 +595,7 @@ func (txn *Txn) commitAndSend() (func() error, error) {
entries = append(entries, e)
}

req, err := txn.db.sendToWriteCh(entries)
req, err := txn.db.sendToWriteChWithThreshold(entries, txn.valueThreshold)
if err != nil {
orc.doneCommit(commitTs)
return nil, err
Expand Down Expand Up @@ -769,6 +770,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
db: db,
count: 1, // One extra entry for BitFin.
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
valueThreshold: db.vlog.valueThreshold(),
}
if update {
if db.opt.DetectConflicts {
Expand Down
26 changes: 16 additions & 10 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {

vlog.opt.Infof("Rewriting fid: %d", f.fid)
wb := make([]*Entry, 0, 1000)
valThreshold := vlog.valueThreshold()
var size int64

y.AssertTrue(vlog.db != nil)
Expand Down Expand Up @@ -233,7 +234,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
ne.ExpiresAt = e.ExpiresAt
ne.Key = append([]byte{}, e.Key...)
ne.Value = append([]byte{}, e.Value...)
es := ne.estimateSize(vlog.valueThreshold())
es := ne.estimateSize(valThreshold)
// Consider size of value as well while considering the total size
// of the batch. There have been reports of high memory usage in
// rewrite because we don't consider the value size. See #1292.
Expand All @@ -242,7 +243,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
// Ensure length and size of wb is within transaction limits.
if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
size+es >= vlog.opt.maxBatchSize {
if err := vlog.db.batchSet(wb); err != nil {
if err := vlog.db.batchSet(wb, valThreshold); err != nil {
return err
}
size = 0
Expand Down Expand Up @@ -324,7 +325,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
if end > len(wb) {
end = len(wb)
}
if err := vlog.db.batchSet(wb[i:end]); err != nil {
if err := vlog.db.batchSet(wb[i:end], valThreshold); err != nil {
if err == ErrTxnTooBig {
// Decrease the batch size to half.
batchSize = batchSize / 2
Expand Down Expand Up @@ -468,8 +469,8 @@ func vlogFilePath(dirPath string, fid uint32) string {
return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
}

func (vlog *valueLog) skipVlog(e *Entry) bool {
return int64(len(e.Value)) < vlog.valueThreshold()
// skipVlog reports whether the entry's value is small enough (strictly below
// the supplied threshold) to be stored inline in the LSM tree, bypassing the
// value log. Deletions/tombstones (zero-length values) always skip the vlog.
func (vlog *valueLog) skipVlog(e *Entry, threshold int64) bool {
	valueLen := int64(len(e.Value))
	return valueLen < threshold
}

func (vlog *valueLog) fpath(fid uint32) string {
Expand Down Expand Up @@ -679,6 +680,8 @@ type request struct {
Wg sync.WaitGroup
Err error
ref int32

valueThreshold int64
}

func (req *request) reset() {
Expand Down Expand Up @@ -858,7 +861,7 @@ func (vlog *valueLog) write(reqs []*request) error {
buf.Reset()

e := b.Entries[j]
if vlog.skipVlog(e) {
if vlog.skipVlog(e, b.valueThreshold) {
b.Ptrs = append(b.Ptrs, valuePointer{})
continue
}
Expand Down Expand Up @@ -1131,7 +1134,7 @@ func initVlogThreshold(opt *Options) *vlogThreshold {
}

getBounds := func() []float64 {
mxbd := math.Min(maxValueThreshold, float64(opt.maxBatchSize))
mxbd := opt.maxValueThreshold
mnbd := float64(opt.ValueThreshold)
y.AssertTruef(mxbd > mnbd, "maximum threshold bound is less than the min threshold")
size := math.Min(mxbd-mnbd, 1024.0)
Expand Down Expand Up @@ -1183,15 +1186,18 @@ func (v *vlogThreshold) listenForValueThresholdUpdate() {
for {
select {
case val := <-v.valueCh:
if val == nil || val.Entries == nil {
continue
}
for _, e := range val.Entries {
v.vlMetrics.Update(int64(len(e.Value)))
}
// we are making it to get 99 percentile so that only values
// in range of 1 percentile will make it to the value log
p := int64(v.vlMetrics.Percentile(0.99))
if atomic.LoadInt64(v.valueThreshold) != p {
v.opt.Infof("updating value threshold from: %d to: %d",
v.valueThreshold, p)
vt := atomic.LoadInt64(v.valueThreshold)
if vt != p {
v.opt.Infof("updating value threshold from: %d to: %d", vt, p)
atomic.StoreInt64(v.valueThreshold, p)
}
case <-v.vCloser.HasBeenClosed():
Expand Down

0 comments on commit 6d600e3

Please sign in to comment.