Skip to content

Commit

Permalink
core/rawdb: freezer index repair (#29792)
Browse files Browse the repository at this point in the history
This pull request removes the `fsync` of index files in freezer.ModifyAncients function for 
performance gain.

Originally, fsync is added after each freezer write operation to ensure
the written data is truly transferred into disk. Unfortunately, it turns 
out `fsync` can be relatively slow, especially on
macOS (see #28754 for more
information). 

In this pull request, fsync for index file is removed as it turns out
index file can be recovered even after a unclean shutdown. But fsync for data file is still kept, as
we have no meaningful way to validate the data correctness after unclean shutdown.

---

**But why do we need the `fsync` in the first place?** 

As it's necessary for freezer to survive/recover after the machine crash
(e.g. power failure).
In linux, whenever the file write is performed, the file metadata update
and data update are
not necessarily performed at the same time. Typically, the metadata will
be flushed/journalled
ahead of the file data. Therefore, we make the pessimistic assumption
that the file is first
extended with invalid "garbage" data (normally zero bytes) and that
afterwards the correct
data replaces the garbage. 

We have observed that the index file of the freezer often contain
garbage entry with zero value
(filenumber = 0, offset = 0) after a machine power failure. It proves
that the index file is extended
without the data being flushed. And this corruption can destroy the
whole freezer data eventually.

Performing fsync after each write operation can reduce the time window
for data to be transferred
to the disk and ensure the correctness of the data in the disk to the
greatest extent.

---

**How can we maintain this guarantee without relying on fsync?**

Because the items in the index file are strictly in order, we can
leverage this characteristic to
detect the corruption and truncate them when freezer is opened.
Specifically these validation
rules are performed for each index file:

For two consecutive index items:

- If their file numbers are the same, then the offset of the latter one
MUST not be less than that of the former.
- If the file number of the latter one is equal to that of the former
plus one, then the offset of the latter one MUST not be 0.
- If their file numbers are not equal, and the latter's file number is
not equal to the former plus 1, the latter one is valid

And also, for the first non-head item, it must refer to the earliest
data file, or the next file if the
earliest file is not sufficient to place the first item(very special
case, only theoretical possible
in tests)

With these validation rules, we can detect the invalid item in index
file with greatest possibility.

--- 

But unfortunately, these scenarios are not covered and could still lead
to a freezer corruption if it occurs:

**All items in index file are in zero value**

It's impossible to distinguish if they are truly zero (e.g. all the data
entries maintained in freezer
are zero size) or just the garbage left by OS. In this case, these index
items will be kept by truncating
the entire data file, namely the freezer is corrupted.

However, we can consider that the probability of this situation
occurring is quite low, and even
if it occurs, the freezer can be considered to be close to an empty
state. Rerun the state sync
should be acceptable.

**Index file is integral while relative data file is corrupted**

It might be possible the data file is corrupted whose file size is
extended correctly with garbage
filled (e.g. zero bytes). In this case, it's impossible to detect the
corruption by index validation.

We can either choose to `fsync` the data file, or blindly believe that
if index file is integral then
the data file could be integral with very high chance. In this pull
request, the first option is taken.
  • Loading branch information
rjl493456442 authored and holiman committed Nov 19, 2024
1 parent 75d1d8a commit ee4daa1
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 14 deletions.
11 changes: 3 additions & 8 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ func (batch *freezerTableBatch) maybeCommit() error {
return nil
}

