Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]db: freezer batch compatible offline prunblock command #1005

Merged
merged 5 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,48 @@ func accessDb(ctx *cli.Context, stack *node.Node) (ethdb.Database, error) {
}

func pruneBlock(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
var (
stack *node.Node
config gethConfig
chaindb ethdb.Database
err error

oldAncientPath string
newAncientPath string
blockAmountReserved uint64
blockpruner *pruner.BlockPruner
)

stack, config = makeConfigNode(ctx)
defer stack.Close()
blockAmountReserved := ctx.GlobalUint64(utils.BlockAmountReserved.Name)
chaindb, err := accessDb(ctx, stack)
blockAmountReserved = ctx.GlobalUint64(utils.BlockAmountReserved.Name)
chaindb, err = accessDb(ctx, stack)
if err != nil {
return err
}
var newAncientPath string
oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name)
if !filepath.IsAbs(oldAncientPath) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir.ancient not abs path")

// Most of the problems reported by users when first using the prune-block
// tool are due to incorrect directory settings.Here, the default directory
// and relative directory are canceled, and the user is forced to formulate
// an absolute path to guide users to run the prune-block command correctly.
if !ctx.GlobalIsSet(utils.DataDirFlag.Name) {
return errors.New("datadir must be set")
joeylichang marked this conversation as resolved.
Show resolved Hide resolved
} else {
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
if !filepath.IsAbs(datadir) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir not abs path")
}
}

if !ctx.GlobalIsSet(utils.AncientFlag.Name) {
return errors.New("datadir.ancient must be set")
} else {
oldAncientPath = ctx.GlobalString(utils.AncientFlag.Name)
if !filepath.IsAbs(oldAncientPath) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir.ancient not abs path")
}
}

path, _ := filepath.Split(oldAncientPath)
Expand All @@ -350,7 +380,7 @@ func pruneBlock(ctx *cli.Context) error {
}
newAncientPath = filepath.Join(path, "ancient_back")

blockpruner := pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)
blockpruner = pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)

lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK"))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ var (
}
AncientFlag = DirectoryFlag{
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
Usage: "Data directory for ancient chain segments (default = inside chaindata, '${datadir}/geth/chaindata/ancient/')",
}
DiffFlag = DirectoryFlag{
Name: "datadir.diff",
Expand Down
21 changes: 7 additions & 14 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// NewFreezerDb only create a freezer without statedb.
func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*freezer, error) {
// Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and acientDB.
frdb, err := newFreezer(frz, namespace, readonly, freezerTableSize, FreezerNoSnappy)
frdb, err := newFreezer(frz, namespace, readonly, newOffSet, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}
frdb.offset = newOffSet
frdb.frozen += newOffSet
return frdb, nil
}

Expand Down Expand Up @@ -213,12 +211,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st

if ReadAncientType(db) == PruneFreezerType {
log.Warn("prune ancinet flag is set, may start fail, can add pruneancient parameter resolve")
}

// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
return nil, errors.New("pruneancient was set, please add pruneancient parameter")
joeylichang marked this conversation as resolved.
Show resolved Hide resolved
}

var offset uint64
Expand All @@ -229,11 +222,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
offset = ReadOffSetOfCurrentAncientFreezer(db)
}

frdb.offset = offset

// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
frdb.frozen += offset
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly, offset, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
joeylichang marked this conversation as resolved.
Show resolved Hide resolved
}

// Since the freezer can be stored separately from the user's key-value database,
// there's a fairly high probability that the user requests invalid combinations
Expand Down
10 changes: 8 additions & 2 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type freezer struct {
//
// The 'tables' argument defines the data tables. If the value of a map
// entry is true, snappy compression is disabled for the table.
func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
func newFreezer(datadir string, namespace string, readonly bool, offset uint64, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
Expand Down Expand Up @@ -131,6 +131,7 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
instanceLock: lock,
trigger: make(chan chan struct{}),
quit: make(chan struct{}),
offset: offset,
}

// Create the tables.
Expand Down Expand Up @@ -162,10 +163,14 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
return nil, err
}

// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
freezer.frozen += offset

// Create the write batch.
freezer.writeBatch = newFreezerBatch(freezer)

log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
log.Info("Opened ancient database", "database", datadir, "readonly", readonly, "frozen", freezer.frozen)
return freezer, nil
}

Expand Down Expand Up @@ -368,6 +373,7 @@ func (f *freezer) repair() error {
return err
}
}
log.Info("AncientDB item count", "items", min)
atomic.StoreUint64(&f.frozen, min)
return nil
}
Expand Down
16 changes: 11 additions & 5 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type freezerBatch struct {
func newFreezerBatch(f *freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
batch.tables[kind] = table.newBatch(f.offset)
}
return batch
}
Expand Down Expand Up @@ -91,11 +91,15 @@ type freezerTableBatch struct {
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
offset uint64
}

// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch {
var batch = &freezerTableBatch{
t: t,
offset: offset,
}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
Expand All @@ -107,7 +111,8 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
curItem := batch.t.items + batch.offset
batch.curItem = atomic.LoadUint64(&curItem)
batch.totalBytes = 0
}

Expand Down Expand Up @@ -201,7 +206,8 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
items := batch.curItem - batch.offset
atomic.StoreUint64(&batch.t.items, items)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
Expand Down
18 changes: 9 additions & 9 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
// In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(uint64(x), data))
require.NoError(t, batch.commit())
f.Close()
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
batch := f.newBatch()
batch := f.newBatch(0)
for x := 1; x < 0xff; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
require.NoError(t, batch.commit())
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}

// Write 40 bytes
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -507,7 +507,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
f.truncate(0)

// Write the data again
batch := f.newBatch()
batch := f.newBatch(0)
for x := 0; x < 30; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand All @@ -529,7 +529,7 @@ func TestFreezerOffset(t *testing.T) {
}

// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestFreezerOffset(t *testing.T) {
t.Log(f.dumpIndexString(0, 100))

// It should allow writing item 6.
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -710,7 +710,7 @@ func getChunk(size int, b int) []byte {
func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
t.Helper()

batch := ft.newBatch()
batch := ft.newBatch(0)
for i := 0; i < n; i++ {
if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestFreezerReadonly(t *testing.T) {

// Case 5: Now write some data via a batch.
// This should fail either during AppendRaw or Commit
batch := f.newBatch()
batch := f.newBatch(0)
writeErr := batch.AppendRaw(32, make([]byte, 1))
if writeErr == nil {
writeErr = batch.commit()
Expand Down
12 changes: 6 additions & 6 deletions core/rawdb/freezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestFreezerModifyRollback(t *testing.T) {

// Reopen and check that the rolled-back data doesn't reappear.
tables := map[string]bool{"test": true}
f2, err := newFreezer(dir, "", false, 2049, tables)
f2, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
}
Expand Down Expand Up @@ -262,17 +262,17 @@ func TestFreezerReadonlyValidate(t *testing.T) {
defer os.RemoveAll(dir)
// Open non-readonly freezer and fill individual tables
// with different amount of data.
f, err := newFreezer(dir, "", false, 2049, tables)
f, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
aBatch := f.tables["a"].newBatch()
aBatch := f.tables["a"].newBatch(0)
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())
bBatch := f.tables["b"].newBatch()
bBatch := f.tables["b"].newBatch(0)
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items != 3 {
Expand All @@ -285,7 +285,7 @@ func TestFreezerReadonlyValidate(t *testing.T) {

// Re-openening as readonly should fail when validating
// table lengths.
f, err = newFreezer(dir, "", true, 2049, tables)
f, err = newFreezer(dir, "", true, 0, 2049, tables)
if err == nil {
t.Fatal("readonly freezer should fail with differing table lengths")
}
Expand All @@ -300,7 +300,7 @@ func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, strin
}
// note: using low max table size here to ensure the tests actually
// switch between multiple files.
f, err := newFreezer(dir, "", false, 2049, tables)
f, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
Expand Down
4 changes: 3 additions & 1 deletion core/rawdb/prunedfreezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func newPrunedFreezer(datadir string, db ethdb.KeyValueStore) (*prunedfreezer, e
func (f *prunedfreezer) repair(datadir string) error {
// compatible prune-block-tool
offset := ReadOffSetOfCurrentAncientFreezer(f.db)
log.Info("Read last offline prune-block start block number", "offset", offset)

// compatible freezer
min := uint64(math.MaxUint64)
for name, disableSnappy := range FreezerNoSnappy {
table, err := NewFreezerTable(datadir, name, disableSnappy, true)
table, err := NewFreezerTable(datadir, name, disableSnappy, false)
if err != nil {
return err
}
Expand All @@ -76,6 +77,7 @@ func (f *prunedfreezer) repair(datadir string) error {
}
table.Close()
}
log.Info("Read ancientdb item counts", "items", min)
offset += min

if frozen := ReadFrozenOfAncientFreezer(f.db); frozen > offset {
Expand Down
5 changes: 4 additions & 1 deletion core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,10 @@ func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace str
return consensus.ErrUnknownAncestor
}
// Write into new ancient_back db.
rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td)
if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
// Print the log every 5s for better trace.
if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) {
log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber)
Expand Down