Skip to content

Commit

Permalink
Making Entry aware of the value threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
aman-bansal committed Jan 13, 2021
1 parent 4827d6f commit 85bceb1
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 38 deletions.
7 changes: 2 additions & 5 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ type KVLoader struct {
entries []*Entry
entriesSize int64
totalSize int64
valueThreshold int64
}

// NewKVLoader returns a new instance of KVLoader.
Expand All @@ -174,7 +173,6 @@ func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
db: db,
throttle: y.NewThrottle(maxPendingWrites),
entries: make([]*Entry, 0, db.opt.maxBatchCount),
valueThreshold: db.valueThreshold(),
}
}

Expand All @@ -194,7 +192,7 @@ func (l *KVLoader) Set(kv *pb.KV) error {
ExpiresAt: kv.ExpiresAt,
meta: meta,
}
estimatedSize := e.estimateSize(l.valueThreshold)
estimatedSize := e.checkAndEvaluateEntrySize(l.db.valueThreshold())
// Flush entries if inserting the next entry would overflow the transactional limits.
if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize ||
Expand All @@ -213,7 +211,7 @@ func (l *KVLoader) send() error {
if err := l.throttle.Do(); err != nil {
return err
}
if err := l.db.batchSetAsync(l.entries, l.valueThreshold, func(err error) {
if err := l.db.batchSetAsync(l.entries, func(err error) {
l.throttle.Done(err)
}); err != nil {
return err
Expand All @@ -222,7 +220,6 @@ func (l *KVLoader) send() error {
l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
l.entriesSize = 0
l.totalSize = 0
l.valueThreshold = l.db.valueThreshold()
return nil
}

Expand Down
19 changes: 7 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (db *DB) writeToLSM(b *request) error {

for i, entry := range b.Entries {
var err error
if db.vlog.skipVlog(entry, b.valueThreshold) {
if entry.skipVlog() {
// Will include deletion / tombstone case.
err = db.mt.Put(entry.Key,
y.ValueStruct{
Expand Down Expand Up @@ -799,17 +799,13 @@ func (db *DB) writeRequests(reqs []*request) error {
return nil
}

// func (db *DB) sxendToWriteCh(entries []*Entry) (*request, error) {
// return db.sendToWriteChWithThreshold(entries, db.valueThreshold())
// }

func (db *DB) sendToWriteChWithThreshold(entries []*Entry, threshold int64) (*request, error) {
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
if atomic.LoadInt32(&db.blockWrites) == 1 {
return nil, ErrBlockedWrites
}
var count, size int64
for _, e := range entries {
size += e.estimateSize(threshold)
size += e.estimateSize()
count++
}
if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
Expand All @@ -820,7 +816,6 @@ func (db *DB) sendToWriteChWithThreshold(entries []*Entry, threshold int64) (*re
// Txns should not interleave among other txns or rewrites.
req := requestPool.Get().(*request)
req.reset()
req.valueThreshold = threshold
req.Entries = entries
req.Wg.Add(1)
req.IncrRef() // for db write
Expand Down Expand Up @@ -897,8 +892,8 @@ func (db *DB) doWrites(lc *z.Closer) {
// batchSet applies a list of badger.Entry. If a request level error occurs it
// will be returned.
// Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*Entry, threshold int64) error {
req, err := db.sendToWriteChWithThreshold(entries, threshold)
func (db *DB) batchSet(entries []*Entry) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
return err
}
Expand All @@ -912,8 +907,8 @@ func (db *DB) batchSet(entries []*Entry, threshold int64) error {
// err := kv.BatchSetAsync(entries, func(err error)) {
// Check(err)
// }
func (db *DB) batchSetAsync(entries []*Entry, valThreshold int64, f func(error)) error {
req, err := db.sendToWriteChWithThreshold(entries, valThreshold)
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
req, err := db.sendToWriteCh(entries)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
streamReqs[kv.StreamId] = req
}
req.Entries = append(req.Entries, e)
req.valueThreshold = sw.db.valueThreshold()
return nil
})
if err != nil {
Expand Down Expand Up @@ -317,7 +316,7 @@ func (w *sortedWriter) handleRequests() {
for i, e := range req.Entries {
// If badger is running in InMemory mode, len(req.Ptrs) == 0.
var vs y.ValueStruct
if w.db.vlog.skipVlog(e, req.valueThreshold) {
if e.skipVlog() {
vs = y.ValueStruct{
Value: e.Value,
Meta: e.meta,
Expand Down
19 changes: 18 additions & 1 deletion structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,38 @@ type Entry struct {

// Fields maintained internally.
hlen int // Length of the header.
gtThanThreshold bool
}

// isZero reports whether the entry is uninitialized, i.e. no key has been set.
func (e *Entry) isZero() bool {
	return len(e.Key) == 0
}

func (e *Entry) estimateSize(threshold int64) int64 {
// checkAndEvaluateEntrySize returns the estimated on-LSM size of the entry
// and, as a side effect, records on the entry whether its value is large
// enough (>= threshold) to be stored in the value log rather than inline.
func (e *Entry) checkAndEvaluateEntrySize(threshold int64) int64 {
	keyLen := int64(len(e.Key))
	valLen := int64(len(e.Value))
	// Values at or above the threshold live in the value log; the LSM tree
	// then holds only a value pointer for them.
	e.gtThanThreshold = valLen >= threshold
	if e.gtThanThreshold {
		return keyLen + 12 + 2 // 12 for ValuePointer, 2 for metas.
	}
	return keyLen + valLen + 2 // Meta, UserMeta
}

// estimateSize returns the estimated on-LSM size of the entry, based on the
// gtThanThreshold flag previously recorded by checkAndEvaluateEntrySize.
func (e *Entry) estimateSize() int64 {
	// Key plus the two meta bytes are charged in either case.
	size := int64(len(e.Key)) + 2 // 2 for metas (Meta, UserMeta).
	if e.gtThanThreshold {
		return size + 12 // 12 for ValuePointer.
	}
	return size + int64(len(e.Value))
}

// skipVlog reports whether the entry's value should be written inline into
// the LSM tree instead of the value log. It is the inverse of
// gtThanThreshold, which is populated by checkAndEvaluateEntrySize.
// NOTE(review): on a zero-value Entry this returns true by default (skip the
// vlog) — callers are expected to have sized the entry first; confirm every
// write path calls checkAndEvaluateEntrySize before reaching here.
func (e *Entry) skipVlog() bool {
	return !e.gtThanThreshold
}

func (e Entry) print(prefix string) {
fmt.Printf("%s Key: %s Meta: %d UserMeta: %d Offset: %d len(val)=%d",
prefix, e.Key, e.meta, e.UserMeta, e.offset, len(e.Value))
Expand Down
10 changes: 4 additions & 6 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ type Txn struct {
discarded bool
doneRead bool
update bool // update is used to conditionally keep track of reads.
valueThreshold int64
}

type pendingWritesIterator struct {
Expand Down Expand Up @@ -346,7 +345,7 @@ func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
func (txn *Txn) checkSize(e *Entry) error {
count := txn.count + 1
// Extra bytes for the version in key.
size := txn.size + e.estimateSize(txn.valueThreshold) + 10
size := txn.size + e.checkAndEvaluateEntrySize(txn.db.valueThreshold()) + 10
if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
return ErrTxnTooBig
}
Expand Down Expand Up @@ -378,8 +377,8 @@ func (txn *Txn) modify(e *Entry) error {
return exceedsSize("Key", maxKeySize, e.Key)
case int64(len(e.Value)) > txn.db.opt.ValueLogFileSize:
return exceedsSize("Value", txn.db.opt.ValueLogFileSize, e.Value)
case txn.db.opt.InMemory && int64(len(e.Value)) > txn.valueThreshold:
return exceedsSize("Value", txn.valueThreshold, e.Value)
case txn.db.opt.InMemory && int64(len(e.Value)) > txn.db.valueThreshold():
return exceedsSize("Value", txn.db.valueThreshold(), e.Value)
}

if err := txn.checkSize(e); err != nil {
Expand Down Expand Up @@ -595,7 +594,7 @@ func (txn *Txn) commitAndSend() (func() error, error) {
entries = append(entries, e)
}

req, err := txn.db.sendToWriteChWithThreshold(entries, txn.valueThreshold)
req, err := txn.db.sendToWriteCh(entries)
if err != nil {
orc.doneCommit(commitTs)
return nil, err
Expand Down Expand Up @@ -770,7 +769,6 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
db: db,
count: 1, // One extra entry for BitFin.
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
valueThreshold: db.valueThreshold(),
}
if update {
if db.opt.DetectConflicts {
Expand Down
2 changes: 1 addition & 1 deletion txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func TestIteratorAllVersionsWithDeleted(t *testing.T) {
Key: y.KeyWithTs(item.key, item.version),
meta: bitDelete,
},
}, txn.db.opt.ValueThreshold)
})
require.NoError(t, err)
return err
})
Expand Down
15 changes: 4 additions & 11 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func (vlog *valueLog) rewrite(f *logFile) error {

vlog.opt.Infof("Rewriting fid: %d", f.fid)
wb := make([]*Entry, 0, 1000)
valThreshold := vlog.db.valueThreshold()
var size int64

y.AssertTrue(vlog.db != nil)
Expand Down Expand Up @@ -234,7 +233,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
ne.ExpiresAt = e.ExpiresAt
ne.Key = append([]byte{}, e.Key...)
ne.Value = append([]byte{}, e.Value...)
es := ne.estimateSize(valThreshold)
es := ne.checkAndEvaluateEntrySize(vlog.db.valueThreshold())
// Consider size of value as well while considering the total size
// of the batch. There have been reports of high memory usage in
// rewrite because we don't consider the value size. See #1292.
Expand All @@ -243,7 +242,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
// Ensure length and size of wb is within transaction limits.
if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
size+es >= vlog.opt.maxBatchSize {
if err := vlog.db.batchSet(wb, valThreshold); err != nil {
if err := vlog.db.batchSet(wb); err != nil {
return err
}
size = 0
Expand Down Expand Up @@ -325,7 +324,7 @@ func (vlog *valueLog) rewrite(f *logFile) error {
if end > len(wb) {
end = len(wb)
}
if err := vlog.db.batchSet(wb[i:end], valThreshold); err != nil {
if err := vlog.db.batchSet(wb[i:end]); err != nil {
if err == ErrTxnTooBig {
// Decrease the batch size to half.
batchSize = batchSize / 2
Expand Down Expand Up @@ -467,10 +466,6 @@ func vlogFilePath(dirPath string, fid uint32) string {
return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
}

func (vlog *valueLog) skipVlog(e *Entry, threshold int64) bool {
return int64(len(e.Value)) < threshold
}

func (vlog *valueLog) fpath(fid uint32) string {
return vlogFilePath(vlog.dirPath, fid)
}
Expand Down Expand Up @@ -675,8 +670,6 @@ type request struct {
Wg sync.WaitGroup
Err error
ref int32

valueThreshold int64
}

func (req *request) reset() {
Expand Down Expand Up @@ -856,7 +849,7 @@ func (vlog *valueLog) write(reqs []*request) error {
buf.Reset()

e := b.Entries[j]
if vlog.skipVlog(e, b.valueThreshold) {
if e.skipVlog() {
b.Ptrs = append(b.Ptrs, valuePointer{})
continue
}
Expand Down

0 comments on commit 85bceb1

Please sign in to comment.