From cec726619a29c2ff84236626cfd137c03b6de21d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 26 May 2020 19:06:20 +0530 Subject: [PATCH 01/16] StreamWriter: Close head writer The head writer was not being closed which would leak the `handleRequests` go routine. This PR fixes it by closing the head writer. --- stream_writer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream_writer.go b/stream_writer.go index f9304c7e7..f190bbef7 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -226,6 +226,7 @@ func (sw *StreamWriter) Flush() error { if err := headWriter.Done(); err != nil { return err } + headWriter.closer.SignalAndWait() if !sw.db.opt.managedTxns { if sw.db.orc != nil { From 8fc7ca8d6487e907fd7db8a87543c3c4827628ad Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 26 May 2020 20:01:52 +0530 Subject: [PATCH 02/16] Close all running goroutines on error in db.Open The test TestPenultimateLogCorruption checks for corruption towards the end of the value log file. When the DB is opened for the second time, the go routines started by db.Open() were not being closed. This PR fixes it. --- db.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/db.go b/db.go index 79b332d23..2c52a399c 100644 --- a/db.go +++ b/db.go @@ -378,7 +378,15 @@ func Open(opt Options) (db *DB, err error) { go db.doWrites(replayCloser) if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { - return db, y.Wrapf(err, "During db.vlog.open") + // Perform cleanup + replayCloser.SignalAndWait() + db.closers.updateSize.SignalAndWait() + db.blockCache.Close() + db.bfCache.Close() + db.orc.Stop() + db.vlog.Close() + + return nil, y.Wrapf(err, "During db.vlog.open") } replayCloser.SignalAndWait() // Wait for replay to be applied first. From 0fa013a892aa35a5ced398a69eff668f8001927b Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 26 May 2020 21:30:22 +0530 Subject: [PATCH 03/16] Tests: Do not leave stale goroutines A bunch of tests would start go routines and never stop them. This PR fixes that. There should not be any goroutines left after all the tests are completed. --- db_test.go | 14 ++++++-------- levels_test.go | 9 +++++++++ managed_db_test.go | 6 ++++-- value_test.go | 7 ++++--- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/db_test.go b/db_test.go index 95ccdf30f..85edc28b0 100644 --- a/db_test.go +++ b/db_test.go @@ -1265,8 +1265,11 @@ func TestExpiry(t *testing.T) { func TestExpiryImproperDBClose(t *testing.T) { testReplay := func(opt Options) { - db0, err := Open(opt) + // L0 compaction doesn't affect the test in any way. It is set to allow + // graceful shutdown of db0. + db0, err := Open(opt.WithCompactL0OnClose(false)) require.NoError(t, err) + defer func() { require.NoError(t, db0.Close()) }() dur := 1 * time.Hour expiryTime := uint64(time.Now().Add(dur).Unix()) @@ -1280,17 +1283,12 @@ func TestExpiryImproperDBClose(t *testing.T) { // Simulate a crash by not closing db0, but releasing the locks. if db0.dirLockGuard != nil { require.NoError(t, db0.dirLockGuard.release()) + db0.dirLockGuard = nil } if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) + db0.valueDirGuard = nil } - // We need to close vlog to fix the vlog file size. On windows, the vlog file - // is truncated to 2*MaxVlogSize and if we don't close the vlog file, reopening - // it would return Truncate Required Error. - require.NoError(t, db0.vlog.Close()) - - require.NoError(t, db0.registry.Close()) - require.NoError(t, db0.manifest.close()) db1, err := Open(opt) require.NoError(t, err) diff --git a/levels_test.go b/levels_test.go index 8c7df15bb..aa83be8af 100644 --- a/levels_test.go +++ b/levels_test.go @@ -538,6 +538,15 @@ func TestL0Stall(t *testing.T) { t.Log("Timeout triggered") // Mark this test as successful since L0 is in memory and the // addition of new table to L0 is supposed to stall. + + // Remove tables from level 0 so that the stalled + // compaction can make progress. This does not have any + // effect on the test. This is done so that the goroutine + // stuck on addLevel0Table can make progress and end. + db.lc.levels[0].Lock() + db.lc.levels[0].tables = nil + db.lc.levels[0].Unlock() + <-done } else { t.Fatal("Test didn't finish in time") } diff --git a/managed_db_test.go b/managed_db_test.go index 932744bf1..3cbd6e5eb 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -162,6 +162,7 @@ func TestDropAllTwice(t *testing.T) { // Call DropAll again. require.NoError(t, db.DropAll()) + require.NoError(t, db.Close()) } t.Run("disk mode", func(t *testing.T) { dir, err := ioutil.TempDir("", "badger-test") @@ -175,7 +176,6 @@ func TestDropAllTwice(t *testing.T) { opts := getTestOptions("") opts.InMemory = true test(t, opts) - }) } @@ -280,6 +280,7 @@ func TestDropReadOnly(t *testing.T) { require.NoError(t, err) } require.Panics(t, func() { db2.DropAll() }) + require.NoError(t, db2.Close()) } func TestWriteAfterClose(t *testing.T) { @@ -525,6 +526,7 @@ func TestDropPrefixReadOnly(t *testing.T) { require.NoError(t, err) } require.Panics(t, func() { db2.DropPrefix([]byte("key0")) }) + require.NoError(t, db2.Close()) } func TestDropPrefixRace(t *testing.T) { @@ -590,7 +592,7 @@ func TestDropPrefixRace(t *testing.T) { after := numKeysManaged(db, math.MaxUint64) t.Logf("Before: %d. After dropprefix: %d\n", before, after) require.True(t, after < before) - db.Close() + require.NoError(t, db.Close()) } func TestWriteBatchManagedMode(t *testing.T) { diff --git a/value_test.go b/value_test.go index 8c3c67a8a..1610b4409 100644 --- a/value_test.go +++ b/value_test.go @@ -762,6 +762,7 @@ func TestPenultimateLogCorruption(t *testing.T) { db0, err := Open(opt) require.NoError(t, err) + defer func() { require.NoError(t, db0.Close()) }() h := testHelper{db: db0, t: t} h.writeRange(0, 7) @@ -780,13 +781,12 @@ func TestPenultimateLogCorruption(t *testing.T) { // Simulate a crash by not closing db0, but releasing the locks. if db0.dirLockGuard != nil { require.NoError(t, db0.dirLockGuard.release()) + db0.dirLockGuard = nil } if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) + db0.valueDirGuard = nil } - require.NoError(t, db0.vlog.Close()) - require.NoError(t, db0.manifest.close()) - require.NoError(t, db0.registry.Close()) opt.Truncate = true db1, err := Open(opt) @@ -801,6 +801,7 @@ func TestPenultimateLogCorruption(t *testing.T) { }) require.NoError(t, err) require.NoError(t, db1.Close()) + } func checkKeys(t *testing.T, kv *DB, keys [][]byte) { From e388847ae200cd80d125c54231bdeab29a257201 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 15:43:34 +0530 Subject: [PATCH 04/16] debugging statement --- db_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db_test.go b/db_test.go index 85edc28b0..3bb88aa4e 100644 --- a/db_test.go +++ b/db_test.go @@ -1269,6 +1269,7 @@ func TestExpiryImproperDBClose(t *testing.T) { // graceful shutdown of db0. db0, err := Open(opt.WithCompactL0OnClose(false)) require.NoError(t, err) + fmt.Printf("db0.opt.dir = %+v\n", db0.opt.Dir) defer func() { require.NoError(t, db0.Close()) }() dur := 1 * time.Hour @@ -1292,6 +1293,7 @@ func TestExpiryImproperDBClose(t *testing.T) { db1, err := Open(opt) require.NoError(t, err) + fmt.Printf("db1.opt.dir = %+v\n", db1.opt.Dir) err = db1.View(func(txn *Txn) error { itm, err := txn.Get([]byte("test_key")) require.NoError(t, err) From 2aa8e0c65b329ad634be0c8e11c021d1cdb258ac Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 15:57:07 +0530 Subject: [PATCH 05/16] more debug --- db_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/db_test.go b/db_test.go index 3bb88aa4e..bbdaa52c8 100644 --- a/db_test.go +++ b/db_test.go @@ -1292,6 +1292,7 @@ func TestExpiryImproperDBClose(t *testing.T) { } db1, err := Open(opt) + fmt.Printf("err = %+v\n", err) require.NoError(t, err) fmt.Printf("db1.opt.dir = %+v\n", db1.opt.Dir) err = db1.View(func(txn *Txn) error { From 5f5f9185744522a575ad129955ded01192301645 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 18:09:35 +0530 Subject: [PATCH 06/16] fix failing test --- db_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/db_test.go b/db_test.go index bbdaa52c8..b00e1925b 100644 --- a/db_test.go +++ b/db_test.go @@ -1291,6 +1291,12 @@ func TestExpiryImproperDBClose(t *testing.T) { db0.valueDirGuard = nil } + // On windows, the vlog file is truncated to 2*MaxVlogSize and if we + // do not set the trucate flag, reopening the db would return Truncate + // Required Error. + if runtime.GOOS == "windows" { + opt.Truncate = true + } db1, err := Open(opt) fmt.Printf("err = %+v\n", err) require.NoError(t, err) From 7f553dea18da1ec97ca396a439160490303f34db Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 18:13:37 +0530 Subject: [PATCH 07/16] Address review comments --- db.go | 2 +- value_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/db.go b/db.go index 2c52a399c..cb0273811 100644 --- a/db.go +++ b/db.go @@ -378,7 +378,7 @@ func Open(opt Options) (db *DB, err error) { go db.doWrites(replayCloser) if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { - // Perform cleanup + // Perform cleanup. replayCloser.SignalAndWait() db.closers.updateSize.SignalAndWait() db.blockCache.Close() diff --git a/value_test.go b/value_test.go index 1610b4409..107a90b4b 100644 --- a/value_test.go +++ b/value_test.go @@ -801,7 +801,6 @@ func TestPenultimateLogCorruption(t *testing.T) { }) require.NoError(t, err) require.NoError(t, db1.Close()) - } func checkKeys(t *testing.T, kv *DB, keys [][]byte) { From 91e8abf6fbe94e7c3d444efc24dc0718cc40bd83 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 18:15:48 +0530 Subject: [PATCH 08/16] Remove debug statements --- db_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/db_test.go b/db_test.go index b00e1925b..a54217965 100644 --- a/db_test.go +++ b/db_test.go @@ -1269,7 +1269,6 @@ func TestExpiryImproperDBClose(t *testing.T) { // graceful shutdown of db0. db0, err := Open(opt.WithCompactL0OnClose(false)) require.NoError(t, err) - fmt.Printf("db0.opt.dir = %+v\n", db0.opt.Dir) defer func() { require.NoError(t, db0.Close()) }() dur := 1 * time.Hour @@ -1298,9 +1297,7 @@ func TestExpiryImproperDBClose(t *testing.T) { opt.Truncate = true } db1, err := Open(opt) - fmt.Printf("err = %+v\n", err) require.NoError(t, err) - fmt.Printf("db1.opt.dir = %+v\n", db1.opt.Dir) err = db1.View(func(txn *Txn) error { itm, err := txn.Get([]byte("test_key")) require.NoError(t, err) From 75c531f2e2502d2d8dc9cd4ab5ef592e13d8e8d7 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 13:17:11 +0530 Subject: [PATCH 09/16] fix windows build --- db_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/db_test.go b/db_test.go index a54217965..af80a2d5e 100644 --- a/db_test.go +++ b/db_test.go @@ -1269,7 +1269,6 @@ func TestExpiryImproperDBClose(t *testing.T) { // graceful shutdown of db0. db0, err := Open(opt.WithCompactL0OnClose(false)) require.NoError(t, err) - defer func() { require.NoError(t, db0.Close()) }() dur := 1 * time.Hour expiryTime := uint64(time.Now().Add(dur).Unix()) @@ -1289,13 +1288,8 @@ func TestExpiryImproperDBClose(t *testing.T) { require.NoError(t, db0.valueDirGuard.release()) db0.valueDirGuard = nil } + require.NoError(t, db0.Close()) - // On windows, the vlog file is truncated to 2*MaxVlogSize and if we - // do not set the trucate flag, reopening the db would return Truncate - // Required Error. - if runtime.GOOS == "windows" { - opt.Truncate = true - } db1, err := Open(opt) require.NoError(t, err) err = db1.View(func(txn *Txn) error { From 46951a99807829e61331fc4dd51014715e91506c Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 19:04:20 +0530 Subject: [PATCH 10/16] fix crash --- stream_writer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/stream_writer.go b/stream_writer.go index 014841424..80cf689f3 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -229,7 +229,6 @@ func (sw *StreamWriter) Flush() error { if err := headWriter.Done(); err != nil { return err } - headWriter.closer.SignalAndWait() if !sw.db.opt.managedTxns { if sw.db.orc != nil { From 2488682054b26be704bc4176de55738120b96d22 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 19:12:28 +0530 Subject: [PATCH 11/16] fix windows crash --- managed_db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed_db_test.go b/managed_db_test.go index 3cbd6e5eb..e6cf49d5a 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -278,9 +278,9 @@ func TestDropReadOnly(t *testing.T) { require.Equal(t, err, ErrWindowsNotSupported) } else { require.NoError(t, err) + require.Panics(t, func() { db2.DropAll() }) + require.NoError(t, db2.Close()) } - require.Panics(t, func() { db2.DropAll() }) - require.NoError(t, db2.Close()) } func TestWriteAfterClose(t *testing.T) { From ba8b51f69228d19521b7e79dc23faa515032690a Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 2 Jun 2020 19:34:56 +0530 Subject: [PATCH 12/16] fix windows crash -1 --- managed_db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/managed_db_test.go b/managed_db_test.go index e6cf49d5a..959000657 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -524,9 +524,9 @@ func TestDropPrefixReadOnly(t *testing.T) { require.Equal(t, err, ErrWindowsNotSupported) } else { require.NoError(t, err) + require.Panics(t, func() { db2.DropPrefix([]byte("key0")) }) + require.NoError(t, db2.Close()) } - require.Panics(t, func() { db2.DropPrefix([]byte("key0")) }) - require.NoError(t, db2.Close()) } func TestDropPrefixRace(t *testing.T) { From 4ee968f08f3de445bcef349b1fe846667f8585f0 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 3 Jun 2020 18:46:32 +0530 Subject: [PATCH 13/16] Stop compaction and memtable flush --- db.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/db.go b/db.go index cb0273811..3077bf5bb 100644 --- a/db.go +++ b/db.go @@ -378,9 +378,13 @@ func Open(opt Options) (db *DB, err error) { go db.doWrites(replayCloser) if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { - // Perform cleanup. + // Perform cleanup. Stop all the goroutines that been started so far. replayCloser.SignalAndWait() db.closers.updateSize.SignalAndWait() + if !opt.ReadOnly { + db.closers.memtable.SignalAndWait() + db.closers.compactors.SignalAndWait() + } db.blockCache.Close() db.bfCache.Close() db.orc.Stop() From 19782ff44af42e5ca9bd87adf16776ac851768e4 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 3 Jun 2020 18:53:47 +0530 Subject: [PATCH 14/16] Cleanup on error --- db.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/db.go b/db.go index 3077bf5bb..5378fe212 100644 --- a/db.go +++ b/db.go @@ -346,6 +346,7 @@ func Open(opt Options) (db *DB, err error) { // newLevelsController potentially loads files in directory. if db.lc, err = newLevelsController(db, &manifest); err != nil { + db.closers.updateSize.SignalAndWait() return nil, err } @@ -366,6 +367,11 @@ func Open(opt Options) (db *DB, err error) { // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key vs, err := db.get(headKey) if err != nil { + db.closers.updateSize.SignalAndWait() + if !opt.ReadOnly { + db.closers.compactors.SignalAndWait() + db.closers.memtable.SignalAndWait() + } return nil, errors.Wrap(err, "Retrieving head") } db.orc.nextTxnTs = vs.Version From 55a3af106c0a1e173203a7a1c22298738d2af53c Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 5 Jun 2020 15:10:49 +0530 Subject: [PATCH 15/16] add db.cleanup --- db.go | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/db.go b/db.go index 5378fe212..715607202 100644 --- a/db.go +++ b/db.go @@ -291,6 +291,12 @@ func Open(opt Options) (db *DB, err error) { orc: newOracle(opt), pub: newPublisher(), } + // Cleanup all the goroutines started by badger in case of an error. + defer func() { + if err != nil { + db.cleanup() + } + }() if opt.MaxCacheSize > 0 { config := ristretto.Config{ @@ -346,7 +352,6 @@ func Open(opt Options) (db *DB, err error) { // newLevelsController potentially loads files in directory. if db.lc, err = newLevelsController(db, &manifest); err != nil { - db.closers.updateSize.SignalAndWait() return nil, err } @@ -367,11 +372,6 @@ func Open(opt Options) (db *DB, err error) { // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key vs, err := db.get(headKey) if err != nil { - db.closers.updateSize.SignalAndWait() - if !opt.ReadOnly { - db.closers.compactors.SignalAndWait() - db.closers.memtable.SignalAndWait() - } return nil, errors.Wrap(err, "Retrieving head") } db.orc.nextTxnTs = vs.Version @@ -384,18 +384,7 @@ func Open(opt Options) (db *DB, err error) { go db.doWrites(replayCloser) if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { - // Perform cleanup. Stop all the goroutines that been started so far. replayCloser.SignalAndWait() - db.closers.updateSize.SignalAndWait() - if !opt.ReadOnly { - db.closers.memtable.SignalAndWait() - db.closers.compactors.SignalAndWait() - } - db.blockCache.Close() - db.bfCache.Close() - db.orc.Stop() - db.vlog.Close() - return nil, y.Wrapf(err, "During db.vlog.open") } replayCloser.SignalAndWait() // Wait for replay to be applied first. @@ -426,6 +415,31 @@ func Open(opt Options) (db *DB, err error) { return db, nil } +// cleanup stops all the goroutines started by badger. This is used in open to +// cleanup goroutines in case of an error. +func (db *DB) cleanup() { + db.blockCache.Close() + db.bfCache.Close() + db.stopMemoryFlush() + db.stopCompactions() + + if db.closers.updateSize != nil { + db.closers.updateSize.Signal() + } + if db.closers.valueGC != nil { + db.closers.valueGC.Signal() + } + if db.closers.writes != nil { + db.closers.writes.Signal() + } + if db.closers.pub != nil { + db.closers.pub.Signal() + } + + db.orc.Stop() + db.vlog.Close() +} + // DataCacheMetrics returns the metrics for the underlying data cache. func (db *DB) DataCacheMetrics() *ristretto.Metrics { if db.blockCache != nil { From fc68008c0804ebe06ac82414fc073366fc08e86d Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 5 Jun 2020 15:34:59 +0530 Subject: [PATCH 16/16] fixup --- db.go | 10 +++++----- value.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/db.go b/db.go index 715607202..fd3e85ace 100644 --- a/db.go +++ b/db.go @@ -295,6 +295,7 @@ func Open(opt Options) (db *DB, err error) { defer func() { if err != nil { db.cleanup() + db = nil } }() @@ -328,7 +329,6 @@ func Open(opt Options) (db *DB, err error) { return nil, errors.Wrap(err, "failed to create bf cache") } } - if db.opt.InMemory { db.opt.SyncWrites = false // If badger is running in memory mode, push everything into the LSM Tree. @@ -343,7 +343,7 @@ func Open(opt Options) (db *DB, err error) { } if db.registry, err = OpenKeyRegistry(krOpt); err != nil { - return nil, err + return db, err } db.calculateSize() db.closers.updateSize = y.NewCloser(1) @@ -352,7 +352,7 @@ func Open(opt Options) (db *DB, err error) { // newLevelsController potentially loads files in directory. if db.lc, err = newLevelsController(db, &manifest); err != nil { - return nil, err + return db, err } // Initialize vlog struct. @@ -372,7 +372,7 @@ func Open(opt Options) (db *DB, err error) { // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key vs, err := db.get(headKey) if err != nil { - return nil, errors.Wrap(err, "Retrieving head") + return db, errors.Wrap(err, "Retrieving head") } db.orc.nextTxnTs = vs.Version var vptr valuePointer @@ -385,7 +385,7 @@ func Open(opt Options) (db *DB, err error) { if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { replayCloser.SignalAndWait() - return nil, y.Wrapf(err, "During db.vlog.open") + return db, y.Wrapf(err, "During db.vlog.open") } replayCloser.SignalAndWait() // Wait for replay to be applied first. diff --git a/value.go b/value.go index b68ef10ba..3bcac0e38 100644 --- a/value.go +++ b/value.go @@ -1219,7 +1219,7 @@ func (lf *logFile) init() error { } func (vlog *valueLog) Close() error { - if vlog.db.opt.InMemory { + if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory { return nil } // close flushDiscardStats.