fix(compaction): Use separate compactors for L0, L1 (#1466) (#1468)
Related to #1459

This PR contains the following changes to compactions:
- Use a dedicated compactor for Levels 0 and 1, and separate compactors for the other levels (see the sketch below).
- Pick which levels to compact based on score.
- Stall Level 0 if compactions cannot keep up (we had added this in #1186).
- Limit the number of open table builders in compactions to 5.
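
A minimal sketch of the new dispatch rule, assuming stub types (only the compactionPriority fields and the id convention come from this patch; everything else here is illustrative):

package main

import "fmt"

// compactionPriority mirrors the fields used in levels.go; this
// standalone copy exists only for the sketch.
type compactionPriority struct {
	level int
	score float64
}

// shouldHandle encodes the split introduced by this PR: the compactor
// with id 0 is dedicated to L0 and L1, and every other compactor skips
// those two levels.
func shouldHandle(id int, p compactionPriority) bool {
	if id == 0 {
		return p.level <= 1
	}
	return p.level > 1
}

func main() {
	for _, p := range []compactionPriority{
		{level: 0, score: 1.8},
		{level: 1, score: 1.2},
		{level: 3, score: 0.9},
	} {
		fmt.Printf("L%d -> compactor 0: %v, compactor 1: %v\n",
			p.level, shouldHandle(0, p), shouldHandle(1, p))
	}
}

This keeps a slow compaction at a deeper level from ever blocking the L0-to-L1 path, which is what causes write stalls.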

(cherry picked from commit 0b8eb4c)
Ibrahim Jarif authored Aug 25, 2020
1 parent df07404 commit 6d05358
Showing 7 changed files with 124 additions and 122 deletions.
badger/main.go: 2 changes (1 addition, 1 deletion)
@@ -29,7 +29,7 @@ func main() {
go func() {
for i := 8080; i < 9080; i++ {
fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
fmt.Println("Port busy. Trying another one...")
continue

db.go: 10 changes (8 additions, 2 deletions)
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

// Open returns a new DB object.
func Open(opt Options) (db *DB, err error) {
// It's okay to have zero compactors which will disable all compactions but
// we cannot have just one compactor otherwise we will end up with all data
// on level 2.
if opt.NumCompactors == 1 {
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
}
if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
}
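
For callers, the new check means NumCompactors must be 0 (compactions disabled entirely) or at least 2. A hedged usage sketch; DefaultOptions, Open, and NumCompactors are Badger's actual API, while the module path and directory are assumptions for a v2-era build:

package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	opt := badger.DefaultOptions("/tmp/badger")
	// One compactor is reserved for L0/L1 and at least one more is needed
	// for the deeper levels, so opt.NumCompactors = 1 would fail Open with
	// "Cannot have 1 compactor. Need at least 2".
	opt.NumCompactors = 3
	db, err := badger.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}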
@@ -528,7 +534,7 @@ func (db *DB) close() (err error) {
// Force Compact L0
// We don't need to care about cstatus since no parallel compaction is running.
if db.opt.CompactL0OnClose {
err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
switch err {
case errFillTables:
// This error only means that there might be enough tables to do a compaction. So, we
@@ -1455,7 +1461,7 @@ func (db *DB) Flatten(workers int) error {
errCh := make(chan error, 1)
for i := 0; i < workers; i++ {
go func() {
errCh <- db.lc.doCompact(cp)
errCh <- db.lc.doCompact(175, cp)
}()
}
var success int
level_handler.go: 5 changes (2 additions, 3 deletions)
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
// Need lock as we may be deleting the first table during a level 0 compaction.
s.Lock()
defer s.Unlock()
// Return false only if L0 is in memory and number of tables is more than number of
// ZeroTableStall. For on disk L0, we should just add the tables to the level.
if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
// Stall (by returning false) if we are above the specified stall setting for L0.
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
return false
}

levels.go: 129 changes (69 additions, 60 deletions)
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
// function in logs, and forces a compaction.
dropPrefixes: prefixes,
}
if err := s.doCompact(cp); err != nil {
if err := s.doCompact(174, cp); err != nil {
opt.Warningf("While compacting level 0: %v", err)
return nil
}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
n := s.kv.opt.NumCompactors
lc.AddRunning(n - 1)
for i := 0; i < n; i++ {
go s.runWorker(lc)
// The worker with id=0 is dedicated to L0 and L1. This is not counted
// towards the user specified NumCompactors.
go s.runCompactor(i, lc)
}
}

func (s *levelsController) runWorker(lc *y.Closer) {
func (s *levelsController) runCompactor(id int, lc *y.Closer) {
defer lc.Done()

randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
return
}

ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
@@ -391,7 +393,15 @@
prios := s.pickCompactLevels()
loop:
for _, p := range prios {
err := s.doCompact(p)
if id == 0 && p.level > 1 {
// If I'm ID zero, I only compact L0 and L1.
continue
}
if id != 0 && p.level <= 1 {
// If I'm ID non-zero, I do NOT compact L0 and L1.
continue
}
err := s.doCompact(id, p)
switch err {
case nil:
break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
prios = append(prios, pri)
}
}
// We used to sort compaction priorities based on the score. But, we
// decided to compact based on the level, not the priority. So, upper
// levels (level 0, level 1, etc) always get compacted first, before the
// lower levels -- this allows us to avoid stalls.
// We should continue to sort the compaction priorities by score. Now that we have a dedicated
// compactor for L0 and L1, we don't need to sort by level here.
sort.Slice(prios, func(i, j int) bool {
return prios[i].score > prios[j].score
})
return prios
}
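
With the L0/L1 split handled by the workers themselves, priorities can again be ordered purely by score. A self-contained illustration of the ordering, with made-up scores:

package main

import (
	"fmt"
	"sort"
)

type compactionPriority struct {
	level int
	score float64
}

func main() {
	// Hypothetical scores: L2 is the most over-full, so it sorts first.
	prios := []compactionPriority{
		{level: 0, score: 0.75},
		{level: 2, score: 1.60},
		{level: 4, score: 1.10},
	}
	sort.Slice(prios, func(i, j int) bool {
		return prios[i].score > prios[j].score
	})
	fmt.Println(prios) // [{2 1.6} {4 1.1} {0 0.75}]
}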

@@ -541,15 +552,13 @@ nextTable:
// that would affect the snapshot view guarantee provided by transactions.
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
}
resultCh := make(chan newTableResult)
var numBuilds, numVersions int
var lastKey, skipKey []byte
var vp valuePointer
var newTables []*table.Table
mu := new(sync.Mutex) // Guards newTables

inflightBuilders := y.NewThrottle(5)
for it.Valid() {
timeStart := time.Now()
dk, err := s.kv.registry.latestDataKey()
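
The inflightBuilders := y.NewThrottle(5) line above is what enforces the "at most 5 open table builders" limit from the changelist. Badger's y.Throttle also threads an error through Do/Done/Finish; the sketch below is a simplified, error-free semaphore showing the same pattern, not the y package's actual implementation:

package main

import "sync"

// throttle caps the number of goroutines in flight, loosely mirroring
// the Do/Done/Finish shape of badger's y.Throttle (illustrative only).
type throttle struct {
	ch chan struct{}
	wg sync.WaitGroup
}

func newThrottle(max int) *throttle {
	return &throttle{ch: make(chan struct{}, max)}
}

func (t *throttle) Do() {
	t.ch <- struct{}{} // blocks once max workers are in flight
	t.wg.Add(1)
}

func (t *throttle) Done() {
	<-t.ch
	t.wg.Done()
}

func (t *throttle) Finish() { t.wg.Wait() }

func main() {
	th := newThrottle(5)
	for i := 0; i < 20; i++ {
		th.Do() // at most 5 builders run concurrently
		go func(i int) {
			defer th.Done()
			_ = i // build table i here
		}(i)
	}
	th.Finish() // wait for every in-flight builder
}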
@@ -646,67 +655,66 @@
// called Add() at least once, and builder is not Empty().
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
if builder.Empty() {
continue
}
numBuilds++
fileID := s.reserveFileID()
if err := inflightBuilders.Do(); err != nil {
// Can't return from here, until I decrRef all the tables that I built so far.
break
}
go func(builder *table.Builder) {
defer builder.Close()
var (
tbl *table.Table
err error
)
defer inflightBuilders.Done(err)

build := func(fileID uint64) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
}

if _, err := fd.Write(builder.Finish()); err != nil {
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
}
tbl, err := table.OpenTable(fd, bopts)
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}

var tbl *table.Table
var err error
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
resultCh <- newTableResult{tbl, err}
}(builder)
}

newTables := make([]*table.Table, 0, 20)
// Wait for all table builders to finish.
var firstErr error
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
firstErr = res.err
}
// If we couldn't build the table, return fast.
if err != nil {
return
}

mu.Lock()
newTables = append(newTables, tbl)
mu.Unlock()
}(builder)
}

if firstErr == nil {
// Wait for all table builders to finish and also for newTables accumulator to finish.
err := inflightBuilders.Finish()
if err == nil {
// Ensure created files' directory entries are visible. We don't mind the extra latency
// from not doing this ASAP after all file creation has finished because this is a
// background operation.
firstErr = s.kv.syncDir(s.kv.opt.Dir)
err = s.kv.syncDir(s.kv.opt.Dir)
}

if firstErr != nil {
if err != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
_ = newTables[j].DecrRef()
}
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
_ = decrRefs(newTables)
return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
}
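
The rewritten tail above replaces the firstErr bookkeeping with a single error threaded through Finish, syncDir, and cleanup. A stand-in sketch of that wait-then-sync-then-release order; the types are stubs and decrRefs only mirrors the helper the patch calls:

package main

import "fmt"

type table struct{ refs int }

func (t *table) DecrRef() error { t.refs--; return nil }

// decrRefs releases our reference on every table built so far. In the
// real code we hold the only refs, so this deletes the backing files.
func decrRefs(tables []*table) error {
	for _, t := range tables {
		if err := t.DecrRef(); err != nil {
			return err
		}
	}
	return nil
}

// finish shows the order used above: wait for all builders, then sync
// the directory, and only on error release the freshly built tables.
func finish(wait, syncDir func() error, newTables []*table) ([]*table, error) {
	err := wait()
	if err == nil {
		err = syncDir()
	}
	if err != nil {
		_ = decrRefs(newTables)
		return nil, fmt.Errorf("while running compactions: %w", err)
	}
	return newTables, nil
}

func main() {
	tbls := []*table{{refs: 1}, {refs: 1}}
	ok, err := finish(func() error { return nil }, func() error { return nil }, tbls)
	fmt.Println(len(ok), err) // 2 <nil>
}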

sort.Slice(newTables, func(i, j int) bool {
@@ -960,7 +968,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
var errFillTables = errors.New("Unable to fill tables")

// doCompact picks some table on level l and compacts it away to the next level.
func (s *levelsController) doCompact(p compactionPriority) error {
func (s *levelsController) doCompact(id int, p compactionPriority) error {
l := p.level
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

@@ -973,7 +981,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
cd.elog.SetMaxEvents(100)
defer cd.elog.Finish()

s.kv.opt.Infof("Got compaction priority: %+v", p)
s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)

// While picking tables to be compacted, both levels' tables are expected to
// remain unchanged.
@@ -989,16 +997,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
}
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.

s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
id, p, cd.thisLevel.level)
s.cstatus.toLog(cd.elog)
if err := s.runCompactDef(l, cd); err != nil {
// This compaction couldn't be done successfully.
s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
return err
}

s.cstatus.toLog(cd.elog)
s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
return nil
}
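
The id that doCompact now takes exists only for log attribution. Reading the call sites in this diff, worker goroutines pass their loop index, while one-off callers pass sentinel values: 173 from close (CompactL0OnClose), 174 from dropPrefixes, and 175 from Flatten. A small decoder for those ids; the mapping is inferred from this diff, not a documented API:

package main

import "fmt"

// compactorName maps the ids seen in this patch to their callers.
func compactorName(id int) string {
	switch id {
	case 173:
		return "close (CompactL0OnClose)"
	case 174:
		return "dropPrefixes"
	case 175:
		return "Flatten"
	default:
		return fmt.Sprintf("background compactor %d", id)
	}
}

func main() {
	for _, id := range []int{0, 1, 173, 174, 175} {
		fmt.Printf("[Compactor: %d] %s\n", id, compactorName(id))
	}
}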

@@ -1022,7 +1031,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
// Stall. Make sure all levels are healthy before we unstall.
var timeStart time.Time
{
s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
s.cstatus.RLock()
for i := 0; i < s.kv.opt.MaxLevels; i++ {
s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
levels_test.go: 92 changes (39 additions, 53 deletions)
@@ -749,52 +749,6 @@ func createEmptyTable(db *DB) *table.Table {
}

func TestL0Stall(t *testing.T) {
test := func(t *testing.T, opt *Options) {
runBadgerTest(t, opt, func(t *testing.T, db *DB) {
db.lc.levels[0].Lock()
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
// zero and all new additions are expected to stall if L0 is in memory.
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

go func() {
tab := createEmptyTable(db)
require.NoError(t, db.lc.addLevel0Table(tab))
tab.DecrRef()
done <- true
}()
// Let it stall for a second.
time.Sleep(time.Second)

select {
case <-timeout:
if opt.KeepL0InMemory {
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
} else {
t.Fatal("Test didn't finish in time")
}
case <-done:
// The test completed before 5 second timeout. Mark it as successful.
}
})
}

opt := DefaultOptions("")
// Disable all compactions.
opt.NumCompactors = 0
@@ -803,13 +757,45 @@ func TestL0Stall(t *testing.T) {
// Addition of new tables will stall if there are 4 or more L0 tables.
opt.NumLevelZeroTablesStall = 4

t.Run("with KeepL0InMemory", func(t *testing.T) {
opt.KeepL0InMemory = true
test(t, &opt)
})
t.Run("with L0 on disk", func(t *testing.T) {
opt.KeepL0InMemory = false
test(t, &opt)
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
db.lc.levels[0].Lock()
// Add NumLevelZeroTablesStall+1 tables to level 0. This fills up level
// zero, so all further additions are expected to stall.
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
}
db.lc.levels[0].Unlock()

timeout := time.After(5 * time.Second)
done := make(chan bool)

go func() {
tab := createEmptyTable(db)
require.NoError(t, db.lc.addLevel0Table(tab))
tab.DecrRef()
done <- true
}()
// Let it stall for a second.
time.Sleep(time.Second)

select {
case <-timeout:
t.Log("Timeout triggered")
// Mark this test as successful since the addition of a new
// table to a full L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
case <-done:
// The test completed before the 5 second timeout, meaning the new
// table was added without stalling. That should not happen.
t.Fatal("Test did not stall")
}
})
}
