Merge pull request ethereum#22762 from karalabe/snap-lower-complexity
core, eth, ethdb, trie: simplify range proofs
karalabe committed Apr 29, 2021
2 parents a81cf0d + fae165a commit 64b60c7
Showing 12 changed files with 149 additions and 237 deletions.
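
The central simplification: trie.VerifyRangeProof no longer reconstructs and returns a database of proven trie nodes; callers keep only the continuation flag and the error, and regenerate any trie nodes they still need locally with a stack trie. A minimal before/after sketch of a call site, with the return shapes inferred from the call sites changed in this diff rather than from the full function signatures:

// Before: the prover also handed back the proven nodes as a key-value store.
nodes, cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)

// After: only the continuation flag and the error remain; trie nodes are
// rebuilt locally (e.g. with trie.NewStackTrie) where they are still needed.
cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)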
5 changes: 0 additions & 5 deletions core/rawdb/table.go
@@ -176,11 +176,6 @@ func (b *tableBatch) Delete(key []byte) error {
return b.batch.Delete(append([]byte(b.prefix), key...))
}

// KeyCount retrieves the number of keys queued up for writing.
func (b *tableBatch) KeyCount() int {
return b.batch.KeyCount()
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *tableBatch) ValueSize() int {
return b.batch.ValueSize()
2 changes: 1 addition & 1 deletion core/state/snapshot/generate.go
@@ -368,7 +368,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
}
// Verify the snapshot segment with the range prover, ensure that all flat states
// in this range correspond to the merkle trie.
_, cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
cont, err := trie.VerifyRangeProof(root, origin, last, keys, vals, proof)
return &proofResult{
keys: keys,
vals: vals,
121 changes: 64 additions & 57 deletions eth/protocols/snap/sync.go
@@ -202,9 +202,8 @@ type storageResponse struct {
accounts []common.Hash // Account hashes requested, may be only partially filled
roots []common.Hash // Storage roots requested, may be only partially filled

hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range
nodes []ethdb.KeyValueStore // Database containing the reconstructed trie nodes
hashes [][]common.Hash // Storage slot hashes in the returned range
slots [][][]byte // Storage slot values in the returned range

cont bool // Whether the last storage range has a continuation
}
@@ -680,12 +679,22 @@ func (s *Syncer) loadSyncStatus() {
}
s.tasks = progress.Tasks
for _, task := range s.tasks {
task.genBatch = s.db.NewBatch()
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
task.genTrie = trie.NewStackTrie(task.genBatch)

for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = s.db.NewBatch()
subtask.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
subtask.genTrie = trie.NewStackTrie(task.genBatch)
}
}
@@ -729,7 +738,12 @@ func (s *Syncer) loadSyncStatus() {
// Make sure we don't overflow if the step is not a proper divisor
last = common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")
}
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
@@ -746,19 +760,14 @@ func (s *Syncer) saveSyncStatus() {
func (s *Syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
keys, bytes := task.genBatch.KeyCount(), task.genBatch.ValueSize()
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)

for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
keys, bytes := subtask.genBatch.KeyCount(), subtask.genBatch.ValueSize()
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
s.accountBytes += common.StorageSize(keys*common.HashLength + bytes)
}
}
}
@@ -1763,12 +1772,15 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
if res.subTask != nil {
res.subTask.req = nil
}
batch := s.db.NewBatch()

batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var (
slots int
nodes int
bytes common.StorageSize
slots int
oldStorageBytes = s.storageBytes
)
// Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots
@@ -1829,7 +1841,12 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
r := newHashRange(lastKey, chunks)

// Our first task is the one that was just filled by this response.
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
@@ -1838,7 +1855,12 @@
genTrie: trie.NewStackTrie(batch),
})
for r.Next() {
batch := s.db.NewBatch()
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
@@ -1883,27 +1905,23 @@
}
}
}
// Iterate over all the reconstructed trie nodes and push them to disk
// if the contract is fully delivered. If it's chunked, the trie nodes
// will be reconstructed later.
// Iterate over all the complete contracts, reconstruct the trie nodes and
// push them to disk. If the contract is chunked, the trie nodes will be
// reconstructed later.
slots += len(res.hashes[i])

if i < len(res.hashes)-1 || res.subTask == nil {
it := res.nodes[i].NewIterator(nil, nil)
for it.Next() {
batch.Put(it.Key(), it.Value())

bytes += common.StorageSize(common.HashLength + len(it.Value()))
nodes++
tr := trie.NewStackTrie(batch)
for j := 0; j < len(res.hashes[i]); j++ {
tr.Update(res.hashes[i][j][:], res.slots[i][j])
}
it.Release()
tr.Commit()
}
// Persist the received storage segments. These flat states may be
// outdated during the sync, but they can be fixed later during the
// snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
bytes += common.StorageSize(1 + 2*common.HashLength + len(res.slots[i][j]))

// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
@@ -1926,25 +1944,20 @@
}
}
}
if data := res.subTask.genBatch.ValueSize(); data > ethdb.IdealBatchSize || res.subTask.done {
keys := res.subTask.genBatch.KeyCount()
if res.subTask.genBatch.ValueSize() > ethdb.IdealBatchSize || res.subTask.done {
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()

bytes += common.StorageSize(keys*common.HashLength + data)
nodes += keys
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist storage slots", "err", err)
}
s.storageSynced += uint64(slots)
s.storageBytes += bytes

log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "nodes", nodes, "bytes", bytes)
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)

// If this delivery completed the last pending task, forward the account task
// to the next chunk
@@ -2042,18 +2055,20 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
// Persist the received account segments. These flat states may be
// outdated during the sync, but they can be fixed later during the
// snapshot generation.
var (
nodes int
bytes common.StorageSize
)
batch := s.db.NewBatch()
oldAccountBytes := s.accountBytes

batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
break
}
slim := snapshot.SlimAccountRLP(res.accounts[i].Nonce, res.accounts[i].Balance, res.accounts[i].Root, res.accounts[i].CodeHash)
rawdb.WriteAccountSnapshot(batch, hash, slim)
bytes += common.StorageSize(1 + common.HashLength + len(slim))

// If the task is complete, drop it into the stack trie to generate
// account trie nodes for it
@@ -2069,7 +2084,6 @@
if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err)
}
s.accountBytes += bytes
s.accountSynced += uint64(len(res.accounts))

// Task filling persisted, push the chunk marker forward to the first
@@ -2091,17 +2105,13 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
log.Error("Failed to commit stack account", "err", err)
}
}
if data := task.genBatch.ValueSize(); data > ethdb.IdealBatchSize || task.done {
keys := task.genBatch.KeyCount()
if task.genBatch.ValueSize() > ethdb.IdealBatchSize || task.done {
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()

nodes += keys
bytes += common.StorageSize(keys*common.HashLength + data)
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "nodes", nodes, "bytes", bytes)
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
}

// OnAccounts is a callback method to invoke when a range of accounts are
@@ -2176,7 +2186,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
if len(keys) > 0 {
end = keys[len(keys)-1]
}
_, cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
cont, err := trie.VerifyRangeProof(root, req.origin[:], end, keys, accounts, proofdb)
if err != nil {
logger.Warn("Account range failed proof", "err", err)
// Signal this request as failed, and ready for rescheduling
@@ -2393,10 +2403,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
s.lock.Unlock()

// Reconstruct the partial tries from the response and verify them
var (
dbs = make([]ethdb.KeyValueStore, len(hashes))
cont bool
)
var cont bool

for i := 0; i < len(hashes); i++ {
// Convert the keys and proofs into an internal format
keys := make([][]byte, len(hashes[i]))
@@ -2413,7 +2421,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(nodes) == 0 {
// No proof has been attached, the response must cover the entire key
// space and hash to the origin root.
dbs[i], _, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
_, err = trie.VerifyRangeProof(req.roots[i], nil, nil, keys, slots[i], nil)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage slots failed proof", "err", err)
@@ -2428,7 +2436,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
if len(keys) > 0 {
end = keys[len(keys)-1]
}
dbs[i], cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], end, keys, slots[i], proofdb)
if err != nil {
s.scheduleRevertStorageRequest(req) // reschedule request
logger.Warn("Storage range failed proof", "err", err)
@@ -2444,7 +2452,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
roots: req.roots,
hashes: hashes,
slots: slots,
nodes: dbs,
cont: cont,
}
select {
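Pulled out of the hunks above as a standalone sketch, this is the pattern processStorageResponse now follows for a fully delivered contract: regenerate the storage trie nodes locally from the flat slots instead of copying peer-reconstructed nodes. Here hashes and slots stand for res.hashes[i] and res.slots[i], account for the current account hash, and batch for the hooked batch constructed earlier; no new API is introduced.

// Rebuild the storage trie nodes from the flat slots; the stack trie writes
// its nodes straight into the byte-accounted batch.
tr := trie.NewStackTrie(batch)
for j := 0; j < len(hashes); j++ {
	tr.Update(hashes[j][:], slots[j]) // slot hashes arrive in ascending order
}
tr.Commit()

// The flat snapshot entries are still persisted alongside the trie nodes.
for j := 0; j < len(hashes); j++ {
	rawdb.WriteStorageSnapshot(batch, account, hashes[j], slots[j])
}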
28 changes: 25 additions & 3 deletions ethdb/batch.go
@@ -25,9 +25,6 @@ const IdealBatchSize = 100 * 1024
type Batch interface {
KeyValueWriter

// KeyCount retrieves the number of keys queued up for writing.
KeyCount() int

// ValueSize retrieves the amount of data queued up for writing.
ValueSize() int

@@ -47,3 +44,28 @@ type Batcher interface {
// until a final write is called.
NewBatch() Batch
}

// HookedBatch wraps an arbitrary batch where each operation may be hooked into
// to allow monitoring from black-box code.
type HookedBatch struct {
Batch

OnPut func(key []byte, value []byte) // Callback if a key is inserted
OnDelete func(key []byte) // Callback if a key is deleted
}

// Put inserts the given value into the key-value data store.
func (b HookedBatch) Put(key []byte, value []byte) error {
if b.OnPut != nil {
b.OnPut(key, value)
}
return b.Batch.Put(key, value)
}

// Delete removes the key from the key-value data store.
func (b HookedBatch) Delete(key []byte) error {
if b.OnDelete != nil {
b.OnDelete(key)
}
return b.Batch.Delete(key)
}
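
A minimal usage sketch of the new HookedBatch, mirroring how sync.go above uses the OnPut hook to account for written bytes; the helper name and the written counter are illustrative, not part of the commit, and the snippet assumes the go-ethereum common and ethdb packages:

// newAccountedBatch wraps a fresh batch so that every queued Put is
// byte-counted through the OnPut hook before being handed to the real batch.
func newAccountedBatch(db ethdb.Batcher, written *common.StorageSize) ethdb.HookedBatch {
	return ethdb.HookedBatch{
		Batch: db.NewBatch(),
		OnPut: func(key []byte, value []byte) {
			*written += common.StorageSize(len(key) + len(value))
		},
	}
}

Every Put on the returned batch then updates the counter before delegating to the wrapped batch, which is how this commit replaces the removed Batch.KeyCount based accounting.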
9 changes: 1 addition & 8 deletions ethdb/leveldb/leveldb.go
@@ -448,7 +448,6 @@ func (db *Database) meter(refresh time.Duration) {
type batch struct {
db *leveldb.DB
b *leveldb.Batch
keys int
size int
}

@@ -462,16 +461,10 @@ func (b *batch) Put(key, value []byte) error {
// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
b.b.Delete(key)
b.keys++
b.size += len(key)
return nil
}

// KeyCount retrieves the number of keys queued up for writing.
func (b *batch) KeyCount() int {
return b.keys
}

// ValueSize retrieves the amount of data queued up for writing.
func (b *batch) ValueSize() int {
return b.size
@@ -485,7 +478,7 @@ func (b *batch) Write() error {
// Reset resets the batch for reuse.
func (b *batch) Reset() {
b.b.Reset()
b.keys, b.size = 0, 0
b.size = 0
}

// Replay replays the batch contents.