// commit writes the batched items to the backing freezerTable.
// commit writes the batched items to the backing freezerTable. Note index
// file isn't fsync'd after the file write, the recent write can be lost
// after the power failure.
func (batch *freezerTableBatch) commit() error {
// Write data. The head file is fsync'd after write to ensure the
// data is truly transferred to disk.
_, err := batch.t.head.Write(batch.dataBuffer)
if err != nil {
return err
Expand All @@ -194,15 +194,10 @@ func (batch *freezerTableBatch) commit() error {
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]

// Write indices. The index file is fsync'd after write to ensure the
// data indexes are truly transferred to disk.
_, err = batch.t.index.Write(batch.indexBuffer)
if err != nil {
return err
}
if err := batch.t.index.Sync(); err != nil {
return err
}
indexSize := int64(len(batch.indexBuffer))
batch.indexBuffer = batch.indexBuffer[:0]

Expand Down
137 changes: 136 additions & 1 deletion core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rawdb

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
Expand All @@ -26,6 +27,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -219,7 +221,13 @@ func (t *freezerTable) repair() error {
return err
} // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
// Validate the index file as it might contain some garbage data after the
// power failures.
if err := t.repairIndex(); err != nil {
return err
}
// Retrieve the file sizes and prepare for truncation. Note the file size
// might be changed after index validation.
if stat, err = t.index.Stat(); err != nil {
return err
}
Expand Down Expand Up @@ -364,6 +372,133 @@ func (t *freezerTable) repair() error {
return nil
}

// repairIndex validates the integrity of the index file. According to the design,
// the initial entry in the file denotes the earliest data file along with the
// count of deleted items. Following this, all subsequent entries in the file must
// be in order. This function identifies any corrupted entries and truncates items
// occurring after the corruption point.
//
// corruption can occur because of the power failure. In the Linux kernel, the
// file metadata update and data update are not necessarily performed at the
// same time. Typically, the metadata will be flushed/journalled ahead of the file
// data. Therefore, we make the pessimistic assumption that the file is first
// extended with invalid "garbage" data (normally zero bytes) and that afterwards
// the correct data replaces the garbage. As all the items in index file are
// supposed to be in-order, the leftover garbage must be truncated before the
// index data is utilized.
//
// It's important to note an exception that's unfortunately undetectable: when
// all index entries in the file are zero. Distinguishing whether they represent
// leftover garbage or if all items in the table have zero size is impossible.
// In such instances, the file will remain unchanged to prevent potential data
// loss or misinterpretation.
func (t *freezerTable) repairIndex() error {
// Retrieve the file sizes and prepare for validation
stat, err := t.index.Stat()
if err != nil {
return err
}
size := stat.Size()

// Move the read cursor to the beginning of the file
_, err = t.index.Seek(0, io.SeekStart)
if err != nil {
return err
}
fr := bufio.NewReader(t.index)

var (
start = time.Now()
buff = make([]byte, indexEntrySize)
prev indexEntry
head indexEntry

read = func() (indexEntry, error) {
n, err := io.ReadFull(fr, buff)
if err != nil {
return indexEntry{}, err
}
if n != indexEntrySize {
return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
}
var entry indexEntry
entry.unmarshalBinary(buff)
return entry, nil
}
truncate = func(offset int64) error {
if t.readonly {
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
}
if err := truncateFreezerFile(t.index, offset); err != nil {
return err
}
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
return nil
}
)
for offset := int64(0); offset < size; offset += indexEntrySize {
entry, err := read()
if err != nil {
return err
}
if offset == 0 {
head = entry
continue
}
// Ensure that the first non-head index refers to the earliest file,
// or the next file if the earliest file has no space to place the
// first item.
if offset == indexEntrySize {
if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
return truncate(offset)
}
prev = entry
continue
}
// ensure two consecutive index items are in order
if err := t.checkIndexItems(prev, entry); err != nil {
log.Error("Corrupted index item detected", "err", err)
return truncate(offset)
}
prev = entry
}
// Move the read cursor to the end of the file. While theoretically, the
// cursor should reach the end by reading all the items in the file, perform
// the seek operation anyway as a precaution.
_, err = t.index.Seek(0, io.SeekEnd)
if err != nil {
return err
}
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

// checkIndexItems validates the correctness of two consecutive index items based
// on the following rules:
//
// - The file number of two consecutive index items must either be the same or
// increase monotonically. If the file number decreases or skips in a
// non-sequential manner, the index item is considered invalid.
//
// - For index items with the same file number, the data offset must be in
// non-decreasing order. Note: Two index items with the same file number
// and the same data offset are permitted if the entry size is zero.
//
// - The first index item in a new data file must not have a zero data offset.
func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
if b.filenum != a.filenum && b.filenum != a.filenum+1 {
return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
}
if b.filenum == a.filenum && b.offset < a.offset {
return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
}
if b.filenum == a.filenum+1 && b.offset == 0 {
return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
}
return nil
}

// preopen opens all files that the freezer will need. This method should be called from an init-context,
// since it assumes that it doesn't have to bother with locking
// The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
Expand Down
66 changes: 66 additions & 0 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
t.Fatal(err)
}
}

func TestIndexValidation(t *testing.T) {
const (
items = 30
dataSize = 10
)
garbage := indexEntry{
filenum: 100,
offset: 200,
}
var cases = []struct {
offset int64
data []byte
expItems int
}{
// extend index file with zero bytes at the end
{
offset: (items + 1) * indexEntrySize,
data: make([]byte, indexEntrySize),
expItems: 30,
},
// write garbage in the first non-head item
{
offset: indexEntrySize,
data: garbage.append(nil),
expItems: 0,
},
// write garbage in the first non-head item
{
offset: (items/2 + 1) * indexEntrySize,
data: garbage.append(nil),
expItems: items / 2,
},
}
for _, c := range cases {
fn := fmt.Sprintf("t-%d", rand.Uint64())
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
if err != nil {
t.Fatal(err)
}
writeChunks(t, f, items, dataSize)

// write corrupted data
f.index.WriteAt(c.data, c.offset)
f.Close()

// reopen the table, corruption should be truncated
f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
if err != nil {
t.Fatal(err)
}
for i := 0; i < c.expItems; i++ {
exp := getChunk(10, i)
got, err := f.Retrieve(uint64(i))
if err != nil {
t.Fatalf("Failed to read from table, %v", err)
}
if !bytes.Equal(exp, got) {
t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
}
}
if f.items.Load() != uint64(c.expItems) {
t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
}
}
}
4 changes: 2 additions & 2 deletions triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
return nil, err
}
// To remove outdated history objects from the end, we set the 'tail' parameter
Expand Down Expand Up @@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
if dl.stale {
return errSnapshotStale
}
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
}

// size returns the approximate size of cached nodes in the disk layer.
Expand Down
13 changes: 10 additions & 3 deletions triedb/pathdb/nodebuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {

// setSize sets the buffer size to the provided number, and invokes a flush
// operation if the current memory usage exceeds the new limit.
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
b.limit = uint64(size)
return b.flush(db, clean, id, false)
return b.flush(db, freezer, clean, id, false)
}

// allocBatch returns a database batch with pre-allocated buffer.
Expand All @@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {

// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
if b.size <= b.limit && !force {
return nil
}
Expand All @@ -227,6 +227,13 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
start = time.Now()
batch = b.allocBatch(db)
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
if freezer != nil {
if err := freezer.Sync(); err != nil {
return err
}
}
nodes := writeNodes(batch, b.nodes, clean)
rawdb.WritePersistentStateID(batch, id)

Expand Down

0 comments on commit ee4daa1

Please sign in to comment